In this tutorial, we will produce events to Upstash Kafka from a Cloudflare Workers function.

Create Kafka

First, create an Upstash Kafka cluster and topic by following those steps. You will need the endpoint, username, and password later in this tutorial.

Create Project

We will use Wrangler to create the application. After installing and configuring Wrangler, create a folder for your project and, inside that folder, run: wrangler init

This creates wrangler.toml. Wrangler logs your account ID; paste it into the account_id field of wrangler.toml.

Copy the Upstash Kafka REST URL, username, and password into wrangler.toml as well. The topic name is not stored there; it is set directly in the code.

name = "produce-in-cloudflare-workers"
type = 'webpack'
account_id = 'REPLACE_HERE'
route = ''
zone_id = ''
usage_model = ''
workers_dev = true
target_type = "webpack"

[vars]
UPSTASH_KAFKA_REST_URL = "REPLACE_HERE"
UPSTASH_KAFKA_REST_USERNAME = "REPLACE_HERE"
UPSTASH_KAFKA_REST_PASSWORD = "REPLACE_HERE"

Implement the Function

Initialize a Node.js project and install @upstash/kafka:

npm init
npm install @upstash/kafka

Create index.js with the following content:

import { Kafka } from "@upstash/kafka";

addEventListener("fetch", (event) => {
  event.respondWith(handleRequest(event.request));
});

async function handleRequest(request) {
  console.log("START", request);

  // Pass favicon requests through untouched so they do not produce messages
  const { pathname } = new URL(request.url);
  if (pathname.startsWith("/favicon")) {
    return fetch(request);
  }

  // The credentials are the [vars] from wrangler.toml, exposed to the Worker as globals
  const kafka = new Kafka({
    url: UPSTASH_KAFKA_REST_URL,
    username: UPSTASH_KAFKA_REST_USERNAME,
    password: UPSTASH_KAFKA_REST_PASSWORD,
  });

  const p = kafka.producer();
  const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
  const response = await p.produce("mytopic", message);

  // Return the result of the produce call as the HTTP response body
  return new Response(JSON.stringify(response));
}

The code above simply sends a fixed message to Kafka. If your message is more complex, you can send it in the request body, as explained here.
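As one illustration of sending something richer than a fixed object, the sketch below takes the message from the body of the incoming request. The helper names parseMessage and messageFromRequest are hypothetical and not part of @upstash/kafka; the fallback to { hello: "world" } and the choice to forward non-JSON bodies as plain strings are assumptions made for this example.

```javascript
// Hypothetical helper: turn the raw request body text into the message to produce.
// An empty body falls back to a default message; text that is not valid JSON
// is returned unchanged as a plain string.
function parseMessage(bodyText, fallback = { hello: "world" }) {
  if (!bodyText || bodyText.trim() === "") {
    return fallback;
  }
  try {
    return JSON.parse(bodyText);
  } catch (err) {
    return bodyText;
  }
}

// Hypothetical helper: read the body of POST requests only, then parse it.
async function messageFromRequest(request) {
  const bodyText = request.method === "POST" ? await request.text() : "";
  return parseMessage(bodyText);
}
```

With these helpers in index.js, handleRequest could call `const message = await messageFromRequest(request);` before `p.produce("mytopic", message)`.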

Run and Deploy the Function

Run the function locally: wrangler dev

Deploy your function to Cloudflare by running:

wrangler publish

This command prints the URL of your deployed function. Opening that URL should return a response similar to this:

{
  "topic": "mytopic",
  "partition": 0,
  "offset": 278,
  "timestamp": 1640728294879
}

Test the Function

Now let’s verify that the messages reached Kafka. We can consume the Kafka topic using the REST API; you can copy the curl command for consuming from the Upstash Console.

curl https://full-mantis-14289-us1-rest-kafka.upstash.io/consume/GROUP_NAME/GROUP_INSTANCE_NAME/mytopic -u REPLACE_USER_NAME:REPLACE_PASSWORD

[ {
  "topic" : "mytopic",
  "partition" : 0,
  "offset" : 282,
  "timestamp" : 1639610767445,
  "key" : "",
  "value" : "hello",
  "headers" : [ ]
} ]
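The same consume call can be sketched in JavaScript. It mirrors the curl command above: the /consume/GROUP_NAME/GROUP_INSTANCE_NAME/topic path, with the username and password sent as HTTP Basic authentication (which is what curl -u does). The function names buildConsumeRequest and consumeOnce and the shape of the options object are hypothetical, and the sketch assumes ASCII credentials because it relies on btoa.

```javascript
// Hypothetical helper: build the URL and headers for a REST consume call.
function buildConsumeRequest({ restUrl, group, instance, topic, username, password }) {
  // Drop trailing slashes so the path is joined with exactly one "/"
  const base = restUrl.replace(/\/+$/, "");
  const path = [group, instance, topic].map((part) => encodeURIComponent(part)).join("/");
  return {
    url: `${base}/consume/${path}`,
    // Equivalent of curl's -u user:password (assumes ASCII credentials)
    headers: { Authorization: "Basic " + btoa(`${username}:${password}`) },
  };
}

// Hypothetical helper: perform one consume call and return the parsed JSON body.
async function consumeOnce(options) {
  const { url, headers } = buildConsumeRequest(options);
  const res = await fetch(url, { headers });
  if (!res.ok) {
    throw new Error(`Consume failed with status ${res.status}`);
  }
  return res.json();
}
```

Keeping the URL and header construction in a separate function from the network call makes it easy to check that part without touching a real cluster.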

upstash-kafka vs other Kafka Clients

Upstash also supports native Kafka clients (e.g. KafkaJS). However, the Cloudflare Workers runtime does not allow TCP connections, whereas upstash-kafka is HTTP-based. That is why we use upstash-kafka in our Cloudflare examples.
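To make the HTTP-based point concrete, here is a rough sketch of producing a message with plain fetch instead of the client. It assumes the REST API accepts a POST to /produce/<topic> with a JSON body containing a value field, authenticated with HTTP Basic authentication. That request shape is an assumption and is not taken from this tutorial, so check it against the Upstash Kafka REST API documentation before relying on it. buildProduceRequest is a hypothetical helper name.

```javascript
// Hypothetical helper: build a fetch-compatible produce request.
// ASSUMPTION: POST {restUrl}/produce/{topic} with a JSON body { value: "..." }.
function buildProduceRequest({ restUrl, topic, username, password }, message) {
  const base = restUrl.replace(/\/+$/, "");
  return {
    url: `${base}/produce/${encodeURIComponent(topic)}`,
    init: {
      method: "POST",
      // HTTP Basic authentication (assumes ASCII credentials)
      headers: { Authorization: "Basic " + btoa(`${username}:${password}`) },
      // The message object is serialized to a JSON string and sent as the record value
      body: JSON.stringify({ value: JSON.stringify(message) }),
    },
  };
}
```

Under those assumptions, a Worker could send the request with `const { url, init } = buildProduceRequest(config, { hello: "world" }); await fetch(url, init);`, which needs only the fetch API that the Workers runtime already provides.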