Developing a Custom Processor in Apache NiFi


Apache NiFi was developed to automate the flow of data between different systems. It is based on technology previously called “Niagara Files,” which had been developed and used at scale within the NSA for the previous eight years and was made available to the Apache Software Foundation through the NSA Technology Transfer Program.



NiFi is built around FlowFiles, which are the heart of the system. A FlowFile is a data record consisting of a pointer to its content (the payload) and attributes describing that content, and it is associated with one or more provenance events. The attributes are key/value pairs that act as metadata for the FlowFile, such as the FlowFile's filename. The content is the actual data, or payload, of the file. Provenance is a record of what has happened to the FlowFile. Each of these parts has its own repository (repo) for storage.
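Conceptually, then, a FlowFile is just that pairing of attributes and content. A minimal plain-Java model of the idea (purely illustrative — this is not NiFi's actual API; the real FlowFile is an interface backed by the FlowFile and content repositories):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: key/value metadata plus a content payload.
class SimpleFlowFile {
    final Map<String, String> attributes = new HashMap<>(); // metadata, e.g. the filename
    final byte[] content;                                   // the actual payload

    SimpleFlowFile(String filename, String payload) {
        attributes.put("filename", filename);
        content = payload.getBytes(StandardCharsets.UTF_8);
    }
}
```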


Each FlowFile is handled by a FlowFile Processor. Processors have access to the attributes of a given FlowFile and to its content stream. A processor can operate on zero or more FlowFiles in a given unit of work and either commit that work or roll it back.

Let's try to develop our own processor. This processor will append exclamation marks (!!!) to each line of the incoming file, and the result will then be written to HDFS using the PutHDFS processor.
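Stripped of the NiFi plumbing, the transformation itself is ordinary string handling. Here is the logic in isolation (a standalone sketch — the class and method names are mine, not part of the generated processor):

```java
public class ExclamationUtil {
    // Append "!!!" to every line of the input text
    public static String appendExclamation(String input) {
        StringBuilder out = new StringBuilder();
        for (String line : input.split("\\r?\\n")) {
            out.append(line).append("!!!").append("\n");
        }
        return out.toString();
    }
}
```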

Here is the flow I am going to build:

Read File From Local ---> Append Exclamation ---> Write to HDFS

Prerequisites:

1. You should have NiFi installed and running.
2. You should have Maven and an IDE (Eclipse).

Now let's begin.

1. Create a folder on your machine and navigate to it in a terminal (PuTTY/CMD).
2. Run mvn archetype:generate in the terminal and provide the following input to Maven:

Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 874: nifi
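The archetype then prompts for the project coordinates. The exact prompt wording varies between archetype versions, but answers along these lines (values chosen to match the artifact and package names used in the rest of this post) produce the project we will work with:

```
Define value for property 'groupId': com.shashi.example
Define value for property 'artifactId': nifi-exclamation
Define value for property 'version' 1.0-SNAPSHOT: : 1.0-SNAPSHOT
Define value for property 'artifactBaseName': exclamation
Define value for property 'package' com.shashi.example.processors.exclamation: :
```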



Now we have successfully created our NiFi Maven module, and it's time to import the code into the IDE and modify the logic to suit our needs. We will write our code in the nifi-exclamation-processors module, while nifi-exclamation-nar will be used for deployment.

3. Now go to your IDE (Eclipse) and import the Maven projects.


4. Navigate to the MyProcessor class and replace it with the code below.


package com.shashi.example.processors.exclamation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@Tags({"example", "exclamation"})
@CapabilityDescription("Appends \"!!!\" to every line of the incoming FlowFile's content")
public class MyProcessor extends AbstractProcessor {

 // Example property generated by the archetype; this processor does not
 // actually use it, so it is not marked as required.
 public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
   .Builder().name("My Property")
   .description("Example Property")
   .required(false)
   .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
   .build();


 public static final Relationship REL_SUCCESS = new Relationship.Builder()
   .name("success")
   .description("FlowFiles whose content was updated successfully are routed here")
   .build();

 public static final Relationship REL_FAILURE = new Relationship.Builder()
   .name("failure")
   .description("FlowFiles that could not be processed are routed here")
   .build();

 private List<PropertyDescriptor> descriptors;

 private Set<Relationship> relationships;

 @Override
 protected void init(final ProcessorInitializationContext context) {
  final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
  descriptors.add(MY_PROPERTY);
  this.descriptors = Collections.unmodifiableList(descriptors);

  final Set<Relationship> relationships = new HashSet<Relationship>();
  relationships.add(REL_SUCCESS);
  relationships.add(REL_FAILURE);
  this.relationships = Collections.unmodifiableSet(relationships);
 }

 @Override
 public Set<Relationship> getRelationships() {
  return this.relationships;
 }

 @Override
 public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
  return descriptors;
 }

 @OnScheduled
 public void onScheduled(final ProcessContext context) {

 }

 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  FlowFile flowFile = session.get();
  if ( flowFile == null ) {
   return;
  }
  try {

   flowFile = session.write(flowFile, new StreamCallback() {
    @Override
    public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
     // Read the whole FlowFile content as UTF-8 text
     final String input = IOUtils.toString(inputStream, "UTF-8");
     // Append "!!!" to every line and rebuild the content
     final StringBuilder output = new StringBuilder();
     for (final String line : input.split("\\r?\\n")) {
      output.append(line).append("!!!").append("\n");
     }
     IOUtils.write(output.toString(), outputStream, "UTF-8");
    }
   });

   session.transfer(flowFile, REL_SUCCESS);
  } catch (Exception e) {
   getLogger().error("Failed while processing a string", e);
   session.transfer(flowFile, REL_FAILURE);
  }



 }
}
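Before deploying, you can exercise the processor with NiFi's mock framework (the archetype already generates a test class in nifi-exclamation-processors using the nifi-mock dependency). A sketch of such a test, assuming JUnit 4 on the classpath:

```java
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class MyProcessorTest {

    @Test
    public void testAppendsExclamation() {
        final TestRunner runner = TestRunners.newTestRunner(MyProcessor.class);
        runner.setProperty(MyProcessor.MY_PROPERTY, "anything"); // the archetype's example property

        // Simulate an incoming FlowFile with two lines of content
        runner.enqueue("hello\nworld".getBytes());
        runner.run();

        // Every FlowFile should end up on the success relationship,
        // with "!!!" appended to each line
        runner.assertAllFlowFilesTransferred(MyProcessor.REL_SUCCESS, 1);
        final MockFlowFile out = runner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).get(0);
        out.assertContentEquals("hello!!!\nworld!!!\n");
    }
}
```

Run it with mvn test from the project root before packaging.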


5. Now package your code using the mvn clean package command.
6. Copy nifi-exclamation-nar-1.0-SNAPSHOT.nar (produced under the nifi-exclamation-nar module's target directory) into the lib directory of your NiFi installation.



7. Restart NiFi and go to the NiFi Web UI.
8. Click the processor icon and search for MyProcessor.
9. Design the data flow as shown below and configure the GetFile and PutHDFS processors.


10. Start all processors and check the output in HDFS.



Happy Coding...!!!!

Ref : https://nifi.apache.org/


