Read/Write Data from HBase using Spark
Apache Spark is an open-source distributed computing framework for processing large datasets. Spark is built around the concept of the RDD (Resilient Distributed Dataset): an immutable, distributed collection of objects partitioned across a cluster.
Two types of operations can be performed on an RDD (a small sketch follows the list):
1. Transformation
2. Action
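To make the distinction concrete, here is a minimal sketch, assuming an existing SparkContext named sc (the ages are illustrative):

val ages = sc.parallelize(Seq(53, 56, 65, 53))  // build an RDD from a local collection
val young = ages.filter(_ < 55)                 // transformation: lazy, just describes a new RDD
println(young.count())                          // action: triggers the computation, prints 2

Nothing is computed until the action (count) runs; the transformation (filter) only records what should happen.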
Today we will see how we can use these transformations and actions on HBase data. Let's take the traditional 'Employee' example.
Pre-requisite: Make sure Spark, HBase, and Hadoop are functioning properly.
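A quick way to sanity-check all three, assuming the standard client tools are on your PATH:

hdfs dfsadmin -report        # HDFS datanodes should be listed as live
echo "status" | hbase shell  # HBase should report its master and region servers
spark-submit --version       # confirms the Spark client is installed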
We will be running the following list of operations on HBase data using Spark:
- Create a table in HBase.
- Write data into HBase.
- Read data from HBase.
- Apply a filter operation on the HBase data.
- Store the result in HDFS.
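The HBase-side steps can be cross-checked from the HBase shell as we go; a minimal sketch (the table and column family names match the code below):

hbase shell
list                  # 'Employee' should appear once the job has created it
scan 'Employee'       # shows the rows written by the Spark job
get 'Employee', '1'   # fetches a single row by its row key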
Just to give a high-level overview, I will write employee information into HBase and then filter out those employee records whose age is less than 55.
Query: Filter those records from the HBase data whose age is less than 55.
Expected output: We should get two records, both with age 53.
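Because the code below builds a (name, designation, age) tuple for each row and writes it with saveAsTextFile (which renders each tuple via its default toString), the result files should contain lines like:

(Shashi,Chief Architect,53)
(Chhaaaaaaya,Chief Architect,53)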
Below is the sample code for the above example:
package com.td.SparkHbaseDemo.SparkHBaseExample

import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._

object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseEmployeeDemo")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "Employee"

    // Configuration settings for reading HBase data
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.addResource("/etc/hbase/conf/hbase-site.xml")

    // Create the table if it is not already present
    val hbaseAdmin = new HBaseAdmin(conf)
    if (!hbaseAdmin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("employeeInformation".getBytes()))
      hbaseAdmin.createTable(tableDesc)
    }

    // Insert data into the table
    val empTable = new HTable(conf, tableName)

    val empRecord1 = new Put("1".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Shashi".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "53".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Chief Architect".getBytes())
    empTable.put(empRecord1)

    val empRecord2 = new Put("2".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Ankit".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "56".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Data Architect".getBytes())
    empTable.put(empRecord2)

    val empRecord3 = new Put("3".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Jitu".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "65".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "CEO".getBytes())
    empTable.put(empRecord3)

    val empRecord4 = new Put("4".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Chhaaaaaaya".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "53".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Chief Architect".getBytes())
    empTable.put(empRecord4)

    empTable.flushCommits()

    // Read the table back as an RDD of (ImmutableBytesWritable, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Keep only the Result half of each pair
    val resultRDD = hBaseRDD.map(x => x._2)

    // Read the individual columns into a (name, designation, age) tuple
    val employeeRDD = resultRDD.map(result => (
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"))),
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"))),
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge")))))

    // Filter records whose age is less than 55
    val filteredAge = employeeRDD.filter(result => result._3.toDouble < 55)

    // Save the output to HDFS
    filteredAge.saveAsTextFile("/user/root/spark/HbaseFinalOut")

    sc.stop()
  }
}
I typically use Eclipse for packaging the code. Once your code compiles and packages successfully, transfer the jar to the cluster where Spark is running and use spark-submit to start execution.
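For reference, here is a minimal sketch of the Maven dependencies such a project would need; the artifact versions are assumptions, so match them to your cluster (the jar-with-dependencies suffix in the command below also suggests an assembly step such as the maven-assembly-plugin):

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version> <!-- assumed version -->
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version> <!-- assumed version -->
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId> <!-- provides TableInputFormat -->
    <version>1.1.2</version> <!-- assumed version -->
  </dependency>
</dependencies>

With the fat jar built, submit it: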
spark-submit --class com.td.SparkHbaseDemo.SparkHBaseExample.HBaseTest SparkHBaseExample-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Once the Spark job is complete, verify that '/user/root/spark/HbaseFinalOut' contains the output of your code.
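For example:

hdfs dfs -ls /user/root/spark/HbaseFinalOut
hdfs dfs -cat /user/root/spark/HbaseFinalOut/part-*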
Happy Sparking.......!!!!!!!
Reference: Learning Spark