Apache Kafka – Real World Project with Twitter using Java
Apache Kafka is a publish-subscribe messaging system. A messaging system lets you send messages between processes, applications, and servers. In Apache Kafka, you define topics (a topic is a named category or feed of messages): producers publish records to a topic, and consumers read and process them. To learn more, please refer to the article What is Apache Kafka and How Does it Work?
In this article, we will build a real-world Apache Kafka project with Twitter using Java. We will develop a Kafka producer application in Java that fetches data from the Twitter API and publishes it to a Kafka topic, so that a consumer application can subscribe to that topic and consume the messages.
Step-by-Step Implementation
Step 1: Create a New Apache Kafka Project in IntelliJ
To create a new Apache Kafka project in IntelliJ using Java and Maven, please refer to How to Create an Apache Kafka Project in IntelliJ using Java and Maven.
Step 2: Install and Run Apache Kafka
To install and run Apache Kafka on your local system, please refer to How to Install and Run Apache Kafka.
Step 3: Create a Twitter Developer Account & Generate Keys and Token
Go to this link, apply for a Twitter Developer Account, and generate the consumer keys and authentication tokens. Please refer to the image below.
Step 4: Add Dependencies to the pom.xml file
Add the following dependencies to your pom.xml file:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
        <version>2.2.0</version> <!-- or whatever the latest version is -->
    </dependency>
</dependencies>
Below is the complete code for the pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.kafkademo</groupId>
        <artifactId>kafka-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>kafka-twitter</artifactId>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
            <version>2.2.0</version> <!-- or whatever the latest version is -->
        </dependency>
    </dependencies>
</project>
Step 5: Writing a Twitter Client
Declare the keys and tokens you generated in Step 3.
private String consumerKey = "put your consumerKey here";
private String consumerSecret = "put your consumerSecret here";
private String token = "put your token here";
private String secret = "put your secret here";
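Hard-coding credentials in source code makes them easy to leak, and the code in this article itself notes that these secrets should be read from a config file. The following is a minimal sketch of that approach. It uses only java.util.Properties and assumes a hypothetical file such as twitter.properties with the keys consumerKey, consumerSecret, token and secret. The class name TwitterCredentials and these key names are illustrative choices, not part of hbc.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the four Twitter credentials (not part of hbc).
final class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    private TwitterCredentials(String consumerKey, String consumerSecret,
                               String token, String secret) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.token = token;
        this.secret = secret;
    }

    // Reads key=value pairs (e.g. from a FileReader on "twitter.properties")
    // and fails fast if any of the four values is missing or blank.
    static TwitterCredentials load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new TwitterCredentials(
                required(props, "consumerKey"),
                required(props, "consumerSecret"),
                required(props, "token"),
                required(props, "secret"));
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing Twitter credential: " + key);
        }
        return value.trim();
    }
}
```

With the credentials loaded once (for example inside a try-with-resources block around a FileReader), createTwitterClient could build its authentication with new OAuth1(creds.consumerKey, creds.consumerSecret, creds.token, creds.secret) instead of using the hard-coded fields.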
Declare the terms for which you want to fetch data from the Twitter API.
List<String> topics = Lists.newArrayList("w3wiki", "java", "kafka");
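Set up a bounded blocking queue to buffer the incoming messages. Size it according to the message rate you expect from the stream.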
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
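The queue created above holds at most 1,000 messages, so memory stays bounded even if tweets arrive faster than they are sent to Kafka. The stdlib-only sketch below uses a hypothetical QueueDemo class and a capacity of 2 to keep it short. It shows the two behaviours the sending loop in Step 8 relies on: offer returns false instead of blocking when the queue is full, and poll with a timeout returns null when nothing arrives in time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueDemo {
    // Fills a small bounded queue, then drains it, recording each result.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // capacity 2 instead of 1000
        StringBuilder log = new StringBuilder();

        log.append(queue.offer("tweet-1")).append(','); // true: space available
        log.append(queue.offer("tweet-2")).append(','); // true: queue is now full
        log.append(queue.offer("tweet-3")).append(','); // false: rejected without blocking

        log.append(queue.poll(100, TimeUnit.MILLISECONDS)).append(','); // tweet-1 (FIFO order)
        log.append(queue.poll(100, TimeUnit.MILLISECONDS)).append(','); // tweet-2
        log.append(queue.poll(100, TimeUnit.MILLISECONDS));             // null after the timeout
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "true,true,false,tweet-1,tweet-2,null"
    }
}
```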
Below is the function for creating a Twitter client.
public Client createTwitterClient(BlockingQueue<String> msgQueue) {
    // Declare the host you want to connect to,
    // the endpoint, and authentication
    Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
    hosebirdEndpoint.trackTerms(topics);
    // These secrets should be read from a config file
    Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
    ClientBuilder builder = new ClientBuilder()
            .name("Hosebird-Client-01")
            .hosts(hosebirdHosts)
            .authentication(hosebirdAuth)
            .endpoint(hosebirdEndpoint)
            .processor(new StringDelimitedProcessor(msgQueue));
    Client hosebirdClient = builder.build();
    return hosebirdClient;
}
Step 6: Establish a Connection
Call connect() on the Twitter client to attempt to establish a connection to the stream:
twitterClient.connect();
Step 7: Writing the Kafka Producer
Create Producer Properties.
// Address of the local Kafka broker started in Step 2
String bootstrapServer = "127.0.0.1:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Create the Producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Below is the function for creating the Kafka Producer.
public KafkaProducer<String, String> createKafkaProducer() {
    String bootstrapServer = "127.0.0.1:9092";
    // Create Producer Properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the Producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    // Return the Producer
    return producer;
}
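The ProducerConfig constants used in Step 7 are plain string keys. The sketch below builds the same three settings with the literal key names they stand for. ProducerProps is a hypothetical helper that uses only java.util.Properties, and it takes the broker address as a parameter instead of hard-coding 127.0.0.1:9092. The bootstrap.servers value may also be a comma-separated list of host:port pairs when you connect to more than one broker.

```java
import java.util.Properties;

// Hypothetical helper: the Step 7 settings written with literal config keys.
final class ProducerProps {
    // The value that StringSerializer.class.getName() returns.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    // "bootstrap.servers", "key.serializer" and "value.serializer" are the keys behind
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and
    // VALUE_SERIALIZER_CLASS_CONFIG respectively.
    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", STRING_SERIALIZER);
        properties.setProperty("value.serializer", STRING_SERIALIZER);
        return properties;
    }
}
```

Passing ProducerProps.build("127.0.0.1:9092") to new KafkaProducer<String, String>(...) should give the same configuration as createKafkaProducer.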
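Step 8: Send Tweets to Kafka in a Loop
The following method polls the queue for up to five seconds at a time and sends every tweet it receives to the twitter_tweets topic until the Twitter client is done. Here it runs on the calling thread, but the same loop could also run on a separate thread, or on several threads.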
private static void sendTweetsToKafka(BlockingQueue<String> msgQueue,
                                      Client twitterClient,
                                      KafkaProducer<String, String> producer) {
    while (!twitterClient.isDone()) {
        String msg = null;
        try {
            msg = msgQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            twitterClient.stop();
        }
        if (msg != null) {
            producer.send(new ProducerRecord<>("twitter_tweets", null, msg), (recordMetadata, e) -> {
                if (e != null) {
                    System.out.println(e.getMessage());
                }
            });
        }
    }
}
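As written, sendTweetsToKafka runs on the calling thread. The sketch below shows one way to run the same poll-and-send loop on a worker thread with an ExecutorService. To keep it self-contained, it does not use hbc or Kafka. A BooleanSupplier stands in for twitterClient.isDone(), and a Consumer<String> stands in for producer.send(...). TweetPump, start and the pollTimeoutMillis parameter are hypothetical names. Unlike the original loop, an interrupt here restores the thread's interrupt flag and ends the loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical helper: runs the Step 8 poll-and-send loop on an executor thread.
final class TweetPump {
    static Future<?> start(ExecutorService executor,
                           BlockingQueue<String> msgQueue,
                           BooleanSupplier isDone,     // stands in for twitterClient::isDone
                           Consumer<String> send,      // stands in for producer.send(...)
                           long pollTimeoutMillis) {   // the article's loop uses 5 seconds
        return executor.submit(() -> {
            while (!isDone.getAsBoolean()) {
                try {
                    String msg = msgQueue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        send.accept(msg);
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt status (e.g. after shutdownNow()) and stop looping.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }
}
```

In run(), the call could look like TweetPump.start(Executors.newSingleThreadExecutor(), msgQueue, twitterClient::isDone, msg -> producer.send(new ProducerRecord<>("twitter_tweets", null, msg)), 5000). To use several threads, create the executor with Executors.newFixedThreadPool(n) and call start n times. The threads can share one KafkaProducer, which is thread-safe, and one BlockingQueue, which supports concurrent access.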
Below is the complete code for Apache Kafka Real-World Project with Twitter using Java.
package twitter;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwitterProducer {
    private String consumerKey = "put your consumerKey here";
    private String consumerSecret = "put your consumerSecret here";
    private String token = "put your token here";
    private String secret = "put your secret here";

    List<String> topics = Lists.newArrayList("w3wiki", "java", "kafka");

    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    public TwitterProducer() {
    }

    public void run() {
        // Set up your blocking queues
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
        // Create Twitter Client
        Client twitterClient = createTwitterClient(msgQueue);
        // Attempts to establish a connection
        twitterClient.connect();
        // Create Kafka Producer
        KafkaProducer<String, String> producer = createKafkaProducer();
        // Loop to send tweets to kafka
        // on a different thread, or
        // multiple different threads
        sendTweetsToKafka(msgQueue, twitterClient, producer);
    }

    private static void sendTweetsToKafka(BlockingQueue<String> msgQueue,
                                          Client twitterClient,
                                          KafkaProducer<String, String> producer) {
        while (!twitterClient.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                twitterClient.stop();
            }
            if (msg != null) {
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        System.out.println(e.getMessage());
                    }
                });
            }
        }
    }

    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // Declare the host you want to connect to,
        // the endpoint, and authentication
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        hosebirdEndpoint.trackTerms(topics);
        // These secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));
        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

    public KafkaProducer<String, String> createKafkaProducer() {
        String bootstrapServer = "127.0.0.1:9092";
        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Return the Producer
        return producer;
    }
}
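The program above never closes the producer. Records still buffered inside the KafkaProducer can therefore be lost when the JVM exits. One common remedy is to register a JVM shutdown hook that stops the Twitter client and then calls producer.close(), which blocks until previously sent requests complete. The sketch below assumes a shutdown hook suits your deployment (it runs on Ctrl+C or a normal exit, not on a hard kill). ShutdownHooks is a hypothetical stdlib-only helper that runs the given cleanup actions in order.

```java
import java.util.List;

// Hypothetical helper: runs cleanup actions, in order, when the JVM shuts down.
final class ShutdownHooks {
    static Thread register(Runnable... actions) {
        List<Runnable> steps = List.of(actions);
        Thread hook = new Thread(() -> {
            for (Runnable step : steps) {
                try {
                    step.run();
                } catch (RuntimeException e) {
                    // Report the failure but still run the remaining steps.
                    System.err.println("Shutdown step failed: " + e.getMessage());
                }
            }
        }, "twitter-producer-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

In run(), this could be registered right after the client and producer are created, for example with ShutdownHooks.register(twitterClient::stop, producer::close). The client is stopped first so that no new tweets are queued while the producer flushes and closes.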
Output: