Step-By-Step Implementation of Read Data from the Beginning Using Kafka Consumer API
Below are the steps to read data from the beginning of a topic using the Kafka Consumer API.
Step 1: Maven dependency
First, let’s add the Maven dependency for the Kafka Clients Java library to the pom.xml file in our project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
Step 2: Create a KafkaProducer
To publish messages, let’s first establish a KafkaProducer instance with a minimal setup specified by a Properties instance:
Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerFactory {
    public static Producer<String, String> createProducer(String bootstrapServers) {
        // create Kafka producer properties
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create and return the Kafka producer
        return new KafkaProducer<>(producerProperties);
    }
}
Step 3: Use KafkaProducer.send method
To publish messages to the Kafka stream, we utilize the KafkaProducer.send(ProducerRecord) function.
Java
// creating a loop to send 10 messages to a Kafka topic named "w3wiki"
for (int i = 1; i <= 10; i++) {
    // creating a new ProducerRecord with the topic "w3wiki" and a string value based on the loop variable
    ProducerRecord<String, String> record = new ProducerRecord<>("w3wiki", String.valueOf(i));
    // sending the record to Kafka using the previously configured KafkaProducer
    producer.send(record);
}
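Putting the two previous snippets together, a complete producer program might look like the following sketch. It assumes a broker at localhost:9092 and the KafkaProducerFactory class from Step 2, and it needs a running Kafka broker to execute:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerMain {
    public static void main(String[] args) {
        // create the producer using the factory from Step 2;
        // try-with-resources closes the producer when done
        try (Producer<String, String> producer = KafkaProducerFactory.createProducer("localhost:9092")) {
            for (int i = 1; i <= 10; i++) {
                producer.send(new ProducerRecord<>("w3wiki", String.valueOf(i)));
            }
            // flush to make sure all buffered records are actually sent before closing
            producer.flush();
        }
    }
}
```

Closing (or flushing) the producer matters here: send() is asynchronous, so without a flush the program could exit before the records leave the client's buffer.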
Step 4: Consume messages from the beginning of a topic with the Kafka Consumer API
Using a randomly created consumer group id, we build an instance of KafkaConsumer to consume messages from the start of a Kafka topic. To do this, we assign a randomly generated UUID to the consumer’s “group.id” property:
Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
import java.util.UUID;

public class KafkaConsumerProperties {
    public static Properties createConsumerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        // setting up Kafka consumer properties
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, generateRandomGroupId());
        return properties;
    }

    private static String generateRandomGroupId() {
        // generating a random Group ID using UUID
        return UUID.randomUUID().toString();
    }
}
- Whenever we generate a new consumer group id, the consumer joins a brand-new consumer group designated by the group.id property.
- A newly formed consumer group has no committed offsets.
- In this situation, Kafka falls back to the auto.offset.reset setting, which specifies what to do when there is no initial offset in Kafka or when the current offset no longer exists on the server.
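As a minimal sketch of how this property could be set, the consumer properties from Step 4 could be extended as below. The snippet uses the plain string key "auto.offset.reset", which is the value of the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG constant, so it runs without the Kafka client library on the classpath:

```java
import java.util.Properties;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // "earliest": a consumer group with no committed offset starts from the
        // beginning of each partition; "latest" (the default) starts at the end.
        // The string key below equals ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
        properties.put("auto.offset.reset", "earliest");
        System.out.println(properties.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

With a freshly generated group id and auto.offset.reset set to "earliest", the consumer reads the topic from the first available offset without any explicit seek.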
Step 5: Use the KafkaConsumer.poll(Duration duration) method
The KafkaConsumer.poll(Duration duration) function is then used to poll for new messages.
Java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // create Kafka consumer properties
        Properties consumerProperties = KafkaConsumerProperties.createConsumerProperties(BOOTSTRAP_SERVERS);

        // create the Kafka consumer directly from the properties
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

        // subscribe to the topic before polling
        consumer.subscribe(Collections.singletonList("w3wiki"));

        // poll for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

        // process the received records
        processRecords(records);

        // close the consumer
        consumer.close();
    }

    private static void processRecords(ConsumerRecords<String, String> records) {
        // print the value of each received record
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
}
Step 6: Reset the existing consumer to read
Using KafkaConsumer.seekToBeginning(Collection&lt;TopicPartition&gt; partitions), we can force an existing consumer to start reading from the beginning of the topic. The method takes a collection of TopicPartition objects and moves the consumer's offset for each partition back to the start:
consumer.seekToBeginning(consumer.assignment());
- We pass the value of KafkaConsumer.assignment() to the seekToBeginning() method.
- KafkaConsumer.assignment() returns the set of partitions currently assigned to the consumer. Note that this set is empty until the consumer has joined the group, i.e., after at least one call to poll(), so the seek should only be performed once partitions have been assigned.
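Because the assignment is empty until the group rebalance completes, one common pattern is to perform the seek inside a ConsumerRebalanceListener, so it runs exactly when partitions are assigned. The sketch below assumes the topic name is passed in by the caller and, like the other examples, needs a running Kafka broker to actually execute:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;

public class SeekToBeginningSketch {
    // subscribes to the topic and rewinds to the start as soon as partitions are assigned
    public static void subscribeFromBeginning(Consumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do when partitions are taken away
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // invoked from inside poll() once the rebalance has completed,
                // so the assigned partitions are guaranteed to be known here
                consumer.seekToBeginning(partitions);
            }
        });
    }
}
```

This avoids the race where seekToBeginning(consumer.assignment()) is called before any poll() and silently does nothing because the assignment set is still empty.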
Step 7: Read all the messages from the beginning
When the same consumer polls for messages again, it now reads all of the messages from the beginning of the partition.
Java
// polling for records from the Kafka topic for a duration of 10 seconds
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

// iterating through the received records and printing the values
for (ConsumerRecord<String, String> record : records) {
    // print the value of each record
    System.out.println(record.value());
}
Read Data From the Beginning Using Kafka Consumer API
Apache Kafka is a distributed, fault-tolerant stream processing system. The kafka-console-consumer CLI reads data from Kafka and writes it to standard output. To consume, the Kafka consumer sends fetch requests to the brokers that lead the partitions it wants to consume. With every request, the consumer specifies its offset in the log, and a chunk of the log starting at that offset position is returned to the consumer. The consumer therefore has significant control over this position and can even rewind it to re-consume data if desired.
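The same from-the-beginning behavior is available on the command line through the kafka-console-consumer tool mentioned above (assuming a broker at localhost:9092 and the "w3wiki" topic used in the earlier steps):

```shell
# read every message in the topic starting from the first available offset;
# --from-beginning only applies when the consumer group has no committed offset
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic w3wiki \
  --from-beginning
```

This is a quick way to verify that the producer from the earlier steps actually wrote the ten messages.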