Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions guides/ai-transport/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Ably API key (required for all guides)
ABLY_API_KEY=

# OpenAI API key (required for openai-* guides)
OPENAI_API_KEY=

# Anthropic API key (required for anthropic-* and lang-graph-* guides)
ANTHROPIC_API_KEY=

# Vercel AI SDK uses the underlying provider's key (e.g. OPENAI_API_KEY)
2 changes: 2 additions & 0 deletions guides/ai-transport/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules/
.env
73 changes: 73 additions & 0 deletions guides/ai-transport/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# AI Transport Guides - E2E Code

Full, runnable implementations for each AI Transport guide. Each guide directory contains language-specific subdirectories with publisher (agent) and subscriber (client) code.

## Structure

```
<guide-name>/
javascript/ # JavaScript/TypeScript implementation
src/publisher.ts
src/subscriber.ts
test/e2e.test.ts
python/ # (future)
java/ # (future)
```

## Guides

| Guide | Provider | Pattern | Languages |
|-------|----------|---------|-----------|
| `openai-message-per-token` | OpenAI | Message per token | JavaScript |
| `openai-message-per-response` | OpenAI | Message per response | JavaScript |
| `anthropic-message-per-token` | Anthropic | Message per token | JavaScript |
| `anthropic-message-per-response` | Anthropic | Message per response | JavaScript |
| `vercel-message-per-token` | Vercel AI SDK | Message per token | JavaScript |
| `vercel-message-per-response` | Vercel AI SDK | Message per response | JavaScript |
| `lang-graph-message-per-token` | LangGraph | Message per token | JavaScript |
| `lang-graph-message-per-response` | LangGraph | Message per response | JavaScript |

## Streaming patterns

- **Message per token**: Publisher sends discrete `start`, `token`, and `stop` events. Subscriber reconstructs the full response by appending tokens correlated by `responseId`.
- **Message per response**: Publisher creates a single message and appends each token to it. Subscriber handles `message.create` and `message.append` actions, with the full response available in message history.

## Prerequisites

- Node.js 20+
- API keys for the relevant providers (see `.env.example`)

## Setup

```bash
cd guides/ai-transport
cp .env.example .env
# Fill in your API keys in .env

yarn install
```

## Running a guide

Each guide has a publisher (streams from LLM to Ably) and a subscriber (reads from Ably):

```bash
# Terminal 1 - start the subscriber
npx tsx <guide-name>/javascript/src/subscriber.ts

# Terminal 2 - start the publisher
npx tsx <guide-name>/javascript/src/publisher.ts
```

## Running tests

