Custom Partitioner in Spark
Welcome back to the Sparking world!
Today we will be looking at one of the most interesting operations we can perform in Spark - partitioning.
If you have a basic understanding of Spark, you probably know that Spark is built around a concept called an 'RDD' (Resilient Distributed Dataset).
Each RDD is partitioned across the cluster, and you can choose the number of partitions allocated to a newly created RDD. For example:
val testFile = sc.textFile("/user/root/testfile.txt", 3)
The second parameter of the textFile function indicates the number of partitions to allocate to the RDD testFile.
If you want to verify this, you can use the following command in the Scala shell:
testFile.partitions.size
Spark ships with two types of partitioner:
1. Hash Partitioner - uses Java's Object.hashCode method to determine the partition.
2. Range Partitioner - distributes keys to partitions based on the range each key falls into.
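As a quick illustration, here is a minimal sketch of applying each built-in partitioner with partitionBy; it assumes the spark-shell's sc and a made-up dataset:

import org.apache.spark.{HashPartitioner, RangePartitioner}

// Hypothetical paired RDD; 'sc' is the spark-shell's SparkContext
val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("cherry", 3)))

// Hash Partitioner: partition chosen from the key's hashCode
val hashed = pairs.partitionBy(new HashPartitioner(2))

// Range Partitioner: samples the RDD to derive sorted key ranges per partition
val ranged = pairs.partitionBy(new RangePartitioner(2, pairs))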
A basic advantage of partitioning is that similar data ends up co-located, so transformations like groupByKey and reduceByKey shuffle less data across the cluster, which improves job performance.
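For example (a sketch, reusing the hypothetical pairs RDD and imports from above), partitioning a keyed RDD once and caching it lets later key-based transformations run without a full shuffle:

// Partition once and cache; reduceByKey on an RDD whose partitioner already
// matches can aggregate within partitions instead of reshuffling the data
val prePartitioned = pairs.partitionBy(new HashPartitioner(4)).persist()
val counts = prePartitioned.reduceByKey(_ + _)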
Let's create our own partitioner. Consider an example where I have a file that contains both text and numeric tokens, and I want to store the numbers in one partition and the text in another. The default partitioners above will not do this for me, so we need to design our own. Here is a snapshot of the sample data I am using.
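For illustration, a hypothetical input file that mixes both kinds of tokens might look like this:

hello 123 world
456 spark 789
partition demo 42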
Now we need to route the numeric and text tokens into two different partitions. Here is the code for that:
package com.td.SparkHbaseDemo.SparkCustomPartitioner

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object CustomPartitionerDemo {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("CustomPartitionerDemo")
    val sc = new SparkContext(sparkConf)
    val inputFile = sc.textFile("/user/root/testfile")

    // Create a paired RDD: split each line into tokens and pair each token with 1
    val pairedData = inputFile.flatMap(x => x.split(" ")).map(x => (x, 1))

    // Apply the custom partitioner to the paired RDD
    val partitionedData = pairedData.partitionBy(new MyCustomerPartitioner(2))

    // Verify the result using mapPartitionsWithIndex
    val finalOut = partitionedData.mapPartitionsWithIndex { (partitionIndex, dataIterator) =>
      dataIterator.map(dataInfo => dataInfo + " is located in " + partitionIndex + " partition.")
    }

    // Save the output to HDFS
    finalOut.saveAsTextFile("/user/root/partitionOutput")
  }
}

class MyCustomerPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  // Numeric keys go to partition 0, everything else to partition 1
  override def getPartition(key: Any): Int = {
    toInt(key.toString)
  }

  override def equals(other: Any): Boolean = other match {
    case dnp: MyCustomerPartitioner => dnp.numPartitions == numPartitions
    case _ => false
  }

  // Returns 0 if the string parses as an Int, 1 otherwise
  def toInt(s: String): Int = {
    try {
      s.toInt
      0
    } catch {
      case e: Exception => 1
    }
  }
}
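To run it (a sketch; the jar name is hypothetical and comes from your build tool), submit the job with spark-submit:

spark-submit --class com.td.SparkHbaseDemo.SparkCustomPartitioner.CustomPartitionerDemo spark-custom-partitioner.jar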
Snapshot of the output:
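Given the code above, each saved line is a (token, 1) pair tagged with its partition index, so the output files would contain lines along these lines (illustrative values):

(123,1) is located in 0 partition.
(456,1) is located in 0 partition.
(hello,1) is located in 1 partition.
(world,1) is located in 1 partition.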
Happy Sparking!
Reference: Learning Spark