Spark SQL DataFrame Basic

Spark SQL is a spark module for structured data processing. User can execute sql queries by taking advantage of spark in-memory data processing architecture.

DataFrame :- 
  1. Distributed collection of data organized into named columns.It is conceptually equivalent to table in relational database.
  2. Can be constructed from Hive,external database or existing RDD's
  3. Entry point for all functionality in Spark SQL is the SQLContext class and or one of its descendants.
There are two ways to convert existing RDD into DataFrame.

1. Inferring Schema
2. Programmatically Specifying Schema

We will see example for both step by step.

Inferring Schema :This is done by using case classes.The names of the arguments to the case class are read using reflection and become the names of the columns.

Below is sample code:


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object InferSchemaExample {
  
 def main(args: Array[String]) {
   
      val sparkConf = new SparkConf().setAppName("InferSchemaDemo")
      val sc = new SparkContext(sparkConf)
      
      //create sql context
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
       
      //create employee class for inferring schema
      case class Employee(name: String, age: Int , designation:String)
      
      //read file and convert into DF
      val employee = sc.textFile("/user/root/spark/data/employee.txt").map(_.split("\\|")).map(emp => Employee(emp(0), emp(1).trim.toInt,emp(2))).toDF()
      
      //save employee information as parquent --< this just to show parquet feature --< one can directly register table 
      // to proceed with data processing  
      employee.write.mode("overwrite").parquet("/user/root/spark/employee.parquet")
      
      //read parquent file
      val parquetFile = sqlContext.read.parquet("/user/root/spark/employee.parquet")
      
      //register temp table
      parquetFile.registerTempTable("employee")
      
      // read all records from table
      val empRecord = sqlContext.sql("SELECT * FROM employee")
      
      empRecord.map(t => "Name: " + t(0)).collect()
      
      //Save output on HDFS
      empRecord.rdd.saveAsTextFile("/user/root/spark/out")

 }

}


2: Programmatically Specifying Schema :There are three steps for doing this.
  • Create an RDD of Rows from the original RDD;
  • Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  • Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
Here is sample code :

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object ProgmmedSchema {

  def main(args: Array[String]) {
      
      val sparkConf = new SparkConf().setAppName("ProgmdSchemaDemo")
      val sc = new SparkContext(sparkConf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      
      val schema = StructType(Array(StructField("EName",StringType,true),StructField("Eage",IntegerType,true),StructField("EDesignation",StringType,true)))
      val rowRDD =sc.textFile("/user/root/spark/data/employee.txt").map(_.split("\\|")).map(emp => org.apache.spark.sql.Row(emp(0),emp(1).toInt,emp(2)))
      val employeeSchemaRDD = sqlContext.applySchema(rowRDD, schema)
      
      employeeSchemaRDD.registerTempTable("employee")
      val empRecord = sqlContext.sql("SELECT * FROM employee")
      empRecord.map(t => "Name: " + t(1)).collect()
      
 }
}

Happy Sparking ...!!!

Reference : Apache Spark Documentation..

Comments

Popular posts from this blog

JDBC Hive Connection fails : Unable to read HiveServer2 uri from ZooKeeper

Access Kubernetes ConfigMap in Spring Boot Application

Developing Custom Processor in Apache Nifi