world.streams
Read, write, and manage real-time data streams for workflow runs.
The world.streams interface (exposed through the Streamer interface) provides low-level access to workflow data streams. Use it to write chunks, read streams, and manage the stream lifecycle outside of the standard getWritable() pattern.
For most streaming use cases, use getWritable() inside steps. The world.streams interface is for advanced scenarios like building custom stream consumers or managing streams from outside a workflow.
Import
import { getWorld } from "workflow/runtime";
const world = getWorld();
const streams = world.streams;
Methods
writeToStream()
Write data chunks to a named stream for a workflow run.
await world.streams.writeToStream("default", runId, chunk);
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | string | The stream name |
| runId | string | The workflow run ID |
| chunk | string \| Uint8Array | Data to write to the stream |
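
A minimal sketch of writing several chunks in order may help here. The `ChunkWriter` type and the `writeChunks` helper are hypothetical names introduced for illustration; the type mirrors only the `writeToStream()` signature documented above, and the `Promise<void>` return type is an assumption.

```typescript
// Assumed structural type covering only the method documented above.
// The real Streamer interface may declare more and may return something else.
interface ChunkWriter {
  writeToStream(
    name: string,
    runId: string,
    chunk: string | Uint8Array,
  ): Promise<void>;
}

// Hypothetical helper: writes each chunk sequentially, awaiting every call
// so the writes are issued in the order given. Returns the number of writes.
async function writeChunks(
  streams: ChunkWriter,
  name: string,
  runId: string,
  chunks: Iterable<string | Uint8Array>,
): Promise<number> {
  let written = 0;
  for (const chunk of chunks) {
    await streams.writeToStream(name, runId, chunk);
    written += 1;
  }
  return written;
}
```

Under these assumptions, a call such as `writeChunks(getWorld().streams, "default", runId, ["hello", new TextEncoder().encode("world")])` would mix string and binary chunks in a single stream.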
readFromStream()
Read data from a named stream as a ReadableStream.
const readable = await world.streams.readFromStream("default");
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | string | The stream name |
| startIndex | number | Optional starting index for partial reads |
Returns: ReadableStream<Uint8Array>
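Because the return value is a standard ReadableStream<Uint8Array>, it can be consumed with a reader loop. The sketch below collects a stream into a string, assuming the chunks are UTF-8 text and that the stream eventually ends. `ChunkReader` and `readStreamAsText` are hypothetical names, and the type mirrors only the `readFromStream()` signature documented above.

```typescript
// Assumed structural type covering only the method documented above.
interface ChunkReader {
  readFromStream(
    name: string,
    startIndex?: number,
  ): Promise<ReadableStream<Uint8Array>>;
}

// Hypothetical helper: reads every chunk and decodes it as UTF-8 text.
// The loop only finishes once the stream reports completion.
async function readStreamAsText(
  streams: ChunkReader,
  name: string,
  startIndex?: number,
): Promise<string> {
  const readable = await streams.readFromStream(name, startIndex);
  const reader = readable.getReader();
  const decoder = new TextDecoder();
  let text = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters intact across chunk boundaries.
      text += decoder.decode(value, { stream: true });
    }
    // Flush any bytes the decoder is still holding.
    text += decoder.decode();
  } finally {
    reader.releaseLock();
  }
  return text;
}
```

Buffering the whole stream suits small payloads; for large or long-lived streams, processing each chunk inside the loop is likely the better choice.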
closeStream()
Close a stream when done writing.
await world.streams.closeStream("default", runId);
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | string | The stream name |
| runId | string | The workflow run ID |
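
One way to make sure a stream is always closed is to pair the writes with closeStream() in a try/finally block. The sketch below is illustrative only: `WritableStreams` and `writeAndClose` are hypothetical names, the `Promise<void>` return types are assumptions, and closing after a failed write is a design choice rather than documented behavior.

```typescript
// Assumed structural type covering only the two methods documented above.
interface WritableStreams {
  writeToStream(
    name: string,
    runId: string,
    chunk: string | Uint8Array,
  ): Promise<void>;
  closeStream(name: string, runId: string): Promise<void>;
}

// Hypothetical helper: writes all chunks, then closes the stream.
// The finally block runs even when a write throws, and the original
// error still propagates to the caller afterwards.
async function writeAndClose(
  streams: WritableStreams,
  name: string,
  runId: string,
  chunks: Iterable<string | Uint8Array>,
): Promise<void> {
  try {
    for (const chunk of chunks) {
      await streams.writeToStream(name, runId, chunk);
    }
  } finally {
    await streams.closeStream(name, runId);
  }
}
```

Closing in `finally` means a consumer sees the stream end after a partial write. If consumers need to tell a failure apart from a normal end, a sentinel chunk or a separate status channel would be needed.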
listStreamsByRunId()
List all stream names associated with a workflow run.
const streamNames = await world.streams.listStreamsByRunId(runId);
Parameters:
| Parameter | Type | Description |
|---|---|---|
| runId | string | The workflow run ID |
Returns: string[] — Array of stream names
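The returned names can be used to check that a stream exists before trying to read it. The following is a small sketch under that assumption; `StreamLister` and `hasStream` are hypothetical names, and the type mirrors only the `listStreamsByRunId()` call shown above.

```typescript
// Assumed structural type covering only the method documented above.
interface StreamLister {
  listStreamsByRunId(runId: string): Promise<string[]>;
}

// Hypothetical helper: resolves to true when the run lists a stream
// with exactly the given name.
async function hasStream(
  streams: StreamLister,
  runId: string,
  name: string,
): Promise<boolean> {
  const names = await streams.listStreamsByRunId(runId);
  return names.includes(name);
}
```

A route handler could call `hasStream(getWorld().streams, runId, name)` and return a 404 response when it resolves to `false`, rather than attempting the read.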
Examples
List All Streams for a Workflow Run
import { getWorld } from "workflow/runtime";
export async function GET(req: Request) {
  const url = new URL(req.url);
  const runId = url.searchParams.get("runId");
  if (!runId) {
    return Response.json({ error: "runId required" }, { status: 400 });
  }
  const world = getWorld();
  const streamNames = await world.streams.listStreamsByRunId(runId);
  return Response.json({ streams: streamNames });
}
Read a Stream as a Response
import { getWorld } from "workflow/runtime";
export async function GET(req: Request) {
  const url = new URL(req.url);
  const streamName = url.searchParams.get("name");
  if (!streamName) {
    return Response.json({ error: "name required" }, { status: 400 });
  }
  const world = getWorld();
  const readable = await world.streams.readFromStream(streamName);
  return new Response(readable, {
    headers: { "Content-Type": "application/octet-stream" },
  });
}
Related
- Streaming — Core concepts for streaming data from workflows
- getWritable() — The standard way to write to streams from within steps
- world.runs — List runs that may have associated streams