We’ve decided to start a new series for getting familiar with the different Apache Nifi services and processors, and we’re calling it “Getting Familiar”. We’ll try to post fairly often about different processors, using the controller services, and configuring certain things in Nifi. The first one in the series will be about the ExecuteScript processor.

Getting Familiar: ExecuteScript Processor

When the Nifi team came out with the ExecuteScript processor, I knew it was a big win. It allows for so much more flexibility in the flow, since you can create a custom processor on the fly without having to write a full-fledged custom Java processor. Finding out that your data needs to be massaged a little more, and being able to do it in your preferred scripting language, is pretty nice.

The ExecuteScript processor is still “Experimental”, but it works fairly well. I’ve noticed the performance isn’t quite on par with a Java processor, but it is good enough in most cases. Look for a future post comparing performance across a set of ExecuteScript processors and a Java processor! ExecuteScript supports quite a few scripting languages, including groovy, python, jython, jruby, ruby, javascript, lua and luaj. Today’s example will be in Python, which ExecuteScript runs on Jython, so the script uses Python 2 syntax and can import Java classes.

In order to take full advantage of the processor framework Nifi has built, the script can use a few variables that the processor exposes.

session: exposes the current ProcessSession, allowing you to take action on a flow file: get the current file, create a new one, put an attribute, or transfer the flow file to a relationship.

context: exposes the current ProcessContext, allowing access to the processor properties and relationships. It also provides access to the StateManager, which can be used to store simple key-value pairs.

log: exposes the current ProcessorLog, allowing you to log things to the normal Nifi log.

Relationships:

  • REL_SUCCESS: the success relationship for the processor
  • REL_FAILURE: the failure relationship for the processor
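
To make the roles of these variables a bit more concrete, here is a minimal sketch of the usual get / modify / transfer pattern. Inside ExecuteScript, Nifi supplies session, log and REL_SUCCESS for you. The FakeFlowFile, FakeSession and FakeLog classes below are hypothetical stand-ins, not Nifi classes, and exist only so the sketch can run outside Nifi. They mimic just the handful of calls used here, and the "greeting" attribute is made up for illustration.

```python
# Hypothetical stand-ins so this sketch runs outside Nifi.
# Inside ExecuteScript, Nifi provides session, log and REL_SUCCESS itself.
class FakeFlowFile(object):
    def __init__(self):
        self.attributes = {}


class FakeSession(object):
    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Returns the next queued flow file, or None if nothing is waiting
        return self.queued.pop(0) if self.queued else None

    def putAttribute(self, flowFile, key, value):
        # Simplification: the stand-in just updates a dict and returns the
        # same object; always keep using the returned reference
        flowFile.attributes[key] = value
        return flowFile

    def transfer(self, flowFile, relationship):
        self.transferred.append((flowFile, relationship))


class FakeLog(object):
    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(message)


REL_SUCCESS = "success"
session = FakeSession([FakeFlowFile()])
log = FakeLog()

# From here on, the code has the same shape as an ExecuteScript script body.
flowFile = session.get()
if flowFile is not None:
    flowFile = session.putAttribute(flowFile, "greeting", "hello")
    log.info("Added the greeting attribute")
    session.transfer(flowFile, REL_SUCCESS)
```

The point of the stand-ins is only to show which variable does what: session moves and changes flow files, log records messages, and the REL_ constants name where a flow file goes next.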

For adding properties, which Nifi calls DynamicProperties, be aware that each property is exposed to the script as a variable, so you have to follow the scripting language’s naming rules for variables: for Python the name has to start with an underscore or a letter, for Groovy you can’t use periods in the name, etc.
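
If you are unsure whether a property name will work as a Python variable, a quick check like the one below can help. This is only a sketch of Python's own naming rule for plain ASCII names; it is not something Nifi provides, and the helper name and the sample property names are made up for illustration.

```python
import keyword
import re

# A plain Python name: a letter or underscore, then letters, digits or underscores
_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z")


def is_usable_python_name(name):
    """Return True if name can be used as a plain Python variable name."""
    return bool(_NAME_PATTERN.match(name)) and not keyword.iskeyword(name)


# Hypothetical property names, checked one by one
candidates = ["batch_size", "_limit", "2fast", "my.prop", "class"]
results = dict((name, is_usable_python_name(name)) for name in candidates)
```

Here "2fast" fails because it starts with a digit, "my.prop" fails because of the period, and "class" fails because it is a reserved word.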

Using ExecuteScript

For our first use case I’ll show a very basic example, expanding on our simple XML example from Getting Started with Apache Nifi.

We’ll replace the EvaluateXPath processor with an ExecuteScript processor running our Python code to get the elements we want from the XML. So we want to

  • get the pubDate - xpath - channel/item[2]/pubDate
  • get the link - xpath - channel/item[2]/link
  • get the title - xpath - channel/item[2]/title

#import ElementTree for parsing the XML, plus a few Java classes for reading in the flow files
import xml.etree.ElementTree as ET
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

#define a callback to use
class Callback(InputStreamCallback):
	def __init__(self):
		self.matches = {}

	def process(self, inputStream):
		#read the whole flow file content in as a string and parse it
		rss = IOUtils.toString(inputStream,StandardCharsets.UTF_8)
		root = ET.fromstring(rss)

		#pull the fields we want from the second item in the channel
		self.matches["pubDate"] = root.find(".//channel//item[2]//pubDate").text
		self.matches["link"] = root.find(".//channel//item[2]//link").text
		self.matches["title"] = root.find(".//channel//item[2]//title").text

#get the next flow file from the incoming queue; None means there is nothing to process
flowFile = session.get()

if flowFile is not None:
	callback = Callback()
	session.read(flowFile, callback)
	for key,value in callback.matches.iteritems():
		#example logging
		log.info("Adding {} with value {} as an attribute".format(key,value))
		flowFile = session.putAttribute(flowFile,key,value)
	session.transfer(flowFile, REL_SUCCESS)
	session.commit()

So we first create a class that implements InputStreamCallback to read the flow file. Its main method is process, which takes the inputStream, reads it into a string with IOUtils, and then parses it.
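
The parsing step inside process can also be tried on its own, outside Nifi, which is handy for checking the paths before wiring up the flow. The sketch below assumes a small made-up feed (SAMPLE_RSS) and a hypothetical helper, extract_matches. It uses direct child paths instead of the // steps in the script above; for a feed laid out like this sample, both select the same elements. It also returns None for a missing element instead of raising an error.

```python
import xml.etree.ElementTree as ET

# Made-up sample feed; in the flow, the content comes from the flow file.
SAMPLE_RSS = """<rss version="2.0">
  <channel>
    <title>Sample feed</title>
    <item>
      <title>First post</title>
      <link>http://example.com/first</link>
      <pubDate>Mon, 01 Jan 2018 00:00:00 GMT</pubDate>
    </item>
    <item>
      <title>Second post</title>
      <link>http://example.com/second</link>
      <pubDate>Tue, 02 Jan 2018 00:00:00 GMT</pubDate>
    </item>
  </channel>
</rss>"""


def extract_matches(rss_text):
    """Collect pubDate, link and title from the second item in the channel."""
    root = ET.fromstring(rss_text)  # root is the <rss> element
    matches = {}
    for name in ("pubDate", "link", "title"):
        # Positions in ElementTree paths are 1-based, so item[2] is the second item
        element = root.find("channel/item[2]/" + name)
        matches[name] = element.text if element is not None else None
    return matches


matches = extract_matches(SAMPLE_RSS)
```

With this sample, matches holds the values from the second item, which are the same values you would expect to see as attributes on the flow file.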

We make use of this class, Callback, by passing an instance of it to the session.read function. This function works the same way as it does in Java: read expects a flowFile as the first parameter and an InputStreamCallback as the second. Once we are done going through the file, we take the matches dictionary we filled in, loop through it, and set each entry as an attribute on the flow file. We could use putAllAttributes there instead and pass all of the attributes at once as a map, but the loop keeps this example simple and lets us log each one.
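
If the callback style is new to you, the idea is roughly this: you hand read an object, it opens the content stream and calls that object's process method, and anything you want to keep is stored on the object for later. The sketch below shows that pattern in plain Python. StubSession and LengthCallback are hypothetical stand-ins written for this illustration, not Nifi classes, and the string "demo" simply plays the part of a flow file.

```python
import io


class StubSession(object):
    """Hypothetical stand-in that keeps content in a dict keyed by flow file."""

    def __init__(self, contents):
        self.contents = contents

    def read(self, flowFile, callback):
        # Open the content as a stream and hand it to the callback
        stream = io.BytesIO(self.contents[flowFile])
        try:
            callback.process(stream)
        finally:
            stream.close()


class LengthCallback(object):
    """Keeps its result on the instance, like the matches dictionary earlier."""

    def __init__(self):
        self.length = None

    def process(self, inputStream):
        self.length = len(inputStream.read())


session = StubSession({"demo": b"<rss></rss>"})
callback = LengthCallback()
session.read("demo", callback)
# callback.length now holds the number of bytes that were read
```

This is why the script reads callback.matches after the call to session.read: read itself does not hand anything back to you, so the callback object is how results get out.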

The complete script shown earlier can be copied and pasted directly into the ExecuteScript Script Body property, and after pointing the Module Directory property to your Python dir, e.g. /usr/lib/python, it should work.

The final flow is basically the same as the Getting Started flow, just with ExecuteScript replacing EvaluateXPath. After running it once, if you have PutFile stopped, you can inspect the flow file and verify it has the attributes as expected!

[Image: the final flow]

Summary and Resources

This example showed the basics of using the Nifi ExecuteScript processor with Python: how to access the flow file, work with the session, and log messages.

If you would like more examples, let us know! We are here to help! There are some other great resources out there too, such as BatchIQ’s GitHub examples for ExecuteScript and funnifi’s ExecuteScript examples. BatchIQ’s GitHub examples are pretty helpful for using all the available components Nifi exposes to you, so take a look there for more information, or ask in the comments below.