Spark SQL DataFrame Basics
Spark SQL is a Spark module for structured data processing. Users can execute SQL queries while taking advantage of Spark's in-memory data processing architecture.
DataFrame :-
- A distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database.
- Can be constructed from Hive tables, external databases, or existing RDDs.
- The entry point for all functionality in Spark SQL is the SQLContext class, or one of its descendants (a minimal sketch follows below).
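For instance, here is a minimal sketch of creating an SQLContext and loading a DataFrame. The JSON file path and its contents are illustrative assumptions, not part of the original examples:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameIntro {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("DataFrameIntro"))
    // SQLContext is the entry point for Spark SQL functionality
    val sqlContext = new SQLContext(sc)
    // hypothetical JSON file; each line is a record such as {"name":"Alice","age":30}
    val df = sqlContext.read.json("/user/root/spark/data/people.json")
    df.printSchema()   // inspect the inferred schema
    df.show()          // display the first rows as a table
  }
}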
There are two ways to convert an existing RDD into a DataFrame.
1. Inferring Schema
2. Programmatically Specifying Schema
We will see an example for each, step by step.
1. Inferring Schema: This is done using case classes. The names of the arguments to the case class are read using reflection and become the names of the columns.
Below is the sample code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

// case class must be defined outside main so Spark can infer the schema via reflection
case class Employee(name: String, age: Int, designation: String)

object InferSchemaExample {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("InferSchemaDemo")
    val sc = new SparkContext(sparkConf)
    // create sql context
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // read the pipe-delimited file and convert it into a DataFrame
    val employee = sc.textFile("/user/root/spark/data/employee.txt")
      .map(_.split("\\|"))
      .map(emp => Employee(emp(0), emp(1).trim.toInt, emp(2)))
      .toDF()
    // save employee information as Parquet; this is just to show the Parquet feature,
    // one could register the DataFrame as a table directly and proceed with data processing
    employee.write.mode("overwrite").parquet("/user/root/spark/employee.parquet")
    // read the Parquet file back
    val parquetFile = sqlContext.read.parquet("/user/root/spark/employee.parquet")
    // register a temp table
    parquetFile.registerTempTable("employee")
    // read all records from the table
    val empRecord = sqlContext.sql("SELECT * FROM employee")
    empRecord.map(t => "Name: " + t(0)).collect().foreach(println)
    // save the output on HDFS
    empRecord.rdd.saveAsTextFile("/user/root/spark/out")
  }
}
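The code above assumes a pipe-delimited input file. A hypothetical /user/root/spark/data/employee.txt could look like the lines below, and the job can be submitted with spark-submit (the jar name and master are illustrative assumptions):

John|30|Developer
Priya|28|Analyst
Amit|35|Manager

spark-submit --class InferSchemaExample --master yarn-client infer-schema-demo.jar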
2. Programmatically Specifying Schema: There are three steps for doing this.
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

object ProgrammedSchema {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("ProgrammedSchemaDemo")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // step 1: create an RDD of Rows from the original RDD
    val rowRDD = sc.textFile("/user/root/spark/data/employee.txt")
      .map(_.split("\\|"))
      .map(emp => Row(emp(0), emp(1).trim.toInt, emp(2)))
    // step 2: create the schema as a StructType matching the structure of the Rows
    val schema = StructType(Array(
      StructField("EName", StringType, true),
      StructField("Eage", IntegerType, true),
      StructField("EDesignation", StringType, true)))
    // step 3: apply the schema to the RDD of Rows via createDataFrame
    val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
    employeeDF.registerTempTable("employee")
    val empRecord = sqlContext.sql("SELECT * FROM employee")
    empRecord.map(t => "Name: " + t(0)).collect().foreach(println)
  }
}
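Once the temp table is registered, the same result can also be expressed with the DataFrame API instead of a SQL string. A minimal sketch that could be appended inside main after registerTempTable; the column names follow the schema defined above and the age threshold is just an example:

// DataFrame API equivalent: filter on age, then project name and designation
val employees = sqlContext.table("employee")
employees.filter(employees("Eage") > 25)
  .select("EName", "EDesignation")
  .show()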
Happy Sparking ...!!!
Reference: Apache Spark Documentation.