Building a real-time data pipeline using Spark Streaming and Kafka
21 Jun 2018


In one of our previous blogs, Aashish gave us a high-level overview of data ingestion with Hadoop YARN, Spark, and Kafka. Now it's time to delve deeper into the process of building a real-time data ingestion pipeline. You can write your own Spark Streaming applications in several languages, such as Java, Python, and Scala. In this blog, I will explain how you can build one using Python.

This solution typically applies when a business is struggling to make sense of the data it has collected about customer habits and preferences, and wants to use that data to make smarter decisions. The bottleneck in the traditional approach is that the data is processed sequentially before it is analyzed, which can take up to weeks. With the approach I am about to explain, you can bring this time down significantly.

Before going through the modules of the solution, let’s take a quick look at the tools that I am going to use in this process:

  • Spark: Apache Spark is an open source, flexible in-memory framework that serves as an alternative to MapReduce for handling batch, real-time analytics, and data processing workloads. It provides native bindings for the Java, Scala, Python, and R programming languages and supports SQL, streaming data, machine learning, and graph processing. I have used Spark in this solution to improve the processing time.
  • Kafka: Apache Kafka is an open source distributed streaming platform that is useful for building real-time data pipelines and stream processing applications. I have used Kafka for internal communication between the different streaming jobs.
  • HBase: Apache HBase is an open source, distributed, column-oriented NoSQL database that runs on top of the Hadoop Distributed File System (HDFS). It is natively integrated with the Hadoop ecosystem and is designed to provide quick random access to huge amounts of structured data. I have used HBase in the final stage of the solution, which I implemented for one of our clients, to store the processed data.

The workflow of the solution has four main stages, which include Transformation, Cleaning, Validation, and Writing of the data received from the various sources.

Data Transformation:

This is an entry point for the streaming application. This module/application performs the operations related to the normalization of data and helps in converting the different keys and values received from various sources to respective associated forms. In Spark streaming, the transformation of data can be performed by using built-in functions like map, filter, foreachRDD, etc.

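def get_transformation(tup_data):
    # tup_data is an iterator over the records of one partition
    for tup in tup_data:
        # Custom logic to transform each record goes here
        yield tup

# foreachRDD returns None, so use transform with mapPartitions to get the transformed stream back
transformed_data = spark_kafka_stream.transform(lambda rdd: rdd.mapPartitions(get_transformation))
Here transformed_data is a DStream in which every batch is an RDD.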
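
As an illustration of the kind of normalization described above, here is a minimal sketch in plain Python. The alias table and field names (fname, firstName, amt) are hypothetical and not taken from the original solution; the point is only to show a partition-level function that maps source-specific keys to one common form.

```python
# Hypothetical mapping from source-specific keys to one normalized key.
KEY_ALIASES = {
    "fname": "first_name",
    "firstName": "first_name",
    "amt": "amount",
}


def normalize_record(record):
    """Return a copy of the record with known aliases renamed."""
    return {KEY_ALIASES.get(key, key): value for key, value in record.items()}


def normalize_partition(records):
    """Lazily normalize every record of one partition (an iterator)."""
    for record in records:
        yield normalize_record(record)


# Plain-Python check that needs no Spark cluster.
sample = [{"fname": "Ann", "amt": 10}, {"firstName": "Bob", "amount": 5}]
normalized = list(normalize_partition(iter(sample)))
print(normalized)
```

In a streaming job, a function shaped like normalize_partition could be passed to rdd.mapPartitions, assuming the records have already been parsed into dictionaries.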

Data Cleaning:

When you are preprocessing data, cleaning is an important stage. I have written a separate streaming application for data cleaning. In this stage, I have used some custom-built libraries in Python to clean the garbage data.

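Example: keep only the first of multiple names joined by 'and' or '&', and strip hyphens and other non-letter characters from the name.

import re

def clean_multiple_name(data):
    # Keep only the first name when several are joined by '&' or ' and '
    first_name = data.split('&')[0]
    first_name = first_name.split(" and ")[0]
    # Keep letters only; this also removes hyphens, spaces, and digits
    first_name = re.sub(r"[^a-z]", '', first_name, flags=re.IGNORECASE)
    return first_name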

Data Validation:

We have written a separate streaming application for data validation. In this stage, you check the various data fields against validation rules such as string length, the presence of disallowed special characters, and so on.

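Check for validations; for example, the amount should be greater than 0:

def validate_amount(data):  # the function name is illustrative
    # A record is valid only if its amount is greater than 0
    return data['amount'] > 0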
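
The checks mentioned in this stage (string length, disallowed special characters, positive amount) could be combined roughly as follows. This is a sketch under assumptions: the field names, the length limit, and the set of allowed characters are made up for illustration and should be replaced with your own rules.

```python
import re

# Assumed rules for illustration only: field names and limits are hypothetical.
MAX_NAME_LENGTH = 50
DISALLOWED_CHARS = re.compile(r"[^A-Za-z .'-]")


def validation_errors(record):
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    name = record.get("name", "")
    if not name or len(name) > MAX_NAME_LENGTH:
        errors.append("name length out of range")
    if DISALLOWED_CHARS.search(name):
        errors.append("name contains disallowed characters")
    amount = record.get("amount")
    if not isinstance(amount, (int, float)) or amount <= 0:
        errors.append("amount must be greater than 0")
    return errors


def is_valid(record):
    """True when the record passes every check."""
    return not validation_errors(record)


print(validation_errors({"name": "Ann#", "amount": 0}))
```

Returning the list of failures rather than a bare boolean makes it easier to log why a record was rejected; is_valid can still be used directly as a filter predicate.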

Writing:

Finally, the data that has passed through the above applications is handed to the writing application, which simply writes this final set of data to HBase for further analysis.
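
The article does not say which HBase client the writing application uses. The sketch below assumes the third-party happybase library (which talks to the HBase Thrift gateway), a hypothetical table named processed_data, a column family cf, and an id field used as the row key; all of these are placeholders.

```python
def to_hbase_row(record, column_family="cf"):
    """Convert a record dict into (row_key, columns) as bytes for HBase.

    The 'id' field used as the row key and the 'cf' column family are
    hypothetical; substitute the ones from your own table design.
    """
    row_key = str(record["id"]).encode("utf-8")
    columns = {
        "{}:{}".format(column_family, field).encode("utf-8"): str(value).encode("utf-8")
        for field, value in record.items()
        if field != "id"
    }
    return row_key, columns


def write_partition(records, host="localhost", table_name="processed_data"):
    """Write one partition of records to HBase through the Thrift gateway."""
    import happybase  # third-party client, imported lazily; assumed to be installed

    connection = happybase.Connection(host)
    try:
        table = connection.table(table_name)
        # batch() buffers the puts and sends them when the block exits.
        with table.batch() as batch:
            for record in records:
                row_key, columns = to_hbase_row(record)
                batch.put(row_key, columns)
    finally:
        connection.close()


print(to_hbase_row({"id": 7, "name": "Ann", "amount": 12}))
```

Opening the connection inside the partition function means one connection per partition rather than one per record, and avoids shipping a connection object from the driver to the executors. It could be hooked up with foreachRDD and foreachPartition.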

So, your data pipeline starts from the transformation stage and ends at the writing stage application. Communication between all these applications is managed by Kafka, using different topics for different applications.
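
To make the topic-per-application idea concrete, here is a rough sketch of how one stage might hand its output to the next stage's topic. The topic name cleaning_input is hypothetical, and the producer is passed in so that the function works with any object exposing send and flush, such as the KafkaProducer class from the kafka-python package (an assumption, since the article does not name a producer library).

```python
import json


def send_partition(records, topic, producer):
    """Publish each record of a partition as JSON to the next stage's topic.

    `producer` is any object with send(topic, value) and flush(), such as
    kafka-python's KafkaProducer. Returns the number of records sent.
    """
    count = 0
    for record in records:
        producer.send(topic, json.dumps(record).encode("utf-8"))
        count += 1
    producer.flush()  # block until buffered messages are delivered
    return count


class RecordingProducer:
    """In-memory stand-in used here to exercise the function without a broker."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass


fake = RecordingProducer()
sent_count = send_partition(iter([{"id": 1}, {"id": 2}]), "cleaning_input", fake)
print(sent_count, fake.sent)
```

With a real broker, the producer would typically be created inside the function that runs on each partition, because producer objects cannot be serialized and sent from the driver.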

Pseudo code for a streaming job in PySpark

Step 1: Initialize SparkContext

spark_context = SparkContext(appName="Transformation Application")

This is the main entry point to the Spark functionality, which allows you to connect to the Spark cluster and create RDDs. appName is the name of the Spark streaming application.

Step 2: Initialize streaming context

streaming_spark_context = StreamingContext(spark_context, 5)

This is the entry point to the Spark Streaming functionality, which is used to create a DStream from various input sources. In this case, I am getting records from Kafka. The value ‘5’ is the batch interval in seconds. Spark Streaming is a micro-batch based streaming library: the streaming data is divided into batches based on time slices. Every batch is converted into an RDD, and the continuous stream of RDDs is called a DStream. A lower batch interval reduces latency, but each batch must still finish processing within the interval, so choose the lowest value that your job can keep up with.
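
The micro-batch idea can be pictured with a small plain-Python simulation. This is not how Spark implements it internally; it only shows how a batch interval of 5 seconds slices a stream of timestamped events into consecutive batches. The event data is invented for the example.

```python
from collections import defaultdict


def split_into_batches(events, batch_interval):
    """Group (timestamp_seconds, payload) events by the time slice they fall in."""
    batches = defaultdict(list)
    for timestamp, payload in events:
        batch_start = int(timestamp // batch_interval) * batch_interval
        batches[batch_start].append(payload)
    return dict(sorted(batches.items()))


events = [(0.5, "a"), (4.9, "b"), (5.0, "c"), (11.2, "d")]
batches = split_into_batches(events, batch_interval=5)
print(batches)  # {0: ['a', 'b'], 5: ['c'], 10: ['d']}
```

Each key is the start of a time slice, and each list plays the role of the RDD created for that slice.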

Step 3: Fetch data from Kafka topics:

# topics: Python list of topic names; kafka_host: "host:port" of a Kafka broker
direct_stream = KafkaUtils.createDirectStream(streaming_spark_context, topics, {"metadata.broker.list": kafka_host})

This call connects to the Kafka brokers to fetch messages. The receiver-less approach offers stronger end-to-end guarantees than the receiver-based one. Spark periodically queries Kafka for the latest offsets in each topic and partition, and uses those offset ranges to define the messages processed in each batch. With the direct stream, there is no need to create multiple input Kafka streams and union them.

Step 4: Transformation

direct_stream.map(custom_defined_functions)

Note: You can also use other DStream methods such as filter for transformation, and foreachRDD to apply an operation to the RDD of every batch.
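
As an example of what a custom function passed to map might look like, the sketch below parses one message from the direct stream, which arrives as a (key, value) pair. It assumes the value is a JSON object string; the article does not state the message format, so treat the decoding as a placeholder.

```python
import json


def parse_message(message):
    """Turn one (key, value) Kafka message into a dict, or None if malformed.

    Assumes the value is a JSON object string; adjust the decoding to
    whatever format your producers actually use.
    """
    _key, value = message
    try:
        record = json.loads(value)
    except (TypeError, ValueError):
        return None
    return record if isinstance(record, dict) else None


messages = [(None, '{"name": "Ann", "amount": 10}'), (None, "not json")]
parsed = [parse_message(m) for m in messages]
print(parsed)
```

In the job, this could be chained as direct_stream.map(parse_message).filter(lambda record: record is not None), so that malformed messages are dropped instead of failing the batch.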

Step 5: Managing the Pipeline

streaming_spark_context.start()

streaming_spark_context.awaitTermination()
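
Put together, the five steps could look like the skeleton below. It is a sketch that assumes a Spark 2.x installation with the Kafka 0.8 direct-stream integration available (the pyspark.streaming.kafka module used here is not part of Spark 3.x). The helper names and the placeholder transformation are illustrative.

```python
def build_kafka_params(kafka_host):
    """Kafka parameters for the direct stream; kafka_host is "host:port"."""
    return {"metadata.broker.list": kafka_host}


def run_job(topics, kafka_host, batch_interval=5, app_name="Transformation Application"):
    """Wire steps 1 to 5 together; blocks until the streaming job terminates."""
    # Imported lazily so this module can be loaded without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # Step 1: entry point to Spark.
    spark_context = SparkContext(appName=app_name)
    # Step 2: streaming entry point with the batch interval in seconds.
    streaming_context = StreamingContext(spark_context, batch_interval)
    # Step 3: receiver-less stream of (key, value) messages.
    direct_stream = KafkaUtils.createDirectStream(
        streaming_context, topics, build_kafka_params(kafka_host)
    )
    # Step 4: placeholder transformation that keeps only the message value.
    values = direct_stream.map(lambda message: message[1])
    values.pprint()  # an output operation is needed, or nothing is executed
    # Step 5: start the job and wait for it to stop.
    streaming_context.start()
    streaming_context.awaitTermination()
```

A call such as run_job(["transformation_input"], "localhost:9092") would start the job; both argument values are placeholders.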

Performance tuning with Spark and Kafka:

The following configurations can help you to improve the pipeline’s performance:

spark.executor.memory: This parameter defines the amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix (for example, 512m or 2g). The right value varies from application to application, based on the memory each job requires.

spark.executor.instances: This parameter defines the number of executors. This property conflicts with spark.dynamicAllocation.enabled: if both are specified, the specified number of spark.executor.instances is used instead of the number assigned by dynamic allocation. As a sizing example, suppose a node has 32GB of RAM, 8GB is reserved for the OS, and each executor is expected to consume 3GB of memory; the total number of containers launched on that node will be (32 - 8) / 3 = 8.
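
The sizing arithmetic above can be written as a small helper. It is a simplification: it ignores the per-executor memory overhead that the cluster manager adds on top of spark.executor.memory, so the real number may be lower.

```python
def executors_per_node(node_memory_gb, reserved_gb, executor_memory_gb):
    """Number of executors that fit in the memory left after the OS reservation."""
    usable_gb = node_memory_gb - reserved_gb
    if usable_gb <= 0 or executor_memory_gb <= 0:
        return 0
    return usable_gb // executor_memory_gb


print(executors_per_node(32, 8, 3))  # 8
```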

spark.streaming.receiver.maxRate: This parameter defines the maximum number of records per second that each receiver will receive. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative value puts no limit on the rate.

spark.streaming.kafka.maxRatePerPartition: This parameter defines the maximum number of records per second that will be read from each Kafka partition when using the Kafka direct stream API. For my use case, I set this value to 100, and that worked well.
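
To see how these settings interact, the sketch below renders a settings dictionary as spark-submit --conf arguments and computes the upper bound on records read per batch by a direct stream (rate per partition × partitions × batch interval). The values are illustrative: 3g and 8 echo the sizing example, 100 echoes the value mentioned above, and the partition count of 4 is hypothetical.

```python
def max_records_per_batch(max_rate_per_partition, partitions, batch_interval_seconds):
    """Upper bound on the records a direct stream reads in one batch."""
    return max_rate_per_partition * partitions * batch_interval_seconds


def to_submit_args(conf):
    """Render a settings dict as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args.extend(["--conf", "{}={}".format(key, value)])
    return args


# Illustrative values only; tune them for your own cluster and workload.
conf = {
    "spark.executor.memory": "3g",
    "spark.executor.instances": 8,
    "spark.streaming.kafka.maxRatePerPartition": 100,
}
print(to_submit_args(conf))
print(max_records_per_batch(100, partitions=4, batch_interval_seconds=5))  # 2000
```

With a 5-second batch interval and 4 partitions, a limit of 100 records per second per partition caps each batch at 2000 records, which is a useful number to compare against how many records the job can process within one interval.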

At Opcito, we strive to build the most optimal solutions for our clientele using the best tools out there - like the ones mentioned above. So, if you are facing issues with analyzing your data, you know what solution to implement. Happy coding :)
 
