Using Camel and NiFi in one solution

Combining superpowers

The example

from("file://C:/in").to("file://C:/out");
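To make clear what this one-line route does, here is a rough plain-Java equivalent that uses only the JDK and no Camel. It is a sketch under assumptions: the class name `FileRouteSketch` and the method `pollOnce` are made up for illustration, and it imitates the file component's default behaviour as I understand it (copy each file to the target directory, then park the original in a `.camel` subfolder of the source directory).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java stand-in for the route: copy each file from inDir to outDir (hypothetical helper, not Camel code). */
final class FileRouteSketch {

    /** Handles every regular file currently in inDir once and returns how many were handled. */
    static int pollOnce(Path inDir, Path outDir) {
        try {
            Files.createDirectories(outDir);
            // Assumption: processed originals are parked in a ".camel" subfolder of the input directory.
            Path done = Files.createDirectories(inDir.resolve(".camel"));

            // Snapshot the listing first so that moving files does not disturb the iteration.
            List<Path> files;
            try (Stream<Path> listing = Files.list(inDir)) {
                files = listing.filter(Files::isRegularFile).collect(Collectors.toList());
            }

            for (Path file : files) {
                Files.copy(file, outDir.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
                Files.move(file, done.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
            }
            return files.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `FileRouteSketch.pollOnce(Paths.get("C:/in"), Paths.get("C:/out"))` in a loop would roughly mimic the polling consumer. Camel adds the scheduling, error handling and the many other components on top of this.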

One solution at the software level

  1. Camel

More complex stuff

  • Separation of concerns: let NiFi run the flow logic and Camel handle the connections, so applications don't have to do much integration work themselves.
  • Let NiFi work centrally and Camel distributed.
  • More functionality: NiFi's processors plus Camel's components.
  • Have a clear transport layer (MQ).
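The division of labour in the bullets above can be illustrated with a toy model in plain Java. This is only a sketch: an in-memory queue stands in for the MQ transport layer, and `Broker`, `edgeConnector` and `centralFlow` are hypothetical names, not NiFi or Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the split: edge connectors only move messages, the central flow only decides. */
final class TransportLayerSketch {

    /** Stand-in for the MQ transport layer between the two tools. */
    static final class Broker {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        void publish(String message) {
            queue.add(message);
        }

        /** Returns the next message, or null when the queue is empty. */
        String poll() {
            return queue.poll();
        }
    }

    /** Role played by Camel: connect to a source system and hand its messages to the broker. */
    static void edgeConnector(List<String> sourceSystem, Broker broker) {
        for (String message : sourceSystem) {
            broker.publish(message);
        }
    }

    /** Role played by NiFi: drain the broker and apply the flow logic (here a simple routing decision). */
    static List<String> centralFlow(Broker broker) {
        List<String> routed = new ArrayList<>();
        String message;
        while ((message = broker.poll()) != null) {
            String route = message.startsWith("ORDER") ? "orders" : "other";
            routed.add(route + ":" + message);
        }
        return routed;
    }
}
```

The point of the design is that the connector knows nothing about routing and the flow knows nothing about the source system. Either side can be replaced or scaled without touching the other, as long as both agree on the queue.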

Other options

One solution at the code level

How do they work?

  1. From URI (the URI of the Camel component to consume from)
  2. Error URI (the URI of the Camel component used for errors)
  3. LogLevel (the level at which the Camel component logs to the NiFi log)

  1. Starts a CamelContext
  2. Configures the route
  3. Creates a consumer template
@OnScheduled
public void onScheduled(final ProcessContext context) {
    // Use Assimbly Connector to manage Apache Camel (https://github.com/assimbly/connector)
    getLogger().info("Starting Apache Camel");

    // Start Apache Camel
    try {
        startCamelConnector();
    } catch (Exception e) {
        // Pass the exception to the NiFi logger instead of printing the stack trace to stderr
        getLogger().error("Can't start Apache Camel.", e);
    }

    // Create an Assimbly flow ID
    UUID uuid = UUID.randomUUID();
    flowId = context.getName() + uuid.toString();

    // Configure the flow (Camel route)
    try {
        configureCamelFlow(context);
    } catch (Exception e) {
        getLogger().error("Can't configure Apache Camel route.", e);
    }

    // Start the flow (Camel route)
    try {
        connector.startFlow(flowId);
    } catch (Exception e) {
        getLogger().error("Can't start Apache Camel route.", e);
    }

    // Create the consumer template used to read from the route's endpoint
    try {
        template = connector.getConsumerTemplate();
    } catch (Exception e) {
        getLogger().error("Can't create Apache Camel endpoint.", e);
    }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Get the message from the Camel route
    final Object output = template.receiveBody("direct:nifi-" + flowId);
    if (output == null) {
        return;
    }

    // Write the message body to a new FlowFile
    FlowFile flowfile = session.create();
    flowfile = session.write(flowfile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            // Explicit charset (java.nio.charset.StandardCharsets) instead of the platform default
            out.write(output.toString().getBytes(StandardCharsets.UTF_8));
        }
    });
    session.transfer(flowfile, SUCCESS);
}
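The hand-off between the Camel route and the NiFi processor can be tried out without either framework. In the sketch below an in-memory queue plays the role of the `direct:nifi-<flowId>` endpoint. The class `HandOffSketch` and its methods are hypothetical, and the timeout on the receiving side is a choice made for this sketch, not something taken from the processor code.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Stand-in for the processor's hand-off: a queue plays the "direct:nifi-<flowId>" endpoint. */
final class HandOffSketch {
    private final BlockingQueue<Object> endpoint = new LinkedBlockingQueue<>();
    final String flowId;

    HandOffSketch(String processorName) {
        // Same idea as in onScheduled: processor name plus a random UUID gives a unique flow ID.
        this.flowId = processorName + UUID.randomUUID();
    }

    String endpointUri() {
        return "direct:nifi-" + flowId;
    }

    /** Route side: deliver a message body to the endpoint. */
    void deliver(Object body) {
        endpoint.add(body);
    }

    /** Trigger side: wait up to timeoutMillis for a body; null means there is nothing to do this time. */
    byte[] trigger(long timeoutMillis) {
        try {
            Object body = endpoint.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return body == null ? null : body.toString().getBytes(StandardCharsets.UTF_8);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return null;
        }
    }
}
```

Waiting with a timeout keeps the trigger from holding a thread indefinitely when no message arrives. As far as I know, Camel's `ConsumerTemplate` offers comparable variants (`receiveBody(uri, timeout)` and `receiveBodyNoWait(uri)`), which may be worth considering for the processor itself.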

ProduceWithCamel

Testing the code

More possibilities

Links
