Opengram

SSE Event Streaming

Subscribe to a real-time stream of events from your Opengram instance using Server-Sent Events.

Opengram exposes a Server-Sent Events (SSE) endpoint that streams all activity in real time. Use this to build dashboards, sync external systems, or drive agent logic that reacts to user actions as they happen.

Connecting

Open a persistent HTTP connection to the SSE endpoint:

GET /api/v1/events/stream

Authentication

Pass your instance secret using either method:

  • Query parameter: ?token=your_instance_secret
  • Authorization header: Authorization: Bearer your_instance_secret

Query parameters

ParameterDefaultDescription
cursor--Event ID to resume from. Only events after this ID are delivered.
ephemeraltrueInclude non-persisted events (typing indicators, streaming chunks).
limit200Maximum events per batch. Cannot exceed 200.
token--Instance secret for authentication.

Event types

Chat events

EventEphemeralDescription
chat.createdNoA new chat was created.
chat.updatedNoChat metadata changed (title, tags, model).
chat.archivedNoA chat was archived.
chat.unarchivedNoA chat was unarchived.
chat.readNoA chat was marked as read.
chat.unreadNoA chat was marked as unread.
chat.typingYesAn agent is typing in a chat.
chat.user_typingYesA user is typing in a chat.

Message events

EventEphemeralDescription
message.createdNoA new message was persisted.
message.streaming.chunkYesA token chunk was appended to a streaming message.
message.streaming.completeNoA streaming message finished.

Request events

EventEphemeralDescription
request.createdNoAn interactive request was created.
request.resolvedNoA request was resolved by the user.
request.cancelledNoA request was cancelled.

Media events

EventEphemeralDescription
media.attachedNoA file was attached to a message.

Payload reference

Each event type includes a payload object with event-specific fields.

Chat event payloads

All chat events (chat.created, chat.updated, chat.archived, chat.unarchived, chat.read, chat.unread) share the same payload shape:

{ "chatId": "string" }

chat.typing adds an agentId:

{ "chatId": "string", "agentId": "string" }

chat.user_typing adds a senderId:

{ "chatId": "string", "senderId": "string" }

Message event payloads

message.created

{
  "chatId": "string",
  "messageId": "string",
  "role": "user | agent | system",
  "senderId": "string",
  "streamState": "complete | streaming",
  "contentFinal": "string | null",
  "createdAt": "string (ISO 8601)",
  "trace": "object | null (optional)"
}

message.streaming.chunk

{
  "chatId": "string",
  "messageId": "string",
  "deltaText": "string"
}

message.streaming.complete

{
  "chatId": "string",
  "messageId": "string",
  "role": "string",
  "streamState": "complete | cancelled",
  "finalText": "string | null"
}

Request event payloads

request.created

{
  "chatId": "string",
  "requestId": "string",
  "type": "string",
  "title": "string"
}

request.resolved / request.cancelled

{
  "chatId": "string",
  "requestId": "string",
  "type": "string",
  "status": "resolved | cancelled",
  "resolution_payload": "object | null",
  "trace": "object | null"
}

Media event payloads

media.attached

{
  "chatId": "string",
  "mediaId": "string",
  "messageId": "string | null",
  "kind": "string"
}

Event envelope

Each SSE event is delivered as a JSON envelope:

{
  "id": "xK9mQ2vLpR7wYbNcT3hZa",
  "type": "message.created",
  "timestamp": "2024-03-01T18:00:00.000Z",
  "payload": {
    "chatId": "chat_456",
    "messageId": "msg_789",
    "role": "user",
    "senderId": "user:primary",
    "streamState": "complete",
    "contentFinal": "Hello!",
    "createdAt": "2024-03-01T18:00:00.000Z"
  }
}

The SSE wire format uses the id field for cursor tracking and event for the type:

id: xK9mQ2vLpR7wYbNcT3hZa
event: message.created
data: {"id":"xK9mQ2vLpR7wYbNcT3hZa","type":"message.created","timestamp":"2024-03-01T18:00:00.000Z","payload":{...}}

JavaScript client example

const url = "https://your-instance.com/api/v1/events/stream?token=og_secret";
const eventSource = new EventSource(url);

eventSource.addEventListener("message.created", (e) => {
  const event = JSON.parse(e.data);
  console.log("New message:", event.payload);
});

eventSource.addEventListener("chat.typing", (e) => {
  const event = JSON.parse(e.data);
  console.log("Agent typing in:", event.payload.chatId);
});

eventSource.onerror = () => {
  console.log("Connection lost, EventSource will auto-reconnect");
};

Ephemeral events

Ephemeral events (typing indicators, streaming chunks) are not stored in the database. They are only delivered to clients connected at the time they occur. After a disconnect, ephemeral events cannot be replayed -- only persisted events are replayable via cursor.

Stream timeout

The SSE connection respects server.streamTimeoutSeconds (default: 60). This controls how long streaming messages can remain in the streaming state before being auto-cancelled -- it does not affect the SSE connection itself, which stays open indefinitely.

Cursor-based replay

Every non-ephemeral event includes a unique ID. To resume after a disconnect, pass the last event ID you received as the cursor query parameter. Opengram will replay all events that occurred after that cursor so you do not miss anything.

GET /api/v1/events/stream?cursor=xK9mQ2vLpR7wYbNcT3hZa&token=your_secret

If the cursor ID is not found in the database (e.g. after a database reset), the server returns a validation error rather than silently starting from the beginning.

During replay, new live events are queued (up to 512). If your client is too far behind and the queue overflows, the server will close the connection -- your client should reconnect without a cursor or with a more recent one.

Ephemeral events (typing indicators, streaming chunks) are not replayable -- they are only delivered to clients connected at the time they occur.

Keep-alive

The server sends a comment line (: keepalive) every 15 seconds to prevent proxies and load balancers from closing idle connections.

Response headers

The SSE endpoint returns these headers:

HeaderValuePurpose
Content-Typetext/event-stream; charset=utf-8Identifies the response as an SSE stream.
X-Accel-BufferingnoTells Nginx (and similar proxies) not to buffer the response.
Cache-Controlno-cache, no-transformPrevents caching and transformation of the event stream.

On this page