---
url: /blog/posts/2026-03-26-stream-db.md
description: >-
  StreamDB turns a Durable Stream into a reactive database. You provide a
  Standard Schema, StreamDB gives you type-safe, durable state with sync.
---

[StreamDB](https://durable-streams.com/stream-db) turns a [Durable Stream](/primitives/durable-streams) into a reactive database.

You provide a [Standard Schema](https://standardschema.dev) and get type‑safe, multiplexed durable state with sync for your AI apps and agentic systems.

> [!Warning] ✨ Docs, demo and skills
> [Read the docs](https://durable-streams.com/stream-db), see the [example app](https://github.com/durable-streams/durable-streams/tree/main/examples/stream-db) and [install skills](#let-your-agent-do-it) for your coding agent.

## Wiring up agent state

Agent sessions need state. Tool calls, messages, active generations. User presence, agent registration. This state should be real-time and reactive as well as persistent and forkable. In an ideal world [it should be unified](https://github.com/humanlayer/12-factor-agents/blob/main/content/factor-05-unify-execution-state.md) and you should be able to declare a schema and have it all just work. Typed, synced and reactive.

In practice you end up wiring together a database, a pub/sub layer, custom sync logic, and retry handling. Every project is a bespoke stack. Coordination is hand-rolled. Type-safety is missing. State breaks on disconnect. Collaboration is an afterthought.

### Enter StreamDB

StreamDB wraps a Durable Stream with a [Standard Schema](https://standardschema.dev) to give you typed, reactive collections. You define your entity types — messages, presence, agents, whatever your session needs — as Standard Schema objects.

StreamDB routes events by type into [TanStack DB](/primitives/tanstack-db) collections, with end-to-end type-safety from producer to consumer.

Multiplexing is built in. Multiple entity types coexist on a single stream, each routed to its own collection. The stream is the source of truth. Collections materialize state by applying events in order. Clients join from any offset and catch up.
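
To make the routing idea concrete, here is a minimal sketch of materializing one multiplexed stream into per-type collections. The `ChangeEvent` shape and the `materialize` function are assumptions made for illustration; they are not StreamDB's actual wire format or internals.

```ts
// Hypothetical event shape: the real StreamDB wire format may differ.
type Row = Record<string, unknown>
type ChangeEvent = {
  type: string // entity type, e.g. 'message'
  operation: 'insert' | 'update' | 'delete'
  key: string // primary key value
  value?: Row // row data (absent for deletes)
}

type Collections = Map<string, Map<string, Row>>

// Apply events in stream order, routing each one to the collection for its type.
function materialize(events: ChangeEvent[], collections: Collections = new Map()): Collections {
  for (const event of events) {
    let rows = collections.get(event.type)
    if (!rows) {
      rows = new Map<string, Row>()
      collections.set(event.type, rows)
    }
    if (event.operation === 'delete') {
      rows.delete(event.key)
    } else {
      // Inserts replace the row; updates merge into the existing row
      const previous = event.operation === 'update' ? rows.get(event.key) ?? {} : {}
      rows.set(event.key, { ...previous, ...event.value })
    }
  }
  return collections
}

const events: ChangeEvent[] = [
  { type: 'message', operation: 'insert', key: 'm1', value: { id: 'm1', role: 'user', content: 'Hi' } },
  { type: 'presence', operation: 'insert', key: 'u1', value: { userId: 'u1', status: 'online' } },
  { type: 'presence', operation: 'update', key: 'u1', value: { status: 'offline' } },
  { type: 'message', operation: 'delete', key: 'm1' },
]

const state = materialize(events)
```

Because state is a pure fold over the event log, two clients that apply the same events in the same order end up with the same collections.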

### Reactive by default

The data lands in [TanStack DB collections](https://tanstack.com/db/latest/docs/overview#collections), materialized by a live query engine based on differential dataflow. When a new event arrives on the stream, only the affected data recalculates, giving you sub-millisecond reactivity.

Derived collections compose. You can filter messages, aggregate presence and [join up](https://tanstack.com/db/latest/docs/guides/live-queries#joins) your session data with your existing app data.
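
As a rough intuition for "only the affected data recalculates", the sketch below maintains a count per presence status by folding in deltas rather than re-scanning rows. It is a deliberately simplified illustration of the incremental idea, not how TanStack DB's query engine is implemented; `Delta` and `CountByStatus` are made-up names.

```ts
// A delta is a row plus a multiplicity: +1 for an insert, -1 for a retraction.
type Delta<T> = { row: T; diff: 1 | -1 }
type Presence = { userId: string; status: 'online' | 'offline' }

// Maintains count-per-status by folding deltas into existing totals,
// instead of re-scanning every row on each change.
class CountByStatus {
  private counts = new Map<string, number>()

  apply(deltas: Delta<Presence>[]): void {
    for (const { row, diff } of deltas) {
      const next = (this.counts.get(row.status) ?? 0) + diff
      if (next === 0) this.counts.delete(row.status)
      else this.counts.set(row.status, next)
    }
  }

  get(status: string): number {
    return this.counts.get(status) ?? 0
  }
}

const presenceCounts = new CountByStatus()
presenceCounts.apply([
  { row: { userId: 'u1', status: 'online' }, diff: 1 },
  { row: { userId: 'u2', status: 'online' }, diff: 1 },
])
// u1 goes offline: an update is a retraction of the old row plus an insert of the new one
presenceCounts.apply([
  { row: { userId: 'u1', status: 'online' }, diff: -1 },
  { row: { userId: 'u1', status: 'offline' }, diff: 1 },
])
```

The work done per change is proportional to the size of the change, not the size of the collection.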

### Durable by default

The underlying [Durable Stream](/primitives/durable-streams) is persistent and addressable. State survives disconnects, refreshes, restarts. Clients resume from their last offset. There's no re-fetching, no state loss.

Everything is multi-user, multi-tab, multi-device and multi-agent out of the box, all reading and writing to the same stream.
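
Resuming from an offset can be pictured with a toy in-memory log. This is a sketch under simplifying assumptions: `InMemoryStream` and `Client` are hypothetical, and offsets are plain array indices here, whereas a real Durable Stream defines its own offset format.

```ts
// Hypothetical in-memory stand-in for a Durable Stream: an append-only log
// where each entry's offset is its index.
class InMemoryStream<T> {
  private log: T[] = []

  append(item: T): number {
    this.log.push(item)
    return this.log.length - 1
  }

  // Return everything from `offset` onward, plus the offset to resume from next time.
  readFrom(offset: number): { items: T[]; nextOffset: number } {
    return { items: this.log.slice(offset), nextOffset: this.log.length }
  }
}

class Client<T> {
  seen: T[] = []
  private offset = 0

  catchUp(stream: InMemoryStream<T>): void {
    const { items, nextOffset } = stream.readFrom(this.offset)
    this.seen.push(...items)
    this.offset = nextOffset
  }
}

const log = new InMemoryStream<string>()
const client = new Client<string>()

log.append('a')
log.append('b')
client.catchUp(log) // reads a and b

// The client disconnects; more events arrive in the meantime
log.append('c')
client.catchUp(log) // reads only c, nothing is re-fetched
```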

## Using StreamDB

### Define your schema

Start with Standard Schema objects for each entity type in your session.
Combine them into a unified schema with `createStateSchema` — this
maps each type to a collection with a primary key.

```ts
import { z } from 'zod'
import { createStateSchema } from '@durable-streams/state'

const schema = createStateSchema({
  messages: {
    schema: z.object({
      id: z.string(),
      role: z.enum(['user', 'assistant', 'system']),
      content: z.string(),
      createdAt: z.string(),
    }),
    type: 'message',
    primaryKey: 'id',
  },
  presence: {
    schema: z.object({
      userId: z.string(),
      status: z.enum(['online', 'offline']),
    }),
    type: 'presence',
    primaryKey: 'userId',
  },
  agents: {
    schema: z.object({
      agentId: z.string(),
      name: z.string(),
      endpoint: z.string(),
    }),
    type: 'agent',
    primaryKey: 'agentId',
  },
})
```

Three entity types, one schema, one stream. Messages, presence, and agent
registration multiplexed together with full type-safety.
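
The example uses Zod, but Standard Schema is an interface rather than a library. As a sketch of what that interface looks like, here is a hand-rolled validator exposing the `~standard` property from the Standard Schema v1 spec. The `StandardSchema` type below is a trimmed-down subset written for this example, and whether StreamDB accepts a hand-written schema like this one is an assumption, not something this post confirms.

```ts
// Trimmed-down subset of the Standard Schema v1 interface
type Result<T> =
  | { value: T; issues?: undefined }
  | { issues: ReadonlyArray<{ message: string }> }

interface StandardSchema<T> {
  readonly '~standard': {
    readonly version: 1
    readonly vendor: string
    readonly validate: (value: unknown) => Result<T> | Promise<Result<T>>
  }
}

type Presence = { userId: string; status: 'online' | 'offline' }

// A hand-written schema for the presence entity
const presenceSchema: StandardSchema<Presence> = {
  '~standard': {
    version: 1,
    vendor: 'example',
    validate: (value) => {
      if (typeof value !== 'object' || value === null) {
        return { issues: [{ message: 'expected an object' }] }
      }
      const v = value as Partial<Record<keyof Presence, unknown>>
      if (typeof v.userId !== 'string') {
        return { issues: [{ message: 'userId must be a string' }] }
      }
      if (v.status !== 'online' && v.status !== 'offline') {
        return { issues: [{ message: 'status must be online or offline' }] }
      }
      return { value: { userId: v.userId, status: v.status } }
    },
  },
}

const good = presenceSchema['~standard'].validate({ userId: 'u1', status: 'online' })
const bad = presenceSchema['~standard'].validate({ userId: 'u1', status: 'away' })
```

A successful result carries `value`; a failed one carries a non-empty `issues` array.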

### Connect to a stream

`createStreamDB` connects your schema to a Durable Stream and gives
you a reactive, stream-backed database.

```ts
import { createStreamDB } from '@durable-streams/state'

const db = createStreamDB({
  streamOptions: {
    url: 'https://api.electric-sql.cloud/v1/streams/my-session',
    contentType: 'application/json',
  },
  state: schema,
})

await db.preload()
```

`preload()` reads from the beginning of the stream, materializes current
state, and stays connected for live updates. From this point,
`db.collections.messages`, `db.collections.presence`, and
`db.collections.agents` are live TanStack DB collections.

### Query reactively

Bind collections to your components with `useLiveQuery` — standard
TanStack DB.

```tsx
import { useLiveQuery } from '@tanstack/react-db'

function MessageList() {
  const { data: messages } = useLiveQuery((q) =>
    q
      .from({ msg: db.collections.messages })
      .orderBy(({ msg }) => msg.createdAt, 'asc')
  )

  return <List items={messages} />
}
```

Queries update incrementally via differential dataflow — no
re-scanning, no re-rendering unaffected rows. Derived views compose
naturally: filter, join, aggregate across collections.

```tsx
function OnlineAgents() {
  const { data: agents } = useLiveQuery((q) =>
    q.from({ agent: db.collections.agents })
  )

  return <AgentList items={agents} />
}
```

### Derive collections

Raw stream data often needs materializing into higher-level structures
— token chunks into complete messages, for example. Derived
collections let you do this declaratively, using live query pipelines
that update incrementally.

```tsx
import { createLiveQueryCollection, collect, count, minStr } from '@tanstack/db'

const messagesCollection = createLiveQueryCollection({
  query: (q) => {
    // Group chunks by messageId, collecting the raw rows.
    // Assumes the state schema also declares a `chunks` entity type.
    const collected = q
      .from({ chunk: db.collections.chunks })
      .groupBy(({ chunk }) => chunk.messageId)
      .select(({ chunk }) => ({
        messageId: chunk.messageId,
        rows: collect(chunk),
        startedAt: minStr(chunk.createdAt),
        rowCount: count(chunk),
      }))

    // Materialize grouped chunks into messages
    // (materializeMessage is an app-defined helper, not shown here)
    return q
      .from({ collected })
      .orderBy(({ collected }) => collected.startedAt, 'asc')
      .fn.select(({ collected }) => materializeMessage(collected.rows))
  },
  getKey: (row) => row.id,
})
```

No for loops over client data. Differential dataflow means only changed
chunks trigger recalculation. Derived collections are themselves
TanStack DB collections. You can query, filter, and derive
further from them.

```tsx
// Derive pending tool call approvals from the materialized messages
const approvalsCollection = createLiveQueryCollection({
  query: (q) =>
    q
      .from({ message: messagesCollection })
      .fn.where(({ message }) =>
        message.parts.some(
          (p) => p.type === 'tool-call' &&
            p.approval?.needsApproval === true &&
            p.approval.approved === undefined
        )
      ),
  getKey: (row) => row.id,
})
```

Chunks sync on the stream. Messages materialize from chunks. Approvals
derive from messages. Each layer is reactive, typed, and incremental.
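
The `materializeMessage` helper used in the pipeline is not shown in this post. A minimal hypothetical version, assuming each chunk carries a sequence number and a text fragment, might look like the sketch below. The `Chunk` and `Message` shapes are invented for illustration; a version that feeds the approvals query would build structured `parts` rather than a flat `content` string.

```ts
// Hypothetical chunk and message shapes; a real session would use richer types
type Chunk = {
  messageId: string
  seq: number // position of this chunk within its message
  role: 'user' | 'assistant'
  text: string
  createdAt: string
}

type Message = {
  id: string
  role: Chunk['role']
  content: string
  createdAt: string
}

// Fold the chunks of one message into a single message row.
function materializeMessage(rows: Chunk[]): Message {
  if (rows.length === 0) {
    throw new Error('cannot materialize a message from zero chunks')
  }
  // Copy before sorting so the input rows are not mutated
  const ordered = [...rows].sort((a, b) => a.seq - b.seq)
  const first = ordered[0]!
  return {
    id: first.messageId,
    role: first.role,
    content: ordered.map((chunk) => chunk.text).join(''),
    createdAt: first.createdAt,
  }
}

const message = materializeMessage([
  { messageId: 'm1', seq: 1, role: 'assistant', text: 'lo', createdAt: '2026-03-26T10:00:01Z' },
  { messageId: 'm1', seq: 0, role: 'assistant', text: 'Hel', createdAt: '2026-03-26T10:00:00Z' },
])
```

Sorting by `seq` makes the result independent of the order in which the grouped rows are handed to the helper.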

### Write with optimistic mutations

Writes go through TanStack DB's optimistic action system. Local state
updates instantly, persistence happens async.

```ts
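import { createStreamDB } from '@durable-streams/state'

// `schema` is the state schema defined earlier; `streamUrl` is your Durable Stream URL
const db = createStreamDB({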
  streamOptions: { url: streamUrl, contentType: 'application/json' },
  state: schema,
  actions: ({ db, stream }) => ({
    sendMessage: {
      onMutate: (msg) => {
        db.collections.messages.insert(msg)
      },
      mutationFn: async (msg) => {
        const txid = crypto.randomUUID()
        const event = schema.messages.insert({ value: msg, headers: { txid } })
        await stream.append(JSON.stringify(event))
        await db.utils.awaitTxId(txid)
      },
    },
  }),
})

await db.actions.sendMessage({
  id: crypto.randomUUID(),
  role: 'user',
  content: 'Hello',
  createdAt: new Date().toISOString(),
})
```

`onMutate` inserts into the local collection immediately — the UI
updates before the network round-trip. `mutationFn` appends to the
Durable Stream with a transaction ID, then waits for it to sync back.
If the write fails, TanStack DB rolls back the optimistic state
automatically.
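
The confirm-or-roll-back lifecycle can be modeled as a small state machine. The sketch below is a simplified illustration, not TanStack DB's implementation: `OptimisticStore` and its methods are hypothetical names, and the network is replaced by explicit `confirm` and `rollback` calls.

```ts
type Message = { id: string; content: string }

class OptimisticStore {
  private confirmed = new Map<string, Message>() // keyed by message id
  private pending = new Map<string, Message>() // keyed by txid

  // Like onMutate: show the row immediately, tagged with its transaction ID
  insertOptimistic(txid: string, msg: Message): void {
    this.pending.set(txid, msg)
  }

  // The write was seen coming back on the stream: promote pending to confirmed
  confirm(txid: string): void {
    const msg = this.pending.get(txid)
    if (!msg) return
    this.pending.delete(txid)
    this.confirmed.set(msg.id, msg)
  }

  // The write failed: discard the optimistic row
  rollback(txid: string): void {
    this.pending.delete(txid)
  }

  // What the UI sees: confirmed rows followed by pending ones
  visible(): Message[] {
    return Array.from(this.confirmed.values()).concat(Array.from(this.pending.values()))
  }
}

const store = new OptimisticStore()
store.insertOptimistic('tx1', { id: 'm1', content: 'Hello' })
store.insertOptimistic('tx2', { id: 'm2', content: 'World' })
const beforeSync = store.visible().length // both rows are visible right away

store.confirm('tx1') // synced back from the stream
store.rollback('tx2') // append failed
const afterSync = store.visible()
```

Keying pending rows by transaction ID is what lets the store match an event arriving on the stream to the local write that produced it.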

## Getting started

See the [StreamDB docs](https://durable-streams.com/stream-db) and the [stream-db example app](https://github.com/durable-streams/durable-streams/tree/main/examples/stream-db).

### Let your agent do it

Our packages also ship with agent skills via [TanStack Intent](https://tanstack.com/intent/registry). Install the Intent system:

```bash
npx @tanstack/intent install
```

Intent finds the skills that are versioned with your dependency packages. `npm update` updates the skills too. Your coding agent can then provision a Durable Stream on Electric Cloud and scaffold a StreamDB in one shot.

Ask it to "create a StreamDB for my agentic session". Your agent will provision the stream, generate a schema from your requirements and wire everything up for you.

Software factories can do this programmatically. Spin up session state as
part of an agent workflow with no manual infrastructure needed.

### What's underneath

StreamDB runs on [Durable Streams](https://durable-streams.com), an open protocol for persistent, addressable, real-time streams designed to host agentic session data.

Building the [Durable Sessions](/blog/2026/01/12/durable-sessions-for-collaborative-ai) demo on Durable Streams is where this clicked for me. StreamDB eliminated the boilerplate and derived collections allowed me to materialize token chunks into the message state. All incremental, all reactive, like living in the future.

### Next steps

* [StreamDB docs](https://durable-streams.com/stream-db)
* [Deploy on Electric Cloud](https://dashboard.electric-sql.cloud/?intent=create&serviceType=streams&serviceVariant=state)
* [Join our Discord](https://discord.electric-sql.com)
