For microservices-based applications, you can use Kafka to let the services communicate using the publisher-subscriber pattern. Kafka lets you create a set of producers and consumers to send and receive messages.
This blog walks you through the basic setup of a Kafka producer and consumer for an Adonis application.
Kafka Setup on macOS
To install Kafka using brew:
1. Install Kafka
<code class="language-javascript">$ brew cask install java</code>
<code class="language-javascript">$ brew install kafka</code>
2. Start the Zookeeper
<code class="language-javascript">$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties</code>
3. Start the Kafka Server
<code class="language-javascript">$ kafka-server-start /usr/local/etc/kafka/server.properties</code>
If you face a connection issue, update server.properties by replacing
<code class="language-javascript">listeners=PLAINTEXT://:9092</code>
with
<code class="language-javascript">listeners=PLAINTEXT://localhost:9092</code>
4. Create a Kafka topic
<code class="language-javascript">$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic</code>
Kafka Setup on Ubuntu
To install Kafka on Ubuntu, you need a system with at least 4 GB of RAM.
1. Kafka requires Java, so install OpenJDK
<code class="language-javascript">$ sudo apt update</code>
<code class="language-javascript">$ sudo apt install default-jdk</code>
2. Download the required Kafka package; here we use Kafka 2.3.0
<code class="language-javascript">$ wget http://www-us.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz</code>
3. Extract the archive and move Kafka to the desired location
<code class="language-javascript">$ tar xzf kafka_2.11-2.3.0.tgz</code>
<code class="language-javascript">$ sudo mv kafka_2.11-2.3.0 /usr/local/kafka</code>
4. Start the ZooKeeper and Kafka servers (in separate terminals)
<code class="language-javascript">$ cd /usr/local/kafka</code>
<code class="language-javascript">$ bin/zookeeper-server-start.sh config/zookeeper.properties</code>
<code class="language-javascript">$ bin/kafka-server-start.sh config/server.properties</code>
5. Create a Kafka topic
<code class="language-javascript">$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic</code>
Configuring Adonis.js Application to use Kafka
Now that Kafka is set up, we will configure our Adonis application to use it.
Install the kafka-node package to use Kafka from the Adonis.js application
<code class="language-javascript">npm i kafka-node</code>
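The snippets below read the Kafka broker address from the application's .env file via Env.get('KAFKA_SERVER'). The variable name is an assumption; use whatever key you pass to Env.get(). A typical entry for a local broker:

```
# .env — broker address for kafka-node (variable name is our choice)
KAFKA_SERVER=localhost:9092
```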
Since the Kafka consumer needs to be initialized at application startup so that it continuously listens for messages from the producers, we will create an Adonis command to start the consumer.
Create the command for starting the consumer with the following (to learn how to create commands in Adonis.js, see Creating Custom Commands with Adonis.js)
<code class="language-javascript">adonis make:command startConsumer</code>
Update the handle method of StartConsumer.js with the following
<code class="language-javascript">async handle (args, options) {
  Notification.consume_events()
}</code>
Here, consume_events() is a static method on the Notification model, defined as follows
<code class="language-javascript">const Model = use('Model')
const Env = use('Env')
const kafka = require('kafka-node')

class Notification extends Model {
  static consume_events () {
    const Consumer = kafka.Consumer
    // KafkaClient expects an options object; KAFKA_SERVER holds the
    // broker address, e.g. localhost:9092
    const client = new kafka.KafkaClient({ kafkaHost: Env.get('KAFKA_SERVER') })
    const consumer = new Consumer(client, [{ topic: 'sampleTopic' }], { autoCommit: false })

    consumer.on('message', function (msg) {
      console.log('inside consumer')
      console.log(msg)
    })

    consumer.on('error', function (err) {
      console.log('Error:', err)
    })

    consumer.on('offsetOutOfRange', function (err) {
      console.log('offsetOutOfRange:', err)
    })
  }
}

module.exports = Notification</code>
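Each message handed to the 'message' handler above is a plain object with topic, value, offset, and partition fields (kafka-node's message shape). As a sketch, a small hypothetical helper that turns one into a log line, which you could call from the handler instead of logging the raw object:

```javascript
// Hypothetical helper: format a kafka-node message object as a log line.
// kafka-node delivers messages shaped like { topic, value, offset, partition }.
function formatKafkaMessage (msg) {
  return '[' + msg.topic + '#' + msg.partition + '@' + msg.offset + '] ' + msg.value
}

// A message like the one the consumer above would receive:
const sample = { topic: 'sampleTopic', value: 'New user created', offset: 0, partition: 0 }
console.log(formatKafkaMessage(sample)) // → [sampleTopic#0@0] New user created
```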
For a producer to publish a message, call the following method at the required trigger; here we use an afterCreate hook of the user model to publish a message
<code class="language-javascript">UserHook.triggerNotifications = async (userInstance) => {
  Notification.produce_event('sampleTopic', 'New user created')
}</code>
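For the hook above to fire, it has to be registered in the User model's boot method. A minimal sketch, assuming the hook file lives at app/Models/Hooks/UserHook.js (the default location for Adonis 4 model hooks):

```javascript
// app/Models/User.js — sketch; assumes the standard Adonis 4 model layout
const Model = use('Model')

class User extends Model {
  static boot () {
    super.boot()
    // Runs UserHook.triggerNotifications after each user row is created
    this.addHook('afterCreate', 'UserHook.triggerNotifications')
  }
}

module.exports = User
```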
Here, produce_event() is another static method added to the Notification class, as follows
<code class="language-javascript">static produce_event (topic, messages) {
  const Producer = kafka.Producer
  // KafkaClient expects an options object, as in consume_events()
  const client = new kafka.KafkaClient({ kafkaHost: Env.get('KAFKA_SERVER') })
  const producer = new Producer(client)
  const payloads = [{
    topic: topic,
    messages: messages
  }]

  producer.on('ready', function () {
    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('[kafka-producer -> ' + topic + ']: broker failed to update')
      } else {
        console.log('[kafka-producer -> ' + topic + ']: broker updated successfully')
      }
    })
  })

  producer.on('error', function (err) {
    console.log('Producer is in error state')
    console.log(err)
  })
}</code>
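The payloads argument passed to producer.send() is just an array of plain objects. As a sketch, a hypothetical helper that builds it — kafka-node also accepts an array of strings for messages and an optional partition field, which this helper normalises:

```javascript
// Hypothetical helper: build the payloads array that kafka-node's send() expects.
// `messages` may be a single string or an array of strings.
function buildPayloads (topic, messages, partition = 0) {
  return [{
    topic: topic,
    messages: Array.isArray(messages) ? messages : [messages],
    partition: partition
  }]
}

console.log(buildPayloads('sampleTopic', 'New user created'))
// → [ { topic: 'sampleTopic', messages: [ 'New user created' ], partition: 0 } ]
```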
To run the consumer when the Adonis application starts, configure the server.js file to preload the startConsumer command. The command is invoked from a kafka.js file in the start folder of the application, as follows
<code class="language-javascript">const Consumer = use('Consumer')
Consumer.exec()</code>
The Consumer alias is defined inside start/app.js
<code class="language-javascript">const aliases = {
  Consumer: 'App/Commands/StartConsumer'
}</code>
and the StartConsumer command is registered in the commands array as
<code class="language-javascript">const commands = ['App/Commands/StartConsumer']</code>
Lastly, preload the kafka.js file when the Adonis application starts
<code class="language-javascript">new Ignitor(require('@adonisjs/fold'))
  .appRoot(__dirname)
  .preLoad('start/kafka')
  .fireHttpServer()
  .catch(console.error)</code>
Now, whenever a user is created, the consumer logs the message
<code class="language-javascript">New user created</code>
Kafka plays a crucial role in backend development services by providing a robust and scalable messaging system that efficiently handles real-time data streams. Its publisher-subscriber model ensures seamless communication between microservices, making it an essential component in modern backend architectures.
Happy Coding!
In case of queries, please feel free to drop a comment.