Joining Spark RDDs
Hi Friends,
Today I will demonstrate how you can perform joins on Spark RDDs. We are going to focus on three basic join operations.
1. Join (Inner)
2. Left Outer Join
3. Right Outer Join
Let's take the standard Employee + Department example and create two RDDs: one holding employee data and another holding department data.
Employee Table:
Eid | EName | LName |
---|---|---|
101 | Sam | Flam |
102 | Scot | Rut |
103 | Jas | Tez |
val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))
Array[(Int, String, String)] = Array((101,Sam,Flam), (102,Scot,Rut), (103,Jas,Tez)) // output
Department Table:
DeptId | DepartmentName | Eid |
---|---|---|
D01 | Computer | 101 |
D02 | Electronic | 104 |
D03 | Civil | 102 |
val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))
Array[(String, String, Int)] = Array((D01,Computer,101), (D02,Electronic,104), (D03,Civil,102)) // output
As we know, joining two tables requires a key. So let's reshape each RDD into (key, value) pairs, keyed by Eid, before performing the join.
val EmpWithKeyRDD = EmpRDD.keyBy(x => x._1) // Forms a new RDD with key as Eid
val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3)
Output after adding the key:
Array[(Int, (Int, String, String))] = Array((101,(101,Sam,Flam)), (102,(102,Scot,Rut)), (103,(103,Jas,Tez)))
Array[(Int, (String, String, Int))] = Array((101,(D01,Computer,101)), (104,(D02,Electronic,104)), (102,(D03,Civil,102)))
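To make it clearer what keyBy does, here is a minimal sketch showing that it behaves like a plain map that pairs each element with its key. The names empRDD, viaKeyBy and viaMap are my own for illustration, and SparkContext.getOrCreate is used only so the snippet reuses the existing context in spark-shell or starts a local one elsewhere.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or creates a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("KeyBySketch").setMaster("local[*]"))

// Same employee data as above (illustrative variable name).
val empRDD = sc.parallelize(Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez")))

// keyBy(f) pairs every element with the key f(element) ...
val viaKeyBy = empRDD.keyBy(emp => emp._1)

// ... which gives the same pairs as building (key, element) by hand.
val viaMap = empRDD.map(emp => (emp._1, emp))

// Both RDDs hold identical (Eid, employee) pairs.
assert(viaKeyBy.collect().sameElements(viaMap.collect()))
```

Either form works; keyBy just states the intent more directly.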
We are good to go now. Let's perform the join operations on the two RDDs.
1. Inner Join
val EmpDeptJoin = EmpWithKeyRDD.join(DetpWithKeyRDD)
EmpDeptJoin.collect() // to check the result
Output :
Array[(Int, ((Int, String, String), (String, String, Int)))] = Array((101,((101,Sam,Flam),(D01,Computer,101))), (102,((102,Scot,Rut),(D03,Civil,102))))
2. Left Outer Join
val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
EmpDeptLeftJoin.collect() // to Check Result
Output :
Array[(Int, ((Int, String, String), Option[(String, String, Int)]))] = Array((101,((101,Sam,Flam),Some((D01,Computer,101)))), (102,((102,Scot,Rut),Some((D03,Civil,102)))), (103,((103,Jas,Tez),None)))
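Since the right side of a left outer join comes back as an Option, you usually need to unwrap it before using it. Below is a small sketch of one way to do that. The variable names and the "UNASSIGNED" placeholder are my own assumptions for illustration, not part of the example above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or creates a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("LeftJoinSketch").setMaster("local[*]"))

// Same data as above, keyed by Eid (illustrative variable names).
val empByKey = sc.parallelize(Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))).keyBy(_._1)
val deptByKey = sc.parallelize(Seq(("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102))).keyBy(_._3)

val empWithDeptName = empByKey.leftOuterJoin(deptByKey).map {
  case (eid, ((_, firstName, _), maybeDept)) =>
    // maybeDept is Some(department) when a match exists and None otherwise.
    // "UNASSIGNED" is a hypothetical placeholder for employees without a department.
    val deptName = maybeDept.map(_._2).getOrElse("UNASSIGNED")
    (eid, firstName, deptName)
}

// Sort on the driver so the order is stable for inspection.
val leftResult = empWithDeptName.collect().sortBy(_._1)
// leftResult: Array((101,Sam,Computer), (102,Scot,Civil), (103,Jas,UNASSIGNED))
```

Employee 103 has no department row, so it is the only record that falls back to the placeholder.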
3. Right Outer Join
val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)
EmpDeptRightJoin.collect() // to check the result
Output :
Array[(Int, (Option[(Int, String, String)], (String, String, Int)))] = Array((101,(Some((101,Sam,Flam)),(D01,Computer,101))), (104,(None,(D02,Electronic,104))), (102,(Some((102,Scot,Rut)),(D03,Civil,102))))
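One way to think about a right outer join is as a left outer join with the two RDDs swapped. The sketch below checks that on the same sample data. The variable names are illustrative assumptions, and the comparison uses sets because the order of joined records is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or creates a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("RightJoinSketch").setMaster("local[*]"))

// Same data as above, keyed by Eid (illustrative variable names).
val empByKey = sc.parallelize(Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))).keyBy(_._1)
val deptByKey = sc.parallelize(Seq(("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102))).keyBy(_._3)

// Right outer join: every department is kept, the employee side is optional.
val rightJoin = empByKey.rightOuterJoin(deptByKey)

// Left outer join with the operands swapped, then the value pair flipped back
// so that it has the same shape as rightJoin.
val swappedLeftJoin = deptByKey.leftOuterJoin(empByKey).map {
  case (eid, (dept, maybeEmp)) => (eid, (maybeEmp, dept))
}

// Both approaches produce the same records.
assert(rightJoin.collect().toSet == swappedLeftJoin.collect().toSet)
```

Department D02 points at Eid 104, which has no employee row, so it shows up with None on the employee side in both results.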
We can reformat the data we received as output from each join. Let's take the output of the inner join and bring it into a proper flat format.
val EmpDeptJoinFormatted = EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))
Array[(Int, String, String, String, String)] = Array((101,Sam,Flam,D01,Computer), (102,Scot,Rut,D03,Civil))
Looking at the diagram, it is clear how each element is picked to create the final RDD: x._1 is the join key (Eid), x._2._1 is the employee tuple, and x._2._2 is the department tuple.
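The positional accessors work, but they can be hard to read. As an alternative sketch, the same flattening can be written with a pattern match that names each field. The names used here (empByKey, deptByKey, firstName and so on) are my own and only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or creates a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("FormatJoinSketch").setMaster("local[*]"))

// Same data as above, keyed by Eid (illustrative variable names).
val empByKey = sc.parallelize(Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))).keyBy(_._1)
val deptByKey = sc.parallelize(Seq(("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102))).keyBy(_._3)

// Destructure the nested join result instead of chaining _1/_2 accessors.
// Fields that are not needed (the repeated Eid values) are ignored with _.
val formatted = empByKey.join(deptByKey).map {
  case (eid, ((_, firstName, lastName), (deptId, deptName, _))) =>
    (eid, firstName, lastName, deptId, deptName)
}

// Sort on the driver so the order is stable for inspection.
val formattedResult = formatted.collect().sortBy(_._1)
// formattedResult: Array((101,Sam,Flam,D01,Computer), (102,Scot,Rut,D03,Civil))
```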
Complete Code
package org.com.td.sparkdemo.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object JoinRDD {
  // A main method is needed so the object can be run as an application.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JoinRDD")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Source data
    val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))
    val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))

    // Key both RDDs by Eid
    val EmpWithKeyRDD = EmpRDD.keyBy(x => x._1)
    val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3)

    // Inner, left outer and right outer joins
    val EmpDeptJoin = EmpWithKeyRDD.join(DetpWithKeyRDD)
    val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
    val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)

    // Flatten the inner join result and write it out
    val EmpDeptJoinFormatted = EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))
    EmpDeptJoinFormatted.saveAsTextFile("/path/to/your/output/directory")

    sc.stop()
  }
}
Enjoy...Happy Coding...!!!!!!