Using Camel and NiFi in one solution

Raymond Meester
9 min read · May 24, 2020

Both Camel and NiFi are Apache projects. Both are written mostly in Java, and both target data processing and integration. However, there are also many differences. One of them is that NiFi is a platform and Camel is a framework.

For NiFi this means that it’s a software solution in which you build dataflows centrally. The concept of a dataflow lets you chain multiple processors to process data; together, the processors form a dataflow. NiFi has around 200 processors, most of which are built in.

Camel is mostly used at the code level. Companies have built platforms on top of the framework, for example Talend ESB and Red Hat Fuse. However, you can just as easily use it in your own application code to build an API, an integration or a microservice.

Camel supports all kinds of integration patterns and components. A developer can take the core engine of the framework and add any of more than 300 components to it. The components and patterns together form a route.

Combining superpowers

Both projects provide a lot of powerful concepts, patterns and processors. Between the 200 processors and 300 components there is quite some overlap. Still, no software in the world supports all libraries, protocols and technologies. What if these superpowers could be combined?

There is currently no NiFi component in Camel and no Camel processor in NiFi. The difficulty is that both have implemented lots of protocols, but neither provides one of its own for external parties. It’s like having a gasoline engine and an electric engine. They can work together in all kinds of hybrid ways, but it’s not easy to combine them.

The example

Let’s explore a hybrid solution for NiFi and Camel. As an example we use a very simple, hello world-like, use case:

Moving files between directories.

As source directory we use C:\in and as destination C:\out
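
To pin down what "moving files" means before any tool is involved, here is a minimal plain-Java sketch of the same task. It is not how NiFi or Camel implement it; the class name FileMover and its moveAll method are made up for illustration, and it makes a single pass over the directory instead of polling continuously.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of NiFi, Camel or Assimbly.
class FileMover {

    /** Moves every regular file from source to destination and returns the moved file names. */
    static List<String> moveAll(Path source, Path destination) throws IOException {
        Files.createDirectories(destination);
        List<String> moved = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(source)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    // Overwrite a file with the same name if it already exists in the destination.
                    Files.move(file, destination.resolve(file.getFileName()),
                            StandardCopyOption.REPLACE_EXISTING);
                    moved.add(file.getFileName().toString());
                }
            }
        }
        return moved;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("Usage: java FileMover.java <source dir> <destination dir>");
            return;
        }
        System.out.println("Moved: " + moveAll(Paths.get(args[0]), Paths.get(args[1])));
    }
}
```

Run with the two example directories as arguments, this does one sweep. Both NiFi and Camel add scheduling, error handling and monitoring on top of this basic idea.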

How would one create a pure NiFi solution? Well, just use the GetFile and PutFile processors:

And how would this work in Camel? This can be done by using the Camel DSL:

from("file://C:/in").to("file://C:/out");

Both provide a simple and sufficient solution. Nobody would complicate things by using multiple technologies. But to keep things simple this is exactly what we will do :)

Keep in mind that there are many more complex situations where it makes sense to use both. We’ll come back to that later. First we will create a demo to combine Camel and NiFi in one solution. We do this on a software (tooling) and a code level.

One solution on software level

We’ll continue with our simple example of moving files between two directories. To investigate a solution on a software level we won’t write any code; only existing software components are used.

In NiFi the normal approach is to create a flow in the user interface. As we don’t want to code our Camel route either, we use Assimbly Gateway to configure the route in a browser. Assimbly lets you create connections with Camel and ActiveMQ.

The next step is to find a matching protocol to connect both technologies. A good candidate is JMS, which is well supported by both Camel and NiFi. Here is the combined flow:

Let’s check the JMS example in more detail.

  1. Camel

The Camel route running in Assimbly picks up a file from C:\in and puts it as a message on the JMS Queue “in” on ActiveMQ (also running in Assimbly).

You can find out how to run this Assimbly flow with Camel and ActiveMQ on the Assimbly wiki. There is a quick-start and also a tutorial on message queueing.

2. NiFi

Apache NiFi gets the message from the queue ‘in’ with the ConsumeJMS processor and publishes it on the queue ‘out’ with the PublishJMS processor.

To accomplish this, we first create a controller service for JMS:

The ActiveMQ Artemis client library (JMS Client Libraries) is downloaded directly from Maven:

Next step is to configure the ConsumeJMS processor:

And the PublishJMS processor:

Last, but not least, we start the flow:

3. Camel

Another Assimbly flow lets Camel consume the message from the queue ‘out’ and save it as a file in the directory C:\out. For this flow we clone the first flow and configure it in reverse:

When testing the flow it still functions the same way as NiFi or Camel did on their own, but now combined in one solution.
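
The three hops above can be modelled in a few lines of plain Java. This is only a toy model: in-memory queues stand in for the ActiveMQ queues ‘in’ and ‘out’, and the class and method names (QueueHandoffSketch, publishFile, bridge, consume) are invented for this sketch. It uses neither JMS, Camel nor NiFi.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of the Camel -> queue -> NiFi -> queue -> Camel hand-off.
class QueueHandoffSketch {

    // In-memory stand-ins for the broker queues "in" and "out".
    private final BlockingQueue<String> in = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> out = new LinkedBlockingQueue<>();

    /** Step 1 (Camel's role): put the file content on queue "in". */
    void publishFile(String content) {
        in.add(content);
    }

    /** Step 2 (NiFi's role): move one message from "in" to "out"; returns false if none arrived in time. */
    boolean bridge(long timeoutMillis) throws InterruptedException {
        String message = in.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (message == null) {
            return false;
        }
        out.add(message);
        return true;
    }

    /** Step 3 (Camel's role): take one message from "out", or null if none arrived in time. */
    String consume(long timeoutMillis) throws InterruptedException {
        return out.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandoffSketch flow = new QueueHandoffSketch();
        flow.publishFile("hello world");
        flow.bridge(100);
        System.out.println(flow.consume(100));
    }
}
```

The point of the model is that each step only knows the queue names, not the other steps. That is what lets Camel and NiFi be developed, deployed and restarted independently.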

More complex stuff

You can choose this setup in more complex situations because of:

  • Separation of Concerns: let NiFi run flow logic and Camel run the connections (without the need of applications doing a lot of integration).
  • Let NiFi work centrally and Camel distributed.
  • Enhanced functionality: combine NiFi’s processors with Camel’s components.
  • Have a clear transport layer (MQ).

It’s possible that completely different teams or engineers work on either of those tools.

Other options

Our example used JMS, but other protocols are of course possible. For example, let Camel post a message with the HTTP component and let NiFi’s HandleHttpRequest processor handle the request. NiFi then posts the message with the InvokeHTTP processor to the Jetty component hosted by Camel, which saves the message to a file.
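
To make the HTTP hand-off concrete, here is a small sketch that uses only the JDK (Java 11 or later). The listener plays the role of the receiving side and the client plays the role of the sending side. The class name HttpHandoffSketch, the path /messages and the 202 status code are assumptions made for this example; none of it is NiFi or Camel configuration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of "one side posts, the other side listens".
class HttpHandoffSketch {

    /** Starts a listener on a free local port that stores every request body in 'received'. */
    static HttpServer startListener(BlockingQueue<String> received) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/messages", exchange -> {
            String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
            received.add(body);
            exchange.sendResponseHeaders(202, -1); // accepted, no response body
            exchange.close();
        });
        server.start();
        return server;
    }

    /** Posts a message to the listener and returns the HTTP status code. */
    static int post(int port, String message) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://127.0.0.1:" + port + "/messages"))
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> received = new LinkedBlockingQueue<>();
        HttpServer server = startListener(received);
        try {
            System.out.println("Status: " + post(server.getAddress().getPort(), "hello world"));
            System.out.println("Received: " + received.poll(2, TimeUnit.SECONDS));
        } finally {
            server.stop(0);
        }
    }
}
```

Compared with a queue, the sender here needs the receiver to be up at the moment of posting, which is one reason a broker gives a clearer transport layer.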

There are many other ways to use NiFi and Camel (through Assimbly Gateway) together. For example, use an Apache Kafka broker with topics instead of ActiveMQ, or use their REST interfaces. The key takeaway is that this setup gives you separation of concerns and supports all kinds of use cases.

One solution on code level

Like Camel, NiFi can be extended with Java code. This is done by creating a custom processor or controller service. There has been some discussion in the NiFi community about using Camel code within NiFi processors. This is reflected in the Jira issues:

NIFI-924 and NIFI-1842

and also on the mailing list:

This has not materialized yet and there is not much code to be found on this topic. Therefore, I created two experimental custom NiFi processors which combine NiFi and Camel code.

How do they work?

As a first step we create a custom NiFi processor. There is an excellent guide for this written by Shubam Gupta. It uses a Maven archetype to generate a skeleton custom processor.

With this guide we create a new ‘ConsumeWithCamel’ processor. We add the following properties:

  1. From URI (the URI of the Camel component to consume from)
  2. Error URI (the URI of the Camel component that errors are sent to)
  3. LogLevel (the level at which the Camel component logs to the NiFi log)

Then we add the Camel code that:

  1. Starts a CamelContext
  2. Configures the route
  3. Creates a consumer template

We let Assimbly Connector handle the Camel code. This API is used in Assimbly Gateway as well. It uses a convention over configuration approach and already has a lot of Camel components (like the File component) built-in.

Here is the code used when starting the NiFi processor:

@OnScheduled
public void onScheduled(final ProcessContext context) {
    // Use Assimbly Connector to manage Apache Camel (https://github.com/assimbly/connector)
    getLogger().info("Starting Apache Camel");

    // Start Apache Camel
    try {
        startCamelConnector();
    } catch (Exception e) {
        getLogger().error("Can't start Apache Camel.", e);
    }

    // Create an Assimbly flow ID
    UUID uuid = UUID.randomUUID();
    flowId = context.getName() + uuid.toString();

    // Configure the flow (Camel route)
    try {
        configureCamelFlow(context);
    } catch (Exception e) {
        getLogger().error("Can't configure Apache Camel route.", e);
    }

    // Start the flow (Camel route)
    try {
        connector.startFlow(flowId);
    } catch (Exception e) {
        getLogger().error("Can't start Apache Camel route.", e);
    }

    // Create the consumer template used to receive messages from the route
    try {
        template = connector.getConsumerTemplate();
    } catch (Exception e) {
        getLogger().error("Can't create Apache Camel endpoint.", e);
    }
}

The last step is to get the messages from Camel with the help of the consumerTemplate and pass them on to the NiFi processor.

The code to process a message:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Get the message from the Camel route
    Object output = template.receiveBody("direct:nifi-" + flowId);
    if (output == null) {
        return;
    }

    FlowFile flowfile = session.create();
    // Write the message body to the content of the flow file
    flowfile = session.write(flowfile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            out.write(output.toString().getBytes());
        }
    });
    session.transfer(flowfile, SUCCESS);
}
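
One detail in the write step is worth a small aside. Calling getBytes() without arguments uses the platform default charset, and calling toString() on a byte array yields its type and hash code, not its content. A possible hardening, shown here as a standalone sketch and not as part of the published processors, is a small conversion helper; the names BodyBytes and toBytes are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for turning a message body into flow file content.
class BodyBytes {

    /** Returns byte arrays unchanged; any other body is rendered as text and encoded as UTF-8. */
    static byte[] toBytes(Object body) {
        if (body instanceof byte[]) {
            return (byte[]) body;
        }
        return String.valueOf(body).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("hello world").length + " bytes");
    }
}
```

Inside the callback this would replace the conversion with out.write(BodyBytes.toBytes(output)), assuming the route delivers either text or raw bytes.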

You can find the complete code on GitHub:

ProduceWithCamel

Next we create another custom NiFi processor: “ProduceWithCamel”. This is similar to the consume processor, but it works in the reverse direction. For this we’ll use a producerTemplate to produce the messages. You can find the code for this processor here:

Note: These are experimental processors created only for this demo.

Testing the code

To test the code, download the ConsumeWithCamel and ProduceWithCamel processors and put both NAR files into NiFi’s lib directory.

Now we can use the new Consume processor and configure it:

The Error URI is empty, which means errors will be logged to the NiFi log file.

Secondly we configure the produce processor:

Finally we connect both processors with each other and start the flow.

The file will be picked up and stored just like in all other examples.

More possibilities

Just like the tooling approach, the code solution opens up all kinds of possibilities. For example, dataflows in NiFi aren’t loosely coupled: you always need to connect processors or process groups to each other. With the new processors you can loosely couple two process groups through Camel’s VM component.

The first process group uses the ProduceWithCamel processor with the URI vm://secondProcessGroup

The second process group consumes this message:

Now both flows move the file from one directory to another, but the process groups aren’t connected as usual. The new solution acts like a ‘wormhole’.
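
A rough mental model of the ‘wormhole’, assuming the VM component behaves like a named in-memory queue shared within one JVM, can be sketched in plain Java. The registry below is not Camel code, and the names NamedQueueSketch, send and receive are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model: producers and consumers find each other only through a queue name.
class NamedQueueSketch {

    // One JVM-wide registry of queues, keyed by name.
    private static final ConcurrentMap<String, BlockingQueue<String>> QUEUES = new ConcurrentHashMap<>();

    private static BlockingQueue<String> queue(String name) {
        return QUEUES.computeIfAbsent(name, key -> new LinkedBlockingQueue<>());
    }

    /** What the producing process group does: send to a named queue. */
    static void send(String name, String message) {
        queue(name).add(message);
    }

    /** What the consuming process group does: wait briefly for a message, or return null. */
    static String receive(String name, long timeoutMillis) throws InterruptedException {
        return queue(name).poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        send("secondProcessGroup", "hello world");
        System.out.println(receive("secondProcessGroup", 100));
    }
}
```

Neither side holds a reference to the other; the shared name is the only contract. This also means the coupling only works while both process groups run in the same JVM.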

Though every example had the same result, there were many paths. Within integration it’s good to use open source as well as an open mind. Together they’re unstoppable on whatever path you are on.

Links
