Custom Source in Flume


Flume lets you write your own source. Flume ships with several built-in source types, such as exec, spooldir, and Twitter. Here is a small demonstration of a custom Flume source. In this example I have written a Java class, MySource, which reads the input file one line at a time, appends each line to everything read so far, and passes the result to the channel as an event.

Example:
Sample Input File :

20
50
50
04
17
59
18
43
28
58
27
81

Sample Output File :
 
20
2050
205050
20505004
2050500417
205050041759
20505004175918
2050500417591843
205050041759184328
20505004175918432858

Each output line is the previous output line with the next input line appended, and the process continues this way for every line read.
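The concatenation rule can be sketched in plain Java without any Flume dependency. This is only an illustration: the class and method names (RunningConcat, concatenate) are made up for this example and are not part of MySource.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RunningConcat {

    // Returns one output row per input row: row i is input rows 0..i joined together.
    static List<String> concatenate(List<String> lines) {
        List<String> out = new ArrayList<>();
        StringBuilder soFar = new StringBuilder();
        for (String line : lines) {
            soFar.append(line);
            out.add(soFar.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // First four rows of the sample input file.
        List<String> input = Arrays.asList("20", "50", "50", "04");
        for (String row : concatenate(input)) {
            System.out.println(row); // prints 20, 2050, 205050, 20505004 on separate lines
        }
    }
}
```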

Here is my Java code.

MySource.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySource extends AbstractSource implements Configurable, PollableSource {

  private static final Logger logger = LoggerFactory.getLogger(MySource.class);
  private String myProp;      // path of the input file, taken from the "filepath" property
  private Thread tailThread;  // background thread that reads the file

  @Override
  public void configure(Context context) {
    // Read the "filepath" property set on this source in the agent configuration.
    String myProp = context.getString("filepath", "defaultValue");
    logger.info("Path Property==============>" + myProp);
    this.myProp = myProp;
  }

  @Override
  public void start() {
    ConcatRunner t = new ConcatRunner();
    tailThread = new Thread(t);
    tailThread.start();
    super.start();
  }

  @Override
  public void stop() {
    // Interrupt the reader thread so it stops sleeping and exits.
    if (tailThread != null) {
      tailThread.interrupt();
    }
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    // All the work happens in ConcatRunner, so there is nothing to do per poll.
    // Return BACKOFF instead of null so the source runner does not hit a NullPointerException.
    return Status.BACKOFF;
  }

  // Note: newer Flume releases (1.7 and later) also declare getBackOffSleepIncrement()
  // and getMaxBackOffSleepInterval() on PollableSource; this class would need to implement them there.

  private class ConcatRunner implements Runnable {

    @Override
    public void run() {
      String sCurrentLine;
      String finalFlumeString = "";

      // try-with-resources closes the reader even if reading fails.
      try (BufferedReader br = new BufferedReader(new FileReader(myProp))) {
        while ((sCurrentLine = br.readLine()) != null) {
          logger.info(sCurrentLine);
          finalFlumeString = finalFlumeString + sCurrentLine; // append to everything read so far
          Event e = EventBuilder.withBody(finalFlumeString, StandardCharsets.UTF_8);
          getChannelProcessor().processEvent(e);
          Thread.sleep(3000);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // stop() was called
      } catch (IOException | RuntimeException ex) {
        logger.error("Exception in Reading File", ex);
      }
    }
  } // ConcatRunner over
}
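MySource does its reading in a separate thread launched from start(), rather than in process(). Flume calls process() on a PollableSource repeatedly and uses the returned status (READY or BACKOFF) to decide how soon to call it again, so the same logic could also be written as one line per poll. The sketch below only models that idea in plain Java so it can run without Flume: PollingConcat, poll() and lastBody() are hypothetical names, and the Status enum is a local stand-in for Flume's PollableSource.Status.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

class PollingConcat {

    // Local stand-in for Flume's PollableSource.Status, defined here so the sketch runs without Flume.
    enum Status { READY, BACKOFF }

    private final BufferedReader reader;
    private final StringBuilder soFar = new StringBuilder();
    private String lastBody;

    PollingConcat(BufferedReader reader) {
        this.reader = reader;
    }

    // One unit of work per call, in the spirit of process(): read one line, build one body.
    Status poll() {
        try {
            String line = reader.readLine();
            if (line == null) {
                return Status.BACKOFF; // nothing new to read right now
            }
            soFar.append(line);
            lastBody = soFar.toString(); // a real source would wrap this in an event for the channel
            return Status.READY;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    String lastBody() {
        return lastBody;
    }

    public static void main(String[] args) {
        PollingConcat source =
            new PollingConcat(new BufferedReader(new StringReader("20\n50\n50\n")));
        while (source.poll() == Status.READY) {
            System.out.println(source.lastBody()); // prints 20, 2050, 205050 on separate lines
        }
    }
}
```

With this shape the caller controls the pacing, so the source itself needs neither Thread.sleep nor its own thread.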

FlumeConfig.conf File :

a1.sources = r1
a1.channels = c1
a1.sinks = k1

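#source
a1.sources.r1.type = MySource
# "restart" is an exec source option; MySource reads only "filepath", so this line has no effect here
a1.sources.r1.restart = true
a1.sources.r1.filepath = /root/input.txt

#sink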
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
#channel
a1.channels.c1.type = memory

#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
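The source only ever sees the keys under a1.sources.r1. with that prefix removed, which is why configure() asks the Context for "filepath" and not for the full key. The sketch below models that mapping with java.util.Properties. It is not Flume's own configuration code, and SourceProps and forSource are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class SourceProps {

    // Collects the keys under "<agent>.sources.<source>." with that prefix removed.
    static Map<String, String> forSource(Properties all, String agent, String source) {
        String prefix = agent + ".sources." + source + ".";
        Map<String, String> result = new TreeMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        try {
            // A few lines taken from FlumeConfig.conf.
            props.load(new StringReader(
                "a1.sources.r1.type = MySource\n"
                + "a1.sources.r1.filepath = /root/input.txt\n"
                + "a1.sinks.k1.type = hdfs\n"));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        System.out.println(forSource(props, "a1", "r1").get("filepath")); // prints /root/input.txt
    }
}
```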


Before running the job, build a JAR of your Java project and place it in Flume's lib folder (/usr/lib/flume/lib).

Once that is done, run the following command from the shell.

flume-ng agent -n a1  -f FlumeConfig.conf

Output of Job:



(This part is optional.)
Apart from the above, I have created a shell script that generates input for you. The script prompts for two values: 1. the number of rows and 2. the delay before generating the next row. I used a delay of 2 seconds; you can change it as needed. Here is the sample shell script.

#!/bin/sh

echo "Please enter the size in terms of rows you want to generate the random file data"
read rows
#rows=$1

echo "Please enter the delay time needed in between the writing of the rows"
read delayTime

#Delete the generatedRandomDataFile file if it already exists
rm -f generatedRandomDataFile

start=$(date +%s)
for i in $(seq "$rows")
do
# Append two random digits as one row; printf adds the trailing newline
digits=$(tr -dc 0-9 < /dev/urandom | head -c 2)
printf '%s\n' "$digits" >> generatedRandomDataFile
sleep "$delayTime"
done
end=$(date +%s)
DIFF=$(( $end - $start ))
echo "File generated is `pwd`/generatedRandomDataFile"
echo "The file generation took $DIFF seconds"

Here is sample output of shell script.




Reference :

https://flume.apache.org/FlumeDeveloperGuide.html 

Let me know if you have any suggestions.

Cheers!!!!!!!!!!!!!!!
