Custom Source in Flume
Example:
Sample Input File:
20
50
50
04
17
59
18
43
28
58
27
81
Sample Output File:
20
2050
205050
20505004
2050500417
205050041759
20505004175918
2050500417591843
205050041759184328
20505004175918432858
Each line is concatenated with everything read before it, and the process continues in this way.
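Conceptually this is just a running concatenation. A minimal standalone sketch of the idea (plain Java, no Flume involved; the class name ConcatDemo is only illustrative) looks like this:

import java.io.BufferedReader;
import java.io.FileReader;

// Reads a file line by line and prints the running concatenation,
// reproducing the sample output shown above.
public class ConcatDemo {
    public static void main(String[] args) throws Exception {
        String accumulated = "";
        try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = reader.readLine()) != null) {
                accumulated = accumulated + line;  // append the new line to everything read so far
                System.out.println(accumulated);   // 20, 2050, 205050, ...
            }
        }
    }
}

The Flume source below does the same thing, except that each concatenated string is wrapped in an event and handed to the channel instead of being printed.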
Here is my Java code.
MySource.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySource extends AbstractSource implements Configurable, PollableSource {

    private static final Logger logger = LoggerFactory.getLogger(MySource.class);
    private String myProp;
    BufferedReader br;
    Thread tailThread;

    @Override
    public void configure(Context context) {
        // Read the input file path from the agent configuration (a1.sources.r1.filepath)
        String myProp = context.getString("filepath", "defaultValue");
        logger.info("Path Property==============>" + myProp);
        this.myProp = myProp;
    }

    @Override
    public void start() {
        // Start a background thread that reads the file and pushes events to the channel
        ConcatRunner t = new ConcatRunner();
        tailThread = new Thread(t);
        tailThread.start();
        super.start();
    }

    @Override
    public void stop() {
        // Interrupt the reader thread so the agent can shut down cleanly
        if (tailThread != null) {
            tailThread.interrupt();
        }
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        // All the reading happens on the background thread started in start(),
        // so there is nothing to do here; just tell the runner to back off.
        return Status.BACKOFF;
    }

    private class ConcatRunner implements Runnable {
        @Override
        public void run() {
            Event e;
            String sCurrentLine;
            String finalFlumeString = "";
            try {
                br = new BufferedReader(new FileReader(myProp));
                while ((sCurrentLine = br.readLine()) != null) {
                    System.out.println(sCurrentLine);
                    finalFlumeString = finalFlumeString + sCurrentLine; // concatenate with the previous lines
                    e = EventBuilder.withBody(finalFlumeString, Charset.forName("UTF-8"));
                    getChannelProcessor().processEvent(e);
                    Thread.sleep(3000);
                }
            } catch (Exception ex) {
                System.out.println("Exception in Reading File: " + ex.getMessage());
            }
            try {
                if (br != null) br.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    } // ConcatRunner over
}
FlumeConfig.conf file:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = MySource
a1.sources.r1.restart = true
a1.sources.r1.filepath = /root/input.txt
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
#channel
a1.channels.c1.type = memory
#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
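The memory channel runs with its defaults here. If you expect a higher event rate you can size it explicitly; the numbers below are only illustrative:

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100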
Before you run the job, create a jar of your Java project and place it into the lib folder of Flume (/usr/lib/flume/lib).
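If you are not building through an IDE, a plain command-line build along these lines works (the paths and jar name are illustrative; adjust them to your environment):

javac -cp "/usr/lib/flume/lib/*" MySource.java
jar cf mysource.jar MySource*.class
cp mysource.jar /usr/lib/flume/lib/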
Once you are done with the above, fire the following command from the shell:
flume-ng agent -n a1 -f FlumeConfig.conf
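While testing it also helps to watch the agent's log on the console; assuming your Flume configuration directory is /etc/flume/conf, the long-form invocation is:

flume-ng agent --conf /etc/flume/conf --conf-file FlumeConfig.conf --name a1 -Dflume.root.logger=INFO,console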
Output of Job:
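The sink writes the concatenated events as files under the HDFS path from the configuration. A quick way to verify (the exact file names will differ on your setup) is:

hdfs dfs -ls /flume/events/
hdfs dfs -cat /flume/events/events-*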
The following part is optional.
Apart from the above, I have created a shell script that will generate input for you. The script asks for two things: 1. the number of rows and 2. the delay time before generating the next row. I used a delay of 2 seconds; you can change it as per your need. Here is the shell script:
#!/bin/sh
echo "Please enter the size in terms of rows you want to generate the random file data"
read rows
#rows=$1
echo "Please enter the delay time needed in between the writing of the rows"
read delayTime
#Delete the tmp & generatedRandomDataFile files if they already exist
rm -f tmp
rm -f generatedRandomDataFile
start=$(date +%s)
for i in $(seq $rows)
do
tr -dc 0-9 < /dev/urandom | head -c 2 > tmp
gawk '$1=$1' tmp >> generatedRandomDataFile
sleep $delayTime
done
end=$(date +%s)
DIFF=$(( $end - $start ))
echo "File generated is `pwd`/generatedRandomDataFile"
echo "The file generation took $DIFF seconds"
A sample run simply prints the two prompts, writes the requested rows at the chosen interval, and finishes by echoing the file location and how long the generation took.
Reference :
https://flume.apache.org/FlumeDeveloperGuide.html
Let me know if you have any suggestions.
Cheers!