Upstash uses Apache Kafka for deployments and provides serverless access to
Kafka clusters through both native Kafka clients (over TCP) and a REST API
(over HTTP). As a consequence of this flexible model, some restrictions apply
when using the Kafka protocol, mainly for the administrative Kafka APIs.
Upstash currently supports the following Kafka protocol APIs:
API name        | Key | API name             | Key | API name                | Key |
Produce         | 0   | DescribeGroups       | 15  | EndTxn                  | 26  |
Fetch           | 1   | ListGroups           | 16  | TxnOffsetCommit         | 28  |
ListOffsets     | 2   | SaslHandshake        | 17  | DescribeConfigs         | 32  |
Metadata        | 3   | ApiVersions          | 18  | AlterConfigs            | 33  |
OffsetCommit    | 8   | CreateTopics         | 19  | DescribeLogDirs         | 35  |
OffsetFetch     | 9   | DeleteTopics         | 20  | SaslAuthenticate        | 36  |
FindCoordinator | 10  | DeleteRecords        | 21  | CreatePartitions        | 37  |
JoinGroup       | 11  | InitProducerId       | 22  | DeleteGroups            | 42  |
Heartbeat       | 12  | OffsetForLeaderEpoch | 23  | IncrementalAlterConfigs | 44  |
LeaveGroup      | 13  | AddPartitionsToTxn   | 24  | OffsetDelete            | 47  |
SyncGroup       | 14  | AddOffsetsToTxn      | 25  | DescribeCluster         | 60  |
Some of the currently unsupported Kafka APIs are on our roadmap.
If you need an API that we do not support at the moment, please drop us a note
so we can inform you when we plan to support it.
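A client that wants to avoid calling an unsupported API could check the API key against the table above before issuing the request. The following is only a minimal sketch: the class `SupportedApis` and its method `isSupported` are hypothetical names introduced for illustration and are not part of any Upstash or Kafka library. The key list is copied from the table and would need updating whenever the supported set changes.

```java
import java.util.Set;

// Hypothetical helper for illustration; not part of any Upstash or Kafka library.
final class SupportedApis {
    // Kafka protocol API keys copied from the table of supported APIs.
    private static final Set<Integer> KEYS = Set.of(
        0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14,
        15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
        26, 28, 32, 33, 35, 36, 37, 42, 44, 47, 60);

    // Returns true if the given API key appears in the table.
    static boolean isSupported(int apiKey) {
        return KEYS.contains(apiKey);
    }

    public static void main(String[] args) {
        System.out.println(isSupported(19)); // CreateTopics, listed in the table
        System.out.println(isSupported(29)); // DescribeAcls, not listed in the table
    }
}
```

A static list like this can drift from what the service actually accepts, so treat it as a convenience check rather than a source of truth.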
Connect Using Kafka Clients
Connecting to Upstash Kafka with any Kafka client is straightforward. If
you do not have a Kafka cluster and/or topic already, follow
these steps to create one.
After creating a cluster and a topic, go to the cluster details page on the
Upstash Console and copy the bootstrap endpoint,
username and password.
Then replace the following parameters in the code snippets below for your
favourite Kafka client or language.
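The snippets below all repeat the same four connection settings, so it can help to build them in one place. The following is a minimal sketch under stated assumptions: the class `UpstashConfig`, the method `connectionProps`, and the environment variable names are hypothetical, and the JAAS login module is assumed to be Kafka's standard `ScramLoginModule`, which matches the SCRAM-SHA-512 mechanism used here. It only depends on `java.util.Properties`, so the result can be passed to an admin client, producer or consumer.

```java
import java.util.Properties;

// Hypothetical helper for illustration; names are not from the Upstash docs.
final class UpstashConfig {
    // Builds the SASL_SSL / SCRAM-SHA-512 settings shared by all Kafka clients.
    // Note: this sketch does not escape quotes inside the username or password.
    static Properties connectionProps(String endpoint, String username, String password) {
        var props = new Properties();
        props.put("bootstrap.servers", endpoint);
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Environment variable names and fallback values are assumptions.
        var env = System.getenv();
        var props = connectionProps(
            env.getOrDefault("UPSTASH_KAFKA_ENDPOINT", "localhost:9092"),
            env.getOrDefault("UPSTASH_KAFKA_USERNAME", "user"),
            env.getOrDefault("UPSTASH_KAFKA_PASSWORD", "secret"));
        // Print only non-secret settings.
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

Reading the credentials from the environment keeps them out of source control; any other secret store would work equally well.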
Create a Topic
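import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

class CreateTopic {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        int partitions = 1;          // example value; adjust for your topic
        short replicationFactor = 1; // example value; adjust for your cluster
        try (var admin = Admin.create(props)) {
            // Block until the topic has been created.
            admin.createTopics(Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))).all().get();
        }
    }
}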
Produce a Message
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class Produce {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Closing the producer at the end of the try block waits for pending sends.
        try (var producer = new KafkaProducer<String, String>(props)) {
            producer.send(new ProducerRecord<>("{{ TOPIC_NAME }}", "Hello Upstash!"));
        }
    }
}
Consume Messages
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class Consume {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", "{{ GROUP_NAME }}");
        try (var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
            // Wait up to 10 seconds for a single batch of records.
            var records = consumer.poll(Duration.ofSeconds(10));
            for (var record : records) {
                System.out.println(record);
            }
        }
    }
}