Using Camel and NiFi in one solution

Combining superpowers

The example

from("file://C:/in").to("file://C:/out");
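To make clear what this one-line route takes care of, here is a rough plain-Java sketch of a single pass over an input directory. It is only an approximation: the class and method names (FileMoveSketch, moveAll, demo) are made up for illustration, temporary directories stand in for C:/in and C:/out, and it leaves out what Camel's file component does on top (polling, locking, and handling of the processed original).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

public class FileMoveSketch {

    // Moves every regular file from inDir to outDir and returns the moved file names.
    static List<String> moveAll(Path inDir, Path outDir) throws IOException {
        Files.createDirectories(outDir);

        // Snapshot the directory first so files are not moved while it is being listed.
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> listing = Files.newDirectoryStream(inDir)) {
            for (Path entry : listing) {
                if (Files.isRegularFile(entry)) {
                    candidates.add(entry);
                }
            }
        }

        List<String> moved = new ArrayList<>();
        for (Path file : candidates) {
            Files.move(file, outDir.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
            moved.add(file.getFileName().toString());
        }
        return moved;
    }

    // Hypothetical demo: temporary directories stand in for C:/in and C:/out.
    static List<String> demo() {
        try {
            Path in = Files.createTempDirectory("in");
            Path out = Files.createTempDirectory("out");
            Files.write(in.resolve("order.txt"), "hello".getBytes(StandardCharsets.UTF_8));
            return moveAll(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The Camel route expresses the same intent in one line, and swapping the file endpoint for another component only means changing the URI.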

One solution on software level

  1. Camel

More complex stuff

  • Separation of concerns: NiFi runs the flow logic and Camel handles the connections, so applications don't have to do much integration work themselves.
  • NiFi works centrally, Camel works distributed.
  • More functionality: NiFi's processors combined with Camel's components.
  • A clear transport layer (MQ).

Other options

One solution on code level

How do they work?

  1. From URI (the URI of the Camel component for consuming)
  2. Error URI (the URI of the Camel component for errors)
  3. LogLevel (the log level of the Camel component in the NiFi log)

  1. Starts a CamelContext
  2. Configures the route
  3. Creates a consumer template

@OnScheduled
public void onScheduled(final ProcessContext context) {
    // Use Assimbly Connector to manage Apache Camel (https://github.com/assimbly/connector)
    getLogger().info("Starting Apache Camel");

    // Start Apache Camel
    try {
        startCamelConnector();
    } catch (Exception e) {
        getLogger().error("Can't start Apache Camel.", e);
    }

    // Create an Assimbly flow ID
    UUID uuid = UUID.randomUUID();
    flowId = context.getName() + uuid.toString();

    // Configure the flow (Camel route)
    try {
        configureCamelFlow(context);
    } catch (Exception e) {
        getLogger().error("Can't configure Apache Camel route.", e);
    }

    // Start the flow (Camel route)
    try {
        connector.startFlow(flowId);
    } catch (Exception e) {
        getLogger().error("Can't start Apache Camel route.", e);
    }

    // Create the consumer template used to read from the route
    try {
        template = connector.getConsumerTemplate();
    } catch (Exception e) {
        getLogger().error("Can't create Apache Camel endpoint.", e);
    }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Get the message from the Camel route
    Object output = template.receiveBody("direct:nifi-" + flowId);
    if (output == null) {
        return;
    }

    FlowFile flowfile = session.create();
    // Write the message body to the flow file
    flowfile = session.write(flowfile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            // Explicit charset instead of the platform default
            out.write(output.toString().getBytes(StandardCharsets.UTF_8));
        }
    });
    session.transfer(flowfile, SUCCESS);
}
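
The handoff in onTrigger can be tried out without NiFi or Camel on the classpath. The sketch below is a simplified model, not the processor itself: a BlockingQueue stands in for the "direct:nifi-" endpoint and the consumer template, and the names HandoffSketch, deliver and trigger are invented for this example. It only shows the pattern of "take the next body if there is one, otherwise return without creating a flow file".

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {

    // Stand-in for the endpoint the route writes to (an assumption, not Camel's internals).
    private final BlockingQueue<Object> endpoint = new LinkedBlockingQueue<>();

    // Route side: called whenever the route has a message body to hand over.
    void deliver(Object body) {
        endpoint.add(body);
    }

    // Processor side: mirrors onTrigger. Returns the bytes that would be written
    // to a flow file, or null when no message arrived within the timeout.
    byte[] trigger(long timeoutMillis) {
        Object output;
        try {
            output = endpoint.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        if (output == null) {
            return null;
        }
        return output.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        HandoffSketch sketch = new HandoffSketch();
        System.out.println(sketch.trigger(50) == null);
        sketch.deliver("hello from the route");
        System.out.println(new String(sketch.trigger(50), StandardCharsets.UTF_8));
    }
}
```

Polling with a timeout keeps the calling thread from waiting indefinitely when the route has nothing to deliver.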

ProduceWithCamel

Testing the code

More possibilities

Links
