Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

Upstash Kafka Setup

Create a Kafka cluster using the Upstash Console or the Upstash CLI by following the Getting Started guide.

Create a topic by following the topic creation steps. Name the topic “sentence”.

Project Setup

If you already have a project and want to integrate Upstash Kafka and Apache Spark into it, skip this section and continue with Add Spark and Kafka into the Project.

Install Maven on your machine by following the Maven Installation Guide.

Run mvn -version in a terminal or command prompt to confirm that Maven is installed.

It should print the installed Maven version:

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

To create the Maven project:

In your terminal or command prompt, go to the folder where you want to create the project by running cd <folder path>

Run the following command:

mvn archetype:generate -DgroupId=com.kafkasparkinteg.app -DartifactId=kafkasparkinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

Add Spark and Kafka into the Project

Open the project folder in an IDE that has a Maven plugin, such as IntelliJ, Visual Studio, or Eclipse. Add the following Spark dependencies inside the dependencies tag of the pom.xml file.

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

Using Apache Spark as Producer

Import the following packages first:

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.*;

To send messages to Kafka from Spark, use the following code after replacing the UPSTASH-KAFKA-* placeholders with your cluster information:

SparkSession spark = SparkSession.builder()
  .appName("quickstart")
  .config("spark.master", "local")
  .getOrCreate();

StructType structType = new StructType();
structType = structType.add("key", DataTypes.StringType, false);
structType = structType.add("value", DataTypes.StringType, false);

List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create("test key", "This is an example sentence"));

Dataset<Row> sentenceDF = spark.createDataFrame(rows, structType);

sentenceDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";")
  .option("topic", "sentence")
  .save();
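
The same four connection options appear in both the producer and the consumer code, so it may be convenient to build them in one place. The following is a minimal sketch in plain Java under stated assumptions: the class UpstashKafkaOptions, its method names, and the environment variable names UPSTASH_KAFKA_ENDPOINT, UPSTASH_KAFKA_USERNAME, and UPSTASH_KAFKA_PASSWORD are hypothetical and are not part of Spark or Upstash. The escaping step assumes the JAAS parser honors backslash escapes inside quoted values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of Spark or Upstash.
final class UpstashKafkaOptions {

    private UpstashKafkaOptions() {}

    // Escapes backslashes and double quotes so a credential stays inside its
    // quoted JAAS value (assumes the parser honors backslash escapes).
    static String escape(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the value used for the "kafka.sasl.jaas.config" option.
    static String jaasConfig(String username, String password) {
        Objects.requireNonNull(username, "username");
        Objects.requireNonNull(password, "password");
        return "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + escape(username) + "\""
                + " password=\"" + escape(password) + "\";";
    }

    // Collects the connection options shared by the producer and the consumer.
    static Map<String, String> connectionOptions(String endpoint, String username, String password) {
        Objects.requireNonNull(endpoint, "endpoint");
        Map<String, String> options = new LinkedHashMap<>();
        options.put("kafka.bootstrap.servers", endpoint);
        options.put("kafka.sasl.mechanism", "SCRAM-SHA-256");
        options.put("kafka.security.protocol", "SASL_SSL");
        options.put("kafka.sasl.jaas.config", jaasConfig(username, password));
        return options;
    }

    public static void main(String[] args) {
        // The environment variable names are assumptions; the fallbacks are the
        // placeholders used in this guide.
        Map<String, String> env = System.getenv();
        Map<String, String> options = connectionOptions(
                env.getOrDefault("UPSTASH_KAFKA_ENDPOINT", "UPSTASH-KAFKA-ENDPOINT:9092"),
                env.getOrDefault("UPSTASH_KAFKA_USERNAME", "UPSTASH-KAFKA-USERNAME"),
                env.getOrDefault("UPSTASH_KAFKA_PASSWORD", "UPSTASH-KAFKA-PASSWORD"));
        // Print only the option keys so credentials do not end up in logs.
        options.keySet().forEach(System.out::println);
    }
}
```

The resulting map could replace the four separate .option(...) calls by passing it to .options(...), which both the reader returned by spark.read() and the writer returned by write() accept as a java.util.Map<String, String>. Reading credentials from the environment is one way to keep them out of source control.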

Before running the project, open the messages view of the topic in the Upstash console.

When you run the project, you should see the new message arrive in the topic on the Upstash console.

Using Apache Spark as Consumer

If the following packages are not imported, import them first:

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.*;

To read messages from the Kafka topic with Apache Spark and process them, use the following code after replacing the UPSTASH-KAFKA-* placeholders with your cluster information:

SparkSession spark = SparkSession.builder()
  .appName("quickstart")
  .config("spark.master", "local")
  .getOrCreate();

Dataset<Row> lines = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";")
  .option("startingOffsets", "earliest")
  .option("subscribe", "sentence")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// PROCESS RECEIVED MESSAGE - Word counting part
Dataset<String> words = lines.select("value")
  .as(Encoders.STRING())
  .flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) {
      return Arrays.asList(x.split(" ")).iterator();
    }
  }, Encoders.STRING());

Dataset<Row> wordCounts = words.groupBy("value").count();
wordCounts.show();

Verify that the sentence you sent appears on your console, along with the number of occurrences of each word:

+--------+-----+
|   value|count|
+--------+-----+
| example|    1|
|      is|    1|
|sentence|    1|
|      an|    1|
|    This|    1|
+--------+-----+
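
To sanity-check the expected counts without a Kafka cluster or a Spark session, the same split-and-count logic can be sketched in plain Java. The class LocalWordCount below is a hypothetical illustration that uses the Java streams API rather than Spark. It splits on a single space, as the Spark code does, and the order of the printed entries is not guaranteed.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Spark word count; runs without Spark or Kafka.
final class LocalWordCount {

    private LocalWordCount() {}

    // Splits the sentence on single spaces and counts occurrences of each word.
    static Map<String, Long> count(String sentence) {
        return Arrays.stream(sentence.split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same sentence as the one sent by the producer in this guide.
        count("This is an example sentence")
                .forEach((word, occurrences) -> System.out.println(word + " " + occurrences));
    }
}
```

Each of the five words in the example sentence occurs once, which matches the count column in the table above.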