Apache Flink is a distributed processing engine for stateful computations over streaming data.

Upstash Kafka Setup

Create a Kafka cluster using the Upstash Console or the Upstash CLI by following the Getting Started guide.

Create two topics by following the creating topic steps. Name the first topic "input", since we are going to stream from it to the other topic, which we can name "output".

Project Setup

If you already have a project and want to implement Upstash Kafka and Apache Flink integration into it, you can skip this section and continue with Add Apache Flink and Kafka into the Project.

Install Maven on your machine by following the Maven Installation Guide.

Run mvn --version in a terminal or command prompt to make sure Maven is installed.

It should print the version of Maven you have installed:

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

To create the Maven project:

In your terminal or command prompt, go to the folder where you want to create the project by running cd <folder path>

Run the following command:

mvn archetype:generate -DgroupId=com.kafkaflinkinteg.app -DartifactId=kafkaflinkinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

Open the project folder in an IDE with Maven support, such as IntelliJ IDEA, Visual Studio Code, or Eclipse. Add the following Apache Flink dependencies to the <dependencies> tag in the pom.xml file.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka</artifactId>
 <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-base</artifactId>
 <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java</artifactId>
 <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients</artifactId>
 <version>1.16.0</version>
</dependency>
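
After adding the dependencies, you can verify that they resolve correctly by building the project from its root folder:

mvn clean package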

Streaming From One Topic to Another Topic

You need to create two more classes (LineSplitter and CustomSerializationSchema) for the word count example.

LineSplitter

This class will be a custom implementation of the FlatMapFunction interface from the Apache Flink client library. It takes a sentence, splits it into words, and emits a Tuple2 in the form (<word>, 1) for each word. For example, the line "to be or not" yields (to,1), (be,1), (or,1), (not,1).

Create the LineSplitter class as follows:

package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Implements a string tokenizer that splits sentences into words as a
 * user-defined FlatMapFunction. The function takes a line (String) and
 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		// normalize and split the line
		String[] tokens = value.toLowerCase().split("\\W+");

		// emit the pairs
		for (String token : tokens) {
			if (token.length() > 0) {
				out.collect(new Tuple2<String, Integer>(token, 1));
			}
		}
	}
}
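
To sanity-check LineSplitter in isolation before wiring it to Kafka, you can run it against a hard-coded sentence in a small local job. This is an optional sketch; the class name LineSplitterCheck is illustrative:

package com.kafkaflinkinteg.app;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LineSplitterCheck {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Feed a single sentence through LineSplitter and print the (word, 1) pairs.
		env.fromElements("This is an example sentence")
				.flatMap(new LineSplitter())
				.print();

		env.execute();
	}
}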

CustomSerializationSchema

This class will be a custom implementation of the KafkaRecordSerializationSchema interface from the Apache Flink Kafka connector library. It provides the schema for serializing the Tuple2 output of the word counting step into the Kafka record format: the word becomes the record key and the count becomes the record value, both JSON-encoded.

Create the CustomSerializationSchema class as follows:

package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomSerializationSchema implements KafkaRecordSerializationSchema<Tuple2<String, Integer>> {
	private final String topic;
	private transient ObjectMapper mapper;

	public CustomSerializationSchema(String topic) {
		this.topic = topic;
	}

	@Override
	public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
		KafkaRecordSerializationSchema.super.open(context, sinkContext);
	}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> tuple, KafkaSinkContext sinkContext, Long timestamp) {
		if (mapper == null) {
			mapper = new ObjectMapper();
		}
		try {
			// Serialize the word as the record key and the count as the record value.
			byte[] key = mapper.writeValueAsBytes(tuple.f0);
			byte[] value = mapper.writeValueAsBytes(tuple.f1);
			return new ProducerRecord<>(topic, key, value);
		} catch (JsonProcessingException e) {
			throw new RuntimeException("Failed to serialize record", e);
		}
	}
}
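
As a quick illustration of what this schema writes to Kafka (an optional check, not part of the pipeline): with Jackson's defaults, the tuple ("flink", 3) becomes a record whose key is the JSON bytes of the string "flink" and whose value is the JSON bytes of the number 3.

package com.kafkaflinkinteg.app;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class SerializationCheck {
	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();
		// The word serializes to a quoted JSON string, the count to a bare JSON number.
		System.out.println(new String(mapper.writeValueAsBytes("flink"))); // prints "flink" (with quotes)
		System.out.println(new String(mapper.writeValueAsBytes(3)));       // prints 3
	}
}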

Integration

Now bring everything together in the application's main class. Import the following packages first:

package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Properties;

Define the names of the topics you are going to work on:

String inputTopic = "input";
String outputTopic = "output";

Create the following properties for the Apache Flink Kafka connector and replace the UPSTASH-KAFKA-* placeholders with your cluster information.

Properties props = new Properties();
props.put("transaction.timeout.ms", "90000"); // e.g., 2 hours
props.put("bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");

Get the stream execution environment, which is used to build and execute the pipeline.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Create the Kafka source, which will consume messages from the input topic.

KafkaSource<String> source = KafkaSource.<String>builder()
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setProperties(props)
        .setTopics(inputTopic)
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

Implement the stream processing part, which reads sentences from the source, splits them into words, and counts word occurrences over 5-second tumbling processing-time windows.

DataStream<Tuple2<String, Integer>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .flatMap(new LineSplitter())
        .keyBy(value -> value.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .sum(1);

You can see the output by printing the data stream.

stream.print();

If you produce a message such as "This is an example sentence" to the input topic from your console, you will see output like this:

2> (This,1)
1> (an,1)
3> (is,1)
2> (sentence,1)
4> (example,1)

Next, create a Kafka sink to write the data stream to the output Kafka topic.

KafkaSink<Tuple2<String, Integer>> sink = KafkaSink.<Tuple2<String, Integer>>builder()
        .setKafkaProducerConfig(props)
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("integ")
        .setRecordSerializer(new CustomSerializationSchema(outputTopic))
        .build();
stream.sinkTo(sink);

Finally, execute the stream execution environment to run the pipeline:

env.execute();
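
For reference, here is how the snippets above fit together in a single main class. The class name KafkaFlinkIntegApp is illustrative; you can also reuse the App class generated by the archetype:

package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Properties;

public class KafkaFlinkIntegApp {
	public static void main(String[] args) throws Exception {
		String inputTopic = "input";
		String outputTopic = "output";

		// Connection properties; replace the UPSTASH-KAFKA-* placeholders.
		Properties props = new Properties();
		props.put("transaction.timeout.ms", "90000"); // 90 seconds
		props.put("bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092");
		props.put("sasl.mechanism", "SCRAM-SHA-256");
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Source: consume sentences from the input topic.
		KafkaSource<String> source = KafkaSource.<String>builder()
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setProperties(props)
				.setTopics(inputTopic)
				.setGroupId("my-group")
				.setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		// Count words in 5-second tumbling processing-time windows.
		DataStream<Tuple2<String, Integer>> stream = env
				.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
				.flatMap(new LineSplitter())
				.keyBy(value -> value.f0)
				.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
				.sum(1);

		stream.print();

		// Sink: write the counts to the output topic with exactly-once delivery.
		KafkaSink<Tuple2<String, Integer>> sink = KafkaSink.<Tuple2<String, Integer>>builder()
				.setKafkaProducerConfig(props)
				.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
				.setTransactionalIdPrefix("integ")
				.setRecordSerializer(new CustomSerializationSchema(outputTopic))
				.build();
		stream.sinkTo(sink);

		env.execute();
	}
}

You can then run the job locally, for example with mvn compile exec:java -Dexec.mainClass="com.kafkaflinkinteg.app.KafkaFlinkIntegApp" (assuming the exec-maven-plugin can be resolved in your environment).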