Spring Boot | How to consume string messages using Apache Kafka
Apache Kafka is a publish-subscribe messaging system used for real-time streams of data. A message queue lets you send messages between processes, applications, and servers. In this article, we will see how to consume string messages from Apache Kafka and print them to the console of a Spring Boot application.
Approach:
Step 1: Go to Spring Initializr and create a starter project with the following dependency:
- Spring for Apache Kafka
Note: We can also create a Maven project and add the following code to the pom.xml file.
Xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
Step 2: Open the project in an IDE and sync the dependencies. Then create a new class Config and annotate it with @Configuration and @EnableKafka.
Step 3: Now create the ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans, both typed with String keys and String values.
Java
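// Java program to consume string
// messages using Spring Kafka
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class Config {

    // Function to establish the connection between
    // the Spring application and the Kafka server
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // HashMap to store the configurations
        Map<String, Object> map = new HashMap<>();

        // Put the broker host and port in the map
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Put the consumer group ID in the map
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "id");

        // Keys and values are both read as strings
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(map);
    }

    // Named kafkaListenerContainerFactory because @KafkaListener
    // looks up a container factory bean with that name by default
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}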
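The ConsumerConfig constants used in the Config class are plain string property names. As a rough sketch that needs only the JDK, the same four settings can be written out by name, which helps when comparing them with a properties file or with Kafka's own documentation. The class name ConsumerSettingsSketch is hypothetical; the values are the ones from Step 3.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerSettingsSketch {

    // Builds the same four settings as Config.consumerFactory(), using the
    // literal property names that the ConsumerConfig constants stand for.
    static Map<String, Object> consumerSettings() {
        Map<String, Object> map = new LinkedHashMap<>();
        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        map.put("bootstrap.servers", "127.0.0.1:9092");
        // ConsumerConfig.GROUP_ID_CONFIG
        map.put("group.id", "id");
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG (given here as a class name)
        map.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG (given here as a class name)
        map.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return map;
    }

    public static void main(String[] args) {
        // Print each setting as "name = value"
        consumerSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The sketch only builds and prints a map; it does not connect to Kafka.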
Step 4: Create a class KafkaService with the @Service annotation. This class will contain the listener method that prints each message to the console.
Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    // Annotation required to listen for messages
    // on the StringProducer topic of the Kafka server
    @KafkaListener(topics = "StringProducer", groupId = "id")
    public void publish(String message) {
        System.out.println("You have a new message: " + message);
    }
}
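Between the broker and the listener method, the StringDeserializer configured in Step 3 turns the raw bytes of each record into a String (UTF-8 by default). The following is a minimal JDK-only sketch of that hand-off, not the Spring Kafka implementation; ListenerSketch, decode and format are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

public class ListenerSketch {

    // Stand-in for a string deserializer: raw bytes -> String (UTF-8 assumed).
    static String decode(byte[] payload) {
        return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }

    // Produces the same text that the listener in KafkaService prints.
    static String format(String message) {
        return "You have a new message: " + message;
    }

    public static void main(String[] args) {
        // Bytes as a producer would send them for the text "Hello"
        byte[] payload = "Hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(format(decode(payload)));
    }
}
```

In the real application, the decoding happens inside the Kafka consumer and the listener method only ever sees the resulting String.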
Step 5: Start ZooKeeper and then the Kafka server using the commands below.
For Windows:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
For Mac and Linux:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
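Before moving on, it can help to confirm that both servers are accepting connections. The following is a small sketch using only the JDK, assuming ZooKeeper listens on its usual port 2181 and Kafka on 9092 on the local machine; PortCheck and isListening are hypothetical names. It only tests that a TCP connection can be opened, not that the servers are healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions based on the default configuration files
        System.out.println("ZooKeeper (2181): " + isListening("127.0.0.1", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("127.0.0.1", 9092, 1000));
    }
}
```

If either line prints false, check the terminal window where that server was started for errors.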
Step 6: Now we need to create a new topic with the name StringProducer. To do so, open a new command prompt window and change to the Kafka directory. Create the topic using the command given below:
// For Mac and Linux
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic StringProducer
// For Windows
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic StringProducer
Note: Kafka 3.0 and later no longer accept the --zookeeper option for kafka-topics. On those versions, use --bootstrap-server localhost:9092 instead.
Step 7: Now start the Kafka console producer on the same topic, StringProducer, using the command below:
// For Mac and Linux
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic StringProducer
// For Windows
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic StringProducer
Step 8: Run the application, type a message in the Kafka producer console, and press Enter.
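Output:
Type a message in string format in the Kafka producer console:
>Hello
>Welcome to w3wiki
The Spring Boot application console should then print each message through the listener in KafkaService:
You have a new message: Hello
You have a new message: Welcome to w3wiki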