Introduction

This example demonstrates advanced AI data processing with Upstash Workflow. The workflow below downloads a large dataset, processes it in chunks using OpenAI’s GPT-4 model, aggregates the results, and generates a report.

Use Case

Our workflow will:

  1. Receive a request to process a dataset
  2. Download the dataset from a remote source
  3. Process the data in chunks using OpenAI
  4. Aggregate results
  5. Generate and send a final report

Code Example

api/workflow/route.ts
import { serve } from "@upstash/qstash/nextjs"
import {
  downloadData,
  aggregateResults,
  generateReport,
  sendReport,
  getDatasetUrl,
  splitIntoChunks,
} from "./utils"

type OpenAiResponse = {
  choices: {
    message: {
      role: string,
      content: string
    }
  }[]
}

export const POST = serve<{ datasetId: string; userId: string }>(
  async (context) => {
    const request = context.requestPayload

    // Step 1: Download the dataset
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    // HTTP request with much longer timeout (2hrs)
    const dataset = await context.call("download-dataset", datasetUrl, "GET")

    // Step 2: Process data in chunks using OpenAI
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const processedChunk = await context.call<OpenAiResponse>(
        `process-chunk-${i}`,
        "https://api.openai.com/v1/chat/completions",
        "POST",
        {
          model: "gpt-4",
          messages: [
            {
              role: "system",
              content:
                "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
            },
            {
              role: "user",
              content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
            },
          ],
          max_tokens: 150,
        },
        { authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
      )

      processedChunks.push(processedChunk.choices[0].message.content)

      // Every 10 chunks, we'll aggregate intermediate results
      if (i % 10 === 9 || i === chunks.length - 1) {
        await context.run(`aggregate-results-${i}`, async () => {
          await aggregateResults(processedChunks)
          processedChunks.length = 0
        })
      }
    }

    // Step 3: Generate and send data report
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)

Code Breakdown

1. Preparing our data

We start by retrieving the dataset URL and then downloading the dataset:

api/workflow/route.ts
const datasetUrl = await context.run("get-dataset-url", async () => {
  return await getDatasetUrl(request.datasetId)
})

const dataset = await context.call("download-dataset", datasetUrl, "GET")

Note that we use context.call for the download, a way to make HTTP requests that run for much longer than your serverless execution limit would normally allow.

2. Processing our data

We split the dataset into chunks and process each one using OpenAI’s GPT-4 model:

api/workflow/route.ts
for (let i = 0; i < chunks.length; i++) {
  const processedChunk = await context.call<OpenAiResponse>(
    `process-chunk-${i}`,
    "https://api.openai.com/v1/chat/completions",
    "POST",
    {
      model: "gpt-4",
      messages: [
        {
          role: "system",
          content:
            "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
        },
        {
          role: "user",
          content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
        },
      ],
      max_tokens: 150,
    },
    { authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
  )

  processedChunks.push(processedChunk.choices[0].message.content)
}
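The splitIntoChunks helper is imported from ./utils and not shown in the original. A minimal sketch, assuming it simply slices the dataset array into fixed-size chunks, might look like:

```typescript
// Minimal sketch of the splitIntoChunks helper (assumed implementation;
// the real one lives in ./utils). It slices an array into fixed-size
// chunks; the last chunk may be smaller than chunkSize.
function splitIntoChunks<T>(data: T[], chunkSize: number): T[][] {
  const chunks: T[][] = []
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize))
  }
  return chunks
}
```

With chunkSize set to 1000 as above, a 25,000-record dataset would produce 25 chunks, and therefore 25 separate OpenAI calls.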

3. Aggregating our data

After processing our data in smaller chunks to avoid function timeouts, we aggregate the intermediate results every 10 chunks, and once more after the final chunk:

api/workflow/route.ts
if (i % 10 === 9 || i === chunks.length - 1) {
  await context.run(`aggregate-results-${i}`, async () => {
    await aggregateResults(processedChunks)
    processedChunks.length = 0
  })
}
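To see exactly when this condition fires, the following standalone snippet (illustrative only, not part of the workflow) lists the chunk indices at which aggregation runs for a given total:

```typescript
// Illustrative only: lists the chunk indices at which the loop above runs
// an aggregation step. Aggregation fires on every 10th chunk (index 9,
// 19, ...) and always on the final chunk, so no results are lost when the
// chunk count is not a multiple of 10.
function aggregationPoints(totalChunks: number): number[] {
  const points: number[] = []
  for (let i = 0; i < totalChunks; i++) {
    if (i % 10 === 9 || i === totalChunks - 1) points.push(i)
  }
  return points
}
```

For example, 25 chunks aggregate at indices 9, 19, and 24.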

4. Sending a report

Finally, we generate a report based on the aggregated results and send it to the user:

api/workflow/route.ts
const report = await context.run("generate-report", async () => {
  return await generateReport(request.datasetId)
})

await context.run("send-report", async () => {
  await sendReport(report, request.userId)
})

Key Features

  1. Non-blocking HTTP Calls: We use context.call for API requests so they don’t consume the endpoint’s execution time (great for optimizing serverless cost).

  2. Long-running tasks: The dataset download can take up to 2 hours, though in practice it is limited by the memory available to your function.
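To start this workflow, a client POSTs the initial payload to the route. A minimal sketch of building that request (the URL and helper name here are illustrative, not part of the Upstash API):

```typescript
// Hypothetical helper that builds the request used to trigger the
// workflow. The endpoint path matches the route file above; everything
// else is illustrative.
function buildTriggerRequest(datasetId: string, userId: string) {
  return {
    url: "/api/workflow",
    method: "POST" as const,
    headers: { "content-type": "application/json" },
    // The payload shape must match the generic passed to serve<...>
    body: JSON.stringify({ datasetId, userId }),
  }
}
```

Passing the result to fetch starts a new workflow run; each context.run and context.call step is then driven as its own request.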