This example demonstrates advanced AI data processing using Upstash Workflow. The following example workflow downloads a large dataset, processes it in chunks using OpenAI’s GPT-4 model, aggregates the results, and generates a report.
import { serve } from "@upstash/workflow/nextjs"
import {
downloadData,
aggregateResults,
generateReport,
sendReport,
getDatasetUrl,
splitIntoChunks,
} from "./utils"
/**
 * Minimal shape of an OpenAI chat-completion response — only the fields
 * this workflow actually reads (choices[n].message.role / .content).
 */
type OpenAiResponse = {
  choices: Array<{
    message: {
      role: string
      content: string
    }
  }>
}
// Workflow entry point: a Next.js POST handler that downloads a dataset,
// summarizes it chunk-by-chunk via OpenAI, aggregates the summaries, and
// sends a report to the requesting user.
// NOTE(review): in Upstash Workflow, code between context.run/context.call
// steps re-executes on every replay while completed steps are memoized —
// presumably splitIntoChunks is deterministic so chunk boundaries are
// stable across replays; confirm against the utils implementation.
export const { POST } = serve<{ datasetId: string; userId: string }>(
async (context) => {
// Payload posted by the caller that triggered this workflow.
const request = context.requestPayload
// Durable step: resolve the dataset's download URL from its id.
const datasetUrl = await context.run("get-dataset-url", async () => {
return await getDatasetUrl(request.datasetId)
})
// HTTP step: the download runs as its own workflow step, so a large
// transfer does not count against this function's execution time.
const dataset = await context.call("download-dataset", datasetUrl, "GET")
const chunkSize = 1000
const chunks = splitIntoChunks(dataset, chunkSize)
// Summaries accumulated since the last aggregation flush.
// NOTE(review): this buffer lives outside any step, so the pushes below
// are re-applied on replay while aggregate steps are memoized — verify
// this interaction is intended.
const processedChunks: string[] = []
for (let i = 0; i < chunks.length; i++) {
// One HTTP step per chunk: ask GPT-4 to summarize the chunk. The
// step name embeds the index so each chunk is a distinct step.
const processedChunk = await context.call<OpenAiResponse>(
`process-chunk-${i}`,
"https://api.openai.com/v1/chat/completions",
"POST",
{
model: "gpt-4",
messages: [
{
role: "system",
content:
"You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
},
{
role: "user",
content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
},
],
// Cap the summary length per chunk.
max_tokens: 150,
},
{ authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
)
// Keep only the first completion's text.
processedChunks.push(processedChunk.choices[0].message.content)
// Flush every 10 chunks, and once more on the final chunk.
if (i % 10 === 9 || i === chunks.length - 1) {
await context.run(`aggregate-results${i}`, async () => {
await aggregateResults(processedChunks)
// Reset the buffer after a successful aggregation.
processedChunks.length = 0
})
}
}
// Durable step: build the report from the aggregated results.
const report = await context.run("generate-report", async () => {
return await generateReport(request.datasetId)
})
// Durable step: deliver the report to the user.
await context.run("send-report", async () => {
await sendReport(report, request.userId)
})
}
)
// Excerpt: resolve the dataset URL in a durable step, then download the
// dataset via context.call so the transfer runs as its own HTTP step.
const datasetUrl = await context.run("get-dataset-url", async () => {
return await getDatasetUrl(request.datasetId)
})
const dataset = await context.call("download-dataset", datasetUrl, "GET")
We split the dataset into chunks and process each one using OpenAI’s GPT-4 model:
// Excerpt: one context.call step per chunk; each asks GPT-4 for a summary
// of that chunk and the summary text is collected for later aggregation.
for (let i = 0; i < chunks.length; i++) {
// Step name embeds the index so each chunk is a distinct durable step.
const processedChunk = await context.call(
`process-chunk-${i}`,
"https://api.openai.com/v1/chat/completions",
"POST",
{
model: "gpt-4",
messages: [
{
role: "system",
content:
"You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
},
{
role: "user",
content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
},
],
// Cap the summary length per chunk.
max_tokens: 150,
},
{ authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
)
// Keep only the first completion's text.
processedChunks.push(processedChunk.choices[0].message.content)
}
We process the data in smaller chunks to avoid any function timeouts, and aggregate the accumulated results every 10 chunks (and once more on the final chunk):
// Excerpt: flush the accumulated summaries inside a durable step every 10
// chunks (and on the final chunk), then reset the buffer.
if (i % 10 === 9 || i === chunks.length - 1) {
await context.run(`aggregate-results${i}`, async () => {
await aggregateResults(processedChunks)
// Reset the buffer after a successful aggregation.
processedChunks.length = 0
})
}
Finally, we generate a report based on the aggregated results and send it to the user:
// Excerpt: build the report from the aggregated results in one durable
// step, then deliver it to the user in another.
const report = await context.run("generate-report", async () => {
return await generateReport(request.datasetId)
})
await context.run("send-report", async () => {
await sendReport(report, request.userId)
})