Connecting NiFi with ActiveMQ

Connecting NiFi with ActiveMQ

In this article, we discuss the existing options to connect NiFi and ActiveMQ; and present a robust solution with good performance to add support for ActiveMQ into NiFi using the protocol AMQP 1.0 and the Apache Qpid JMS library. Before we get too technical, let us start with a round of introductions just in case you are wondering what these names are.

 

 

1. NiFi 

 

Niti logo

 

Apache NiFi is an open-source solution for moving data between systems. We do not intend to make a thorough introduction about NiFi in this post, since there are many out there – but what you must know is that if you need to move data between different systems in real-time while being able to do small transformations to the data, and you are looking for a reliable and scalable solution to do it, NiFi is a perfect tool for the job. 

 

Its simple user interface, completely graphical, makes its learning curve very sweet. And once you take your first steps in NiFi, you will realisthat in no time, you will be making NiFi flows like a pro. 

 

For the last few years, NiFi has been getting very popularActually, not long before the COVID-19 outbreak where physical conferences were a thing, I remember myself attending one of these conferences and realizing “Oh man, everyone is just talking about NiFi”



A few years ago, Hortonworks started offering NiFi as part of their Hortonworks Data Flow (HDF), and back in 2018 Cloudera and Hortonworks merged and HDF evolved into what is today called the Cloudera Data Flow (CDF). In CDF, the component in which NiFi resides is called Cloudera Flow Management (CFM).  

 

CDF also includes MiNiFi as well as Kafka, Flink, auxiliary services related to these and the Shared Data Experience (SDX) layer for unified security, governance, and single-sign-on (SSO). So, if NiFi was not getting popular enough, the fact that Cloudera offers it in a full enterprise-grade platform together with Kafka and Flink, gave the ultimate push to its popularity. 

 

Figure 1 - Cloudera Data Flow, including NiFi, Kafka, Flink, auxiliary services and SDX

Figure 1 –Cloudera Data Flow, including NiFi, Kafka, Flink, auxiliary services and SDX.

 

NiFi ships, at the time of writing, 283 processors (see the NiFi documentation for the most up-to-date list of available processors), including functionality to interact with all sorts of systems such as MySQL, HDFS, HBase, Mongo, Kafka, Elasticsearch, Couchbase, RethinkDB, Slack, Cassandra, Hive, InfluxDBKudu, Solr, and of course file systems (local or network), as well as protocols and APIs such as JMS, JDBC, IMAP, POP3, MQTT, AMQP. It also has wide support for many services in Azure, AWS or GCP, and public APIs such as Twitter.  

 

Even though NiFi has native support for many systems, there are still situations in which you may miss something. And this is what we faced in a recent project with one of our customers. We needed to connect to an ActiveMQ platform, and even though NiFi has a few options out-of-the-box to do so, as we will discuss below, these options were not suitable for our particular case. But before we dive into how we addressed this, let us formally introduce ActiveMQ.

 

 

2. ActiveMQ

ActiveMQ_Project_Logo

 

Apache ActiveMQ is an open-source message broker built on top of the Java Messaging Service (JMS)Various programming languages (Java, C, C++, Python, etc.) can be used for creating clients. It supports multiple protocols including, but not limited to, RESTMQTTAMQPStompand OpenWire.  

 

With ActiveMQ, one can create message queues (the “MQ” in ActiveMQ), and these queues can be used as communication bridges between different applications even if these applications are using different programming languages, protocols, and physical/virtual locations (as long as there is network connectivity between the systems where the applications are running and in which ActiveMQ is running).

 
Queues work on a FIFO (first-in, first-out) basis. One or more applications work as producers – they create messages and publish them into the queue. These messages are consumed one at a time by other applications that work as consumers, and once one consumer has consumed a message, that message is gone from the queue. 

 

ActiveMQ can also work with topics in addition to queues. Topics work according to the publisher-subscriber model, i.e., one or more applications work as producers, like for the queues, but in this case, the messages in topics are received by all the applications that are subscribed as consumers. 

 

So, if you want your messages to be consumed once and only once, you will go with queues, whereas if what you want is effectively a broadcasting model, you will go with topics. 

 

ActiveMQ is not the only platform out there to offer the above-mentioned communication models (queues, topics)Alternatives include RabbitMQ and Kafka, or Azure Service BusAmazon SNS/SQSGoogle Pub/Sub if you are into public cloudsIf you are interested in a comparison between these technologies, we recommend reading the following articlesthis one compares ActiveMQ and RabbitMQ, while this one compares ActiveMQ and Kafka. Finally, this article compares ActiveMQ, RabbitMQ, Kafka, and a few of the public cloud services for message brokering 

 

 

3. NiFi and ActiveMQ compatibility

 

In a recent project, we needed NiFi to publish and consume messages to and from ActiveMQ queuesIf one looks at the list of protocols supported by ActiveMQ and the list of available NiFi processors, one will find the following matchesRESTMQTT and AMQP. 

 

 

3.1. REST

 

Publishing and consuming messages in ActiveMQ via REST API in NiFi is possible using the InvokeHTTP processor. However, ActiveMQ’s REST API is not as efficient as using other APIs (the REST API is a wrapper of the JMS API, so of course, using the JMS API is more efficient). In this project we were expecting a high number of messages, so we wanted to use the best performing option.

 

 

3.2. MQTT

 

Our next option was to use the MQTT protocol and the PublishMQTT and ConsumeMQTT processors in NiFi. However, this simple light-weight protocol, which is very common in IoT scenarios, only supports the publish-subscribe modelSo it works only with topics, not with queues. And in our case, we required queues. Moreover, MQTT lacks the robustness of our final option – AMQP (see this comparison of MQTT and AMQP).  

 

 

3.3. AMQP

 

So, our final option is AMQP using the PublishAMQP and ConsumeAMQP processors in NiFi. On paper, this looked like the best option in terms of performance and robustness. However, as we were determined to try these out, we quickly found out one important limitation: these NiFi processors only support version 0-9-1 of the AMQP protocolwhile ActiveMQ only supports version 1.0 of AMQP.  

 

So close! But, at the same time so far – since despite the nameAMQP 1.0 is very different from older versions such as 0-9-1At this point, one can wonder why NiFi is supporting the old protocol and not the new one. The answer is that NiFi being an open-source project means that its various processors are contributed by the community 

 

Checking the NiFi Release Noteswe see that AMQP support was added in the NiFi version 0.5.0 back in February 2016, and by checking the exact issue where the support was added, we see that the driver was the need to connect to RabbitMQ. It turns out that AMQP 0-9-1 is the core protocol of RabbitMQ, so if you are using RabbitMQ it kind of makes sense to use AMQP 0-9-1, but in ActiveMQ, that version of the protocol is not even supported.  

 

 

4. NiFi and ActiveMQ using AMQP 1.0 and JMS

 

As explained in the previous section, even if NiFi has processors called PublishAMQP and ConsumeAMQP, these are useless when dealing with ActiveMQ. However, there are two other processors that, if properly configured, can help here: PublishJMS and ConsumeJMSAs their names indicate, these processors will leverage the JMS API directly, which is the the one with the best performance. 

 

The fix explained in this blog was derived from hereEssentially, and as explained at the bottom of the AMQP page of ActiveMQ, one can use the Apache Qpid Proton client library to connect to ActiveMQ using AMQP 1.0. And since NiFi has the JMS processors, we will want to use the Qpid JMS (AMQP 1.0) version. 

 

Follow these exact steps to enable ActiveMQ support for NiFi using the AMQP 1.0 protocol: 

 

    1. Download the Qpid JMS (AMQP 1.0) binary tar.gz file from http://qpid.apache.org/download.html. At the time of writing, the most recent version is 0.58.0.Qpid JMS (AMQP 1.0) binary tar.gz file
    2. Put the downloaded tar.gz file in each NiFi worker nodedecompress the file, and give read permissions to all users (or at least to the user running NiFi). For example, assuming the tar.gz file is in /var/tmp and we want to put the library in /opt/qpid:

      sudo mkdir /opt/qpid
      cd /opt/qpid/
      sudo tar xvzf /var/tmp/apache-qpid-jms-0.58.0-bin.tar.gz
      sudo chmod -R 755 /opt/qpid

       

    3. Create the JMS/JNDI configuration file that contains the connection details, i.e., the URL, for the ActiveMQ connection. For example, assuming we want to store the configuration file in /var/tmp/jndi.properties and that the ActiveMQ instance is running locally too, the configuration file would look like:

      connectionfactory.qpidConnectionFactory = amqp://localhost:5672

       

    4. Next, configure the JMS processors accordingly. Make sure the following properties are set (replace the values according to your case):

      Property

      Value

      Destination Name  

      nifiactivemqblog  

      Destination Type   

      QUEUE  

      JNDI Initial Context Factory Class  

      org.apache.qpid.jms.jndi.JmsInitialContextFactory  

      JNDI Provider URL  

      /var/tmp/jndi.properties 

      JNDI Name of the Connection Factory  

      qpidConnectionFactory 

      JNDI / JMS Client Libraries 

      /opt/qpid/apache-qpid-jms-0.58.0/lib/ 

       

      Note that the property “JNDI Name of the Connection Factory” must have the same value we have specified in the configuration file (but without the “connectionfactory.”). In this case, it is qpidConnectionFactory. 

       

      The above configuration is the same for the PublishJMS and ConsumeJMS processors. 

       

 

5. Example

 

We have implemented a simple NiFi flow to demonstrate how to use the above method to allow NiFi to publish and consume messages to and from ActiveMQ, using the AMQP protocol 1.0 and the JMS library provided by Apache Qpid.  

 

This example generates an empty FlowFile every 5 seconds (GenerateFlowFile) and sets a dummy message with the current timestamp (ReplaceText) before publishing the message into an ActiveMQ queue (PublishJMS)Finally, the published messages are consumed from the queue (ConsumeJMS) and sent to a NiFi funnel.  

 

Simple NiFi flow

 

6. Enterprise ActiveMQ and NiFi – High Availability and TLS/SSL

 

In the simple example above, we are running ActiveMQ locally without High Availability or TLS/SSL. But in ActiveMQ enterprise deployments, one will most probably find High Availability and Failover as well as in-motion encryption with TLS/SSL. This is totally fine with regards to the presented solution, and all we need to do is to property specify the connection details, i.e., the URL, in the JMS/JNDI configuration file. Note that it would not be necessary to specify the SSL context on NiFi, since the establishing of the SSL connection will be done by the Apache Qpid library. 

 

For example, if we would be using ActiveMQ with High Availability and Failover, with four nodes (amqnode1-4), and with in-motion encryption with TLS/SSL (using port 61616 and the keystore and truststore in /opt/qpid), all we need to do is to specify the connection URL accordingly in the JMS/JNDI configuration file:  

 

connectionfactory.qpidConnectionFactory = failover:(amqps://amqnode1:61616,amqps://amqnode2:61616,amqps://amqnode3:61616,amqps://
amqnode4:61616)failover.maxReconnectAttempts=10&failover.nested.transport.
keyStoreLocation=/opt/qpid/keystore.jks&failover.nested.transport.keyStorePassword
=password&failover.nested.transport.trustStoreLocation=/opt/qpid/truststore.
jks&failover.nested.transport.trustStorePassword=password

 

 

Conclusion

 

In this blog post we discussed the available options to connect NiFi and ActiveMQ, and we selected and explored the best performing and robust option – to use the Apache Qpid JMS client library and the AMQP protocol version 1.0. We also detailed how this can be leveraged in enterprise deployments.

 

At ClearPeaks we are experts on Big Data technologies. We are a reference and trusted partner of Cloudera, and as such, we have experience in many different projects with services such as NiFi. If you require assistance with the solution described in this blog, or with NiFi, Cloudera, or other Big Data technologies, simply contact us. We are here to help you!

 

Big Data and Cloud Services blog banner

Oscar M
oscar.martinez@clearpeaks.com