Custom Partitioner in Spark

Welcome back to the Spark world!

Today we will look at one of the most interesting operations we can perform in Spark: partitioning.

If you have a basic understanding of Spark, you probably know that Spark works on a concept called the RDD (Resilient Distributed Dataset).

Each RDD is partitioned across the cluster, and you can choose the number of partitions allocated to a newly created RDD. For example:


val testFile = sc.textFile("/user/root/testfile.txt", 3)

The second parameter of the textFile function indicates the number of partitions to allocate to the RDD testFile.
You can verify this with the following command in the Scala shell:


testFile.partitions.size
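
For instance, in the spark-shell the check might look like this (the res numbers are just whatever the shell assigns; getNumPartitions is an equivalent shorthand available since Spark 1.6):


scala> testFile.partitions.size
res0: Int = 3

scala> testFile.getNumPartitions
res1: Int = 3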

Spark supports two types of partitioners out of the box:

1. HashPartitioner - uses Java's Object.hashCode method on the key to determine the partition.

2. RangePartitioner - distributes keys to partitions by range, so that keys falling within the same range land in the same partition. A quick sketch of both follows.
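
Both can be applied to a paired RDD via partitionBy. Here is a minimal sketch, assuming a spark-shell session with a SparkContext sc (the pairs RDD is made up for illustration):


import org.apache.spark.{HashPartitioner, RangePartitioner}

val pairs = sc.parallelize(Seq(("a", 1), ("c", 3), ("b", 2)))

// HashPartitioner: partition chosen from the key's hashCode modulo numPartitions
val hashed = pairs.partitionBy(new HashPartitioner(2))

// RangePartitioner samples the RDD to compute range boundaries for the keys,
// which is why it takes the RDD itself as an argument
val ranged = pairs.partitionBy(new RangePartitioner(2, pairs))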

One basic advantage of partitioning is that similar data is co-located, so transformations like groupByKey, reduceByKey, etc. shuffle less data across the cluster, which improves job performance.
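For example, partitioning once up front and caching the result lets later key-based transformations reuse the known layout. A minimal sketch (the events RDD and the partition count are made up for illustration):


import org.apache.spark.HashPartitioner

val events = sc.parallelize(Seq(("user1", 1), ("user2", 1), ("user1", 1)))

// Partition once and cache so the layout survives across actions
val partitioned = events.partitionBy(new HashPartitioner(4)).cache()

// reduceByKey sees the existing partitioner, so values are combined
// within each partition without a further shuffle
val counts = partitioned.reduceByKey(_ + _)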

Let's create our own partitioner. Consider an example where I have a file that contains both text and numbers, and I want to store the numbers in one partition and the text in another. Neither of the default partitioners above will work for me.

So we need to design our own partitioner. The sample data (shown as a screenshot in the original post) is a plain text file whose space-separated tokens are a mix of words and numbers, e.g. a line like "hello 123 world 456".


Now we need to route the numeric and text tokens into two different partitions. Here is the code for that:


package com.td.SparkHbaseDemo.SparkCustomPartitioner

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object CustomPartitionerDemo {
 def main(args: Array[String]) {

  val sparkConf = new SparkConf().setAppName("CustomPartitionerDemo")
  val sc = new SparkContext(sparkConf)
  val inputFile = sc.textFile("/user/root/testfile")

  // Create a paired RDD: every space-separated token becomes a (token, 1) pair
  val pairedData = inputFile.flatMap(x => x.split(" ")).map(x => (x, 1))

  // Apply the custom partitioner to the paired RDD
  // (2 partitions: one for numeric keys, one for text keys)
  val partitionedData = pairedData.partitionBy(new MyCustomPartitioner(2))

  // Verify the result using mapPartitionsWithIndex, which exposes
  // the index of the partition each element ended up in
  val finalOut = partitionedData.mapPartitionsWithIndex { (partitionIndex, dataIterator) =>
   dataIterator.map(dataInfo => dataInfo + " is located in " + partitionIndex + " partition.")
  }

  // Save output to HDFS
  finalOut.saveAsTextFile("/user/root/partitionOutput")

 }
}

class MyCustomPartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts

 // Numeric keys go to partition 0, everything else to partition 1
 override def getPartition(key: Any): Int = toInt(key.toString)

 override def equals(other: Any): Boolean = other match {
  case p: MyCustomPartitioner => p.numPartitions == numPartitions
  case _ => false
 }

 // equals is overridden, so hashCode should be overridden to match
 override def hashCode: Int = numPartitions

 def toInt(s: String): Int = {
  try {
   s.toInt // parses successfully only for numbers...
   0       // ...so numeric keys land in partition 0
  } catch {
   case e: NumberFormatException => 1 // non-numeric keys land in partition 1
  }
 }
}
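
To run this, package the class and hand it to spark-submit. A sketch, assuming the jar name below matches whatever your build actually produces:


# master/deploy settings depend on your cluster; yarn is assumed here
spark-submit \
 --class com.td.SparkHbaseDemo.SparkCustomPartitioner.CustomPartitionerDemo \
 --master yarn \
 SparkCustomPartitioner.jar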


Snapshot of output:
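
The screenshot is not reproduced here; based on the code above, the lines saved to HDFS should look roughly like this (illustrative values, with numeric keys in partition 0 and text keys in partition 1):


(123,1) is located in 0 partition.
(456,1) is located in 0 partition.
(hello,1) is located in 1 partition.
(world,1) is located in 1 partition.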



Happy Sparking!

Reference: Learning Spark
