How to Connect Spark Streaming with Kafka in Python



Welcome to the world of Apache Spark Streaming. In this post I am going to share how to integrate a Spark Streaming context with Apache Kafka.

Spark Streaming With Kafka Python Overview:

Apache Kafka:

Apache Kafka is a popular publish-subscribe messaging system which is used in various organisations. It is similar to a message queue or an enterprise messaging system.

Spark Streaming:

Spark Streaming is an extension of the core Spark API that is useful for processing live streams of data. Spark Streaming is very fast; it can process millions of records within a few seconds, which makes it one of the most popular tools for processing live streams.

How To Connect Apache Kafka With Apache Spark Streaming

Aim: Our aim is to connect Spark with Kafka. We want to send messages through a Kafka producer and read those messages from Spark Streaming.

Prerequisite: You should have Kafka and Spark installed on your system.

Solution:

Part A: We need to start sending messages through a Kafka producer; these will be received by Spark Streaming in Part B.

Step1:

Install Kafka on your system from here:

https://kafka.apache.org/downloads
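If you are starting from a fresh download, you also need ZooKeeper and the Kafka broker running before the next steps. From the extracted Kafka directory (default config paths assumed), run:

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties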

Step2:

Go to the bin folder. Use this command to create a topic. (A topic is the channel through which you send messages to Spark Streaming.)

./kafka-topics.sh --create --zookeeper servername01:2181 --replication-factor 1 --partitions 1 --topic test_topic (you should have ZooKeeper running on your system)
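To confirm the topic was created, you can list all topics (same ZooKeeper address assumed):

./kafka-topics.sh --list --zookeeper servername01:2181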

Step3:

Use this command to start the Kafka console producer:

./kafka-console-producer.sh --broker-list servername02:9092 --topic test_topic

Step4:

A console prompt will open; start typing your messages into it.
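Each line you type becomes one Kafka message. For example (the > prompt comes from the console producer):

>hello spark

>streaming from kafka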

 

Part B: Spark Streaming will receive the messages sent by the Kafka producer. We will do this with Python code.

Step1:

Now open another window and create a Python file (spark_kafka.py) to write the code in.

You need to import some libraries:

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

Step2:

Create a SparkContext and a StreamingContext with a batch interval of 5 seconds and three worker threads. (A receiver-based Kafka stream occupies one thread, so you need at least two local threads; three gives some headroom for processing.)

sc = SparkContext("local[3]", "NetworkWordCount")

sc.setLogLevel("WARN")  # set log level to WARN to hide unwanted info on the console

ssc = StreamingContext(sc, 5)

Step3:

Create a Kafka stream using KafkaUtils:

kafkaStream = KafkaUtils.createStream(ssc, "servername:2181", "c-group", {"test_topic": 1})
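createStream returns a DStream of (key, message) pairs; the console producer sends no key, so the key will be None. If you only want the message text, you can map over the stream (lines is just an illustrative name):

lines = kafkaStream.map(lambda kv: kv[1])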

Step4:

To print the messages received from the Kafka producer:

kafkaStream.pprint()
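Step5:

Nothing runs until you start the streaming context, so finish the script by starting it and waiting for termination:

ssc.start()

ssc.awaitTermination()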

 

Final Code:

Here is the final Python code:

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[3]", "NetworkWordCount")

sc.setLogLevel("WARN")  # set log level to WARN to hide unwanted info on the console

ssc = StreamingContext(sc, 5)

kafkaStream = KafkaUtils.createStream(ssc, "servername:2181", "c-group", {"test_topic": 1})

kafkaStream.pprint()

ssc.start()

ssc.awaitTermination()
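As a next step, here is a minimal word-count sketch over the same stream (an illustrative extension, not required for the steps above):

counts = kafkaStream.map(lambda kv: kv[1]) \
                    .flatMap(lambda line: line.split(" ")) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a + b)

counts.pprint()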

 

How To Run It:

Step 1:

Start the Kafka producer as described in Part A.

Step2:

Download the spark-streaming-kafka assembly jar and place it somewhere on your system. Choose the version that matches your Spark and Scala versions; the link below is for Scala 2.10 / Spark 1.3.0.

Download Path : https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-assembly_2.10/1.3.0

Step3:

Go to the Spark bin folder and run this command there:

spark-submit --jars /Path/to/jar/file/spark-streaming-kafka-assembly.jar spark_kafka.py
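Alternatively, if the machine has internet access, spark-submit can fetch the jar from Maven for you with --packages (the coordinates below match the download link above; adjust the versions to your setup):

spark-submit --packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.0 spark_kafka.py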

 

Conclusion:

Whatever messages you type in the producer window will be displayed in the Spark Streaming window, once per 5-second batch.
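The pprint output looks roughly like this for each batch (timestamp is illustrative; the key is None because the console producer sends no key):

-------------------------------------------
Time: 2018-01-01 10:00:05
-------------------------------------------
(None, u'hello spark')
(None, u'streaming from kafka')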

Feel free to ask your questions in the comment box if you are facing any issues. We are here to help you!