```bash
# All guides
yarn test

# Watch mode
yarn test:watch

# Single guide
npx vitest run <guide-name>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "guide-anthropic-message-per-response",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"start:publisher": "tsx src/publisher.ts",
"start:subscriber": "tsx src/subscriber.ts",
"test": "vitest run"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.71"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import Anthropic from '@anthropic-ai/sdk';
import Ably from 'ably';

/**
 * Stream an Anthropic response to Ably using the message-per-response pattern:
 * publish one empty message on `message_start`, then append each text delta to
 * it so subscribers can follow the stream via `message.append` actions.
 *
 * @param channel - attached Ably realtime channel to publish on
 * @param prompt  - user prompt forwarded to the Anthropic Messages API
 */
export async function publish(channel: Ably.RealtimeChannel, prompt: string) {
  const anthropic = new Anthropic();
  let msgSerial: string | null = null;
  let textBlockIndex: number | null = null;

  const stream = await anthropic.messages.create({
    model: 'claude-sonnet-4-5-20250929',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });

  for await (const event of stream) {
    switch (event.type) {
      case 'message_start': {
        // Create the message the deltas will be appended to; its serial is
        // the correlation key for every subsequent append.
        const result = await channel.publish({ name: 'response', data: '' });
        msgSerial = result.serials[0];
        break;
      }

      case 'content_block_start':
        // Only mirror the first text block; other block types are ignored.
        if (event.content_block.type === 'text') {
          textBlockIndex = event.index;
        }
        break;

      case 'content_block_delta':
        if (event.index === textBlockIndex && event.delta.type === 'text_delta' && msgSerial) {
          // Await the append: the original fire-and-forget call left the
          // promise floating, so rejections were unhandled and appends could
          // pile up faster than they were acknowledged.
          await channel.appendMessage({ serial: msgSerial, data: event.delta.text });
        }
        break;

      case 'message_stop':
        break;
    }
  }
}

// Entry point for direct execution: stream one prompt, then shut down.
async function main() {
  const client = new Ably.Realtime({ key: process.env.ABLY_API_KEY, echoMessages: false });
  const guideChannel = client.channels.get('ai:anthropic-mpr-guide');

  await publish(guideChannel, 'Tell me a short joke');

  console.log('Done streaming. Closing connection...');
  client.close();
}

// Run main() only when this file is executed directly (e.g. `tsx src/publisher.ts`),
// not when it is imported by the tests. ESM has no `require.main === module`,
// so compare argv[1] against this module's own file path instead.
// Note: ESM imports are hoisted, so placing this import at the bottom is legal.
import { fileURLToPath } from 'url';
if (process.argv[1] === fileURLToPath(import.meta.url)) {
  main().catch(console.error);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import Ably from 'ably';

// Subscribe to a channel and reconstruct streamed responses using message actions.
// Returns a promise that resolves with the full response text when no new events
// arrive for 3 seconds (indicating the stream is complete).
export function subscribe(channel: Ably.RealtimeChannel): Promise<string> {
  return new Promise((resolve) => {
    // Accumulated text per response, keyed by the create-message serial.
    const responses = new Map<string, string>();
    let lastSerial: string | null = null;
    let doneTimer: ReturnType<typeof setTimeout> | null = null;

    const listener = (message: Ably.Message) => {
      switch (message.action) {
        case 'message.create':
          // A new response starts as whatever the creating publish carried.
          console.log('\n[Response started]', message.serial);
          responses.set(message.serial, message.data as string);
          lastSerial = message.serial;
          resetTimer();
          break;

        case 'message.append': {
          // Appends are correlated to their create message by serial.
          const current = responses.get(message.serial) || '';
          responses.set(message.serial, current + (message.data as string));
          process.stdout.write(message.data as string);
          resetTimer();
          break;
        }

        case 'message.update':
          // An update replaces the accumulated text with the full content.
          responses.set(message.serial, message.data as string);
          console.log('\n[Response updated with full content]');
          resetTimer();
          break;
      }
    };

    // Quiet-period detector: each event pushes completion out by 3 seconds.
    // (The original header said "2 seconds" while the code used 3000 ms; the
    // comment now matches the code.)
    const resetTimer = () => {
      if (doneTimer) clearTimeout(doneTimer);
      doneTimer = setTimeout(() => {
        if (lastSerial) {
          // Detach the listener so the channel stops feeding a settled
          // promise (the original leaked the subscription after resolving).
          channel.unsubscribe(listener);
          doneTimer = null;
          const finalText = responses.get(lastSerial) || '';
          resolve(finalText);
        }
      }, 3000);
    };

    channel.subscribe(listener);
  });
}

// Entry point for direct execution: wait for one full response, print it, exit.
async function main() {
  const client = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
  const guideChannel = client.channels.get('ai:anthropic-mpr-guide');

  console.log('Subscriber ready, waiting for tokens...');
  const fullText = await subscribe(guideChannel);
  console.log('\nFull response:', fullText);

  client.close();
}

// Run main() only when this file is executed directly (e.g. `tsx src/subscriber.ts`),
// not when it is imported by the tests. ESM has no `require.main === module`,
// so compare argv[1] against this module's own file path instead.
// Note: ESM imports are hoisted, so placing this import at the bottom is legal.
import { fileURLToPath } from 'url';
if (process.argv[1] === fileURLToPath(import.meta.url)) {
  main().catch(console.error);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import Ably from 'ably';
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { publish } from '../src/publisher.js';
import { subscribe } from '../src/subscriber.js';

// End-to-end tests: each test publishes a real Anthropic stream through Ably
// and observes it from a second connection. Channel names are suffixed with
// Date.now() plus a per-test tag so runs never collide.
describe('anthropic-message-per-response', () => {
  let publisherClient: Ably.Realtime;
  let subscriberClient: Ably.Realtime;
  let channelName: string;

  beforeAll(() => {
    channelName = `ai:test-anthropic-mpr-${Date.now()}`;
    // echoMessages: false so the publisher does not receive its own messages.
    publisherClient = new Ably.Realtime({ key: process.env.ABLY_API_KEY, echoMessages: false });
    subscriberClient = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
  });

  afterAll(async () => {
    // Brief grace period so in-flight acks settle before closing connections.
    await new Promise((resolve) => setTimeout(resolve, 500));
    publisherClient?.close();
    subscriberClient?.close();
  });

  it('publishes initial message and appends tokens', async () => {
    const channel = subscriberClient.channels.get(channelName + '-actions');
    const pubChannel = publisherClient.channels.get(channelName + '-actions');
    const actions: { action: string; data?: string; serial?: string }[] = [];

    await channel.subscribe((message: Ably.Message) => {
      actions.push({
        action: message.action!,
        data: message.data as string | undefined,
        serial: message.serial,
      });
    });

    await publish(pubChannel, 'Reply with exactly: OK');
    // Allow time for all appends to be delivered to the subscriber.
    await new Promise((resolve) => setTimeout(resolve, 3000));

    // Guard before indexing: a bare actions[0] would throw a confusing
    // TypeError (and fails noUncheckedIndexedAccess) if nothing arrived.
    expect(actions.length).toBeGreaterThan(0);
    expect(actions[0].action).toBe('message.create');
    const appendActions = actions.filter((a) => a.action === 'message.append');
    expect(appendActions.length).toBeGreaterThan(0);
    // All actions should share the same serial
    const serials = new Set(actions.map((a) => a.serial));
    expect(serials.size).toBe(1);
  });

  it('subscriber reconstructs the full response from append actions', async () => {
    const subChannel = subscriberClient.channels.get(channelName + '-reconstruct');
    const pubChannel = publisherClient.channels.get(channelName + '-reconstruct');
    const responsePromise = subscribe(subChannel);
    // Give the subscriber time to attach before publishing begins.
    await new Promise((resolve) => setTimeout(resolve, 1000));
    await publish(pubChannel, 'Reply with exactly: Hello world');
    const fullResponse = await responsePromise;
    expect(fullResponse.length).toBeGreaterThan(0);
    expect(fullResponse.toLowerCase()).toContain('hello');
  });

  it('appended tokens concatenate to match the full response', async () => {
    const channel = subscriberClient.channels.get(channelName + '-concat');
    const pubChannel = publisherClient.channels.get(channelName + '-concat');
    const appendedTokens: string[] = [];

    // Raw listener records each delta; subscribe() below independently
    // reconstructs the response on the same channel. The create message
    // carries empty data, so joining only the appends equals the full text.
    await channel.subscribe((message: Ably.Message) => {
      if (message.action === 'message.append') {
        appendedTokens.push(message.data as string);
      }
    });

    const responsePromise = subscribe(channel);
    await new Promise((resolve) => setTimeout(resolve, 1000));
    await publish(pubChannel, 'Reply with exactly: Test');
    const fullResponse = await responsePromise;
    const concatenated = appendedTokens.join('');
    expect(concatenated).toBe(fullResponse);
  });
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "../../tsconfig.base.json",
"include": ["src/**/*.ts", "test/**/*.ts"]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "guide-anthropic-message-per-token",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"start:publisher": "tsx src/publisher.ts",
"start:subscriber": "tsx src/subscriber.ts",
"test": "vitest run"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.71"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Anthropic from '@anthropic-ai/sdk';
import Ably from 'ably';

/**
 * Map one Anthropic stream event onto the message-per-token wire protocol:
 * `start` / `token` / `stop` events on the channel, each correlated by a
 * `responseId` header taken from the Anthropic message id.
 *
 * @param event   - raw event from the Anthropic Messages streaming API
 * @param channel - Ably channel the protocol events are published on
 * @param state   - mutable holder for the responseId spanning the stream
 */
async function processEvent(
  event: Anthropic.MessageStreamEvent,
  channel: Ably.RealtimeChannel,
  state: { responseId: string | null },
) {
  switch (event.type) {
    case 'message_start':
      state.responseId = event.message.id;
      // Await each publish: the original fired `start` and `token` without
      // awaiting, leaving floating promises whose rejections were unhandled
      // (only `stop` was awaited).
      await channel.publish({
        name: 'start',
        extras: { headers: { responseId: state.responseId } },
      });
      break;

    case 'content_block_delta':
      if (event.delta.type === 'text_delta') {
        await channel.publish({
          name: 'token',
          data: event.delta.text,
          extras: { headers: { responseId: state.responseId } },
        });
      }
      break;

    case 'message_stop':
      await channel.publish({
        name: 'stop',
        extras: { headers: { responseId: state.responseId } },
      });
      break;
  }
}

export async function publish(channel: Ably.RealtimeChannel, prompt: string) {
const anthropic = new Anthropic();
const state = { responseId: null as string | null };

const stream = await anthropic.messages.create({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
messages: [{ role: 'user', content: prompt }],
stream: true,
});

for await (const event of stream) {
await processEvent(event, channel, state);
}
}

// Entry point for direct execution: stream one prompt, then shut down.
// NOTE(review): this channel name lacks the `ai:` namespace prefix used by
// the message-per-response guide ('ai:anthropic-mpr-guide') — confirm whether
// that is intentional before relying on namespace-scoped capabilities.
async function main() {
  const client = new Ably.Realtime({ key: process.env.ABLY_API_KEY, echoMessages: false });
  const guideChannel = client.channels.get('anthropic-mpt-guide');

  await publish(guideChannel, 'Tell me a short joke');

  console.log('Done streaming. Closing connection...');
  client.close();
}

// Run main() only when this file is executed directly (e.g. `tsx src/publisher.ts`),
// not when it is imported by the tests. ESM has no `require.main === module`,
// so compare argv[1] against this module's own file path instead.
// Note: ESM imports are hoisted, so placing this import at the bottom is legal.
import { fileURLToPath } from 'url';
if (process.argv[1] === fileURLToPath(import.meta.url)) {
  main().catch(console.error);
}
Loading