Joining Spark RDDs

Hi Friends,

Today I will demonstrate how you can perform joins on Spark RDDs. We are going to focus on three basic join operations.

1. Inner Join
2. Left Outer Join
3. Right Outer Join

Let's take the standard Employee + Department example and create two RDDs: one holding employee data and another holding department data.

Employee Table:

Eid EName LName
101 Sam Flam
102 Scot Rut
103 Jas Tez

 val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))  
Array[(Int, String, String)] = Array((101,Sam,Flam), (102,Scot,Rut), (103,Jas,Tez)) // output 


Department Table:

DeptId DepartmentName Eid
D01 Computer 101
D02 Electronic 104
D03 Civil 102

 val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))  
Array[(String, String, Int)] = Array((D01,Computer,101), (D02,Electronic,104), (D03,Civil,102)) // output


Now, as we know, joining two tables requires a key, so let's reshape our RDDs into key/value pairs before performing the join operations.


val EmpWithKeyRDD  = EmpRDD.keyBy(x => x._1)   // forms a new pair RDD keyed by Eid
val DeptWithKeyRDD = DeptRDD.keyBy(x => x._3)  // forms a new pair RDD keyed by Eid

Output after adding keys:

// EmpWithKeyRDD
Array[(Int, (Int, String, String))] = Array((101,(101,Sam,Flam)), (102,(102,Scot,Rut)), (103,(103,Jas,Tez)))

// DeptWithKeyRDD
Array[(Int, (String, String, Int))] = Array((101,(D01,Computer,101)), (104,(D02,Electronic,104)), (102,(D03,Civil,102)))
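For reference, keyBy(f) simply pairs each element with the key that f produces for it, so the two calls above are equivalent to a plain map. A minimal sketch (the *2 names are just illustrative):

// keyBy(x => x._1) pairs each employee tuple with its Eid
val EmpWithKeyRDD2 = EmpRDD.map(x => (x._1, x))

// keyBy(x => x._3) pairs each department tuple with its Eid
val DeptWithKeyRDD2 = DeptRDD.map(x => (x._3, x))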

We are good to go now. Let's perform the join operations on the two RDDs.

1. Inner Join

val EmpDeptJoin = EmpWithKeyRDD.join(DeptWithKeyRDD)
EmpDeptJoin.collect()  // to check the result

Output:

Array[(Int, ((Int, String, String), (String, String, Int)))] = Array((101,((101,Sam,Flam),(D01,Computer,101))), (102,((102,Scot,Rut),(D03,Civil,102))))

Note that Eid 103 (an employee with no department) and Eid 104 (a department with no matching employee) are dropped: an inner join keeps only keys present in both RDDs.

2. Left Outer Join

val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DeptWithKeyRDD)
EmpDeptLeftJoin.collect()  // to check the result

Output:

Array[(Int, ((Int, String, String), Option[(String, String, Int)]))] = Array((101,((101,Sam,Flam),Some((D01,Computer,101)))), (102,((102,Scot,Rut),Some((D03,Civil,102)))), (103,((103,Jas,Tez),None)))
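Because employee 103 has no department, its department side comes back as None. A minimal sketch of flattening this output, using a hypothetical "NoDept" placeholder for missing departments:

val EmpDeptLeftFormatted = EmpDeptLeftJoin.map {
  case (eid, ((_, fname, lname), deptOpt)) =>
    // deptOpt is Option[(String, String, Int)]; "NoDept" is an illustrative default
    val (deptId, deptName) = deptOpt.map(d => (d._1, d._2)).getOrElse(("NoDept", "NoDept"))
    (eid, fname, lname, deptId, deptName)
}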

3. Right Outer Join


val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DeptWithKeyRDD)
EmpDeptRightJoin.collect()  // to check the result


Output:
Array[(Int, (Option[(Int, String, String)], (String, String, Int)))] = Array((101,(Some((101,Sam,Flam)),(D01,Computer,101))), (104,(None,(D02,Electronic,104))), (102,(Some((102,Scot,Rut)),(D03,Civil,102))))
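Here the Option moves to the employee side: department D02 points at Eid 104, which has no matching employee, so that slot is None. A mirrored sketch, with a hypothetical "Unknown" default:

val EmpDeptRightFormatted = EmpDeptRightJoin.map {
  case (eid, (empOpt, (deptId, deptName, _))) =>
    // empOpt is Option[(Int, String, String)]; "Unknown" is an illustrative default
    val (fname, lname) = empOpt.map(e => (e._2, e._3)).getOrElse(("Unknown", "Unknown"))
    (eid, fname, lname, deptId, deptName)
}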

We can format the data we receive as output from each of these operations. Let's take the output of the inner join and bring it into a flat, readable shape.

val EmpDeptJoinFormatted =  EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))

Output:


Array[(Int, String, String, String, String)] = Array((101,Sam,Flam,D01,Computer), (102,Scot,Rut,D03,Civil))

Explanation of the map operation performed on each element of the RDD:

Each element of EmpDeptJoin has the shape (key, (employeeTuple, departmentTuple)). So x._1 is the join key (Eid), x._2._1._2 and x._2._1._3 pick EName and LName out of the employee tuple, and x._2._2._1 and x._2._2._2 pick DeptId and DepartmentName out of the department tuple.
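The positional accessors get the job done, but the same formatting reads more clearly with a pattern match. An equivalent sketch (EmpDeptJoinFormatted2 is just an illustrative name):

val EmpDeptJoinFormatted2 = EmpDeptJoin.map {
  // each element is (eid, ((eid, eName, lName), (deptId, deptName, eid)))
  case (eid, ((_, eName, lName), (deptId, deptName, _))) =>
    (eid, eName, lName, deptId, deptName)
}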

Complete Code


package org.com.td.sparkdemo.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object JoinRDD {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JoinRDD")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Source data
    val EmpRDD  = sc.parallelize(Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez")))
    val DeptRDD = sc.parallelize(Seq(("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102)))

    // Key both RDDs by Eid so they can be joined
    val EmpWithKeyRDD  = EmpRDD.keyBy(x => x._1)
    val DeptWithKeyRDD = DeptRDD.keyBy(x => x._3)

    // The three join variants
    val EmpDeptJoin      = EmpWithKeyRDD.join(DeptWithKeyRDD)
    val EmpDeptLeftJoin  = EmpWithKeyRDD.leftOuterJoin(DeptWithKeyRDD)
    val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DeptWithKeyRDD)

    // Flatten the inner-join result to (Eid, EName, LName, DeptId, DepartmentName)
    val EmpDeptJoinFormatted = EmpDeptJoin.map(x => (x._1, x._2._1._2, x._2._1._3, x._2._2._1, x._2._2._2))

    EmpDeptJoinFormatted.saveAsTextFile("/path/to/your/output/directory")

    sc.stop()
  }
}


Enjoy...Happy Coding...!!!!!!
