> ## Documentation Index
> Fetch the complete documentation index at: https://docs.grantex.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Event Streaming

> Stream Grantex authorization events in real time via SSE or WebSocket, and forward them to external systems with @grantex/destinations.

Grantex provides real-time event streaming endpoints that push authorization lifecycle events as they happen. Combined with the `@grantex/destinations` package, you can forward events to SIEMs, data warehouses, and message brokers without writing custom plumbing.

## Architecture

```
┌─────────────────┐     SSE / WebSocket     ┌──────────────────┐
│  Grantex Auth    │ ─────────────────────► │  EventSource      │
│  Service         │                         │  (@grantex/       │
│                  │                         │   destinations)   │
└─────────────────┘                         └────────┬─────────┘
                                                     │
                                          ┌──────────┼──────────┐
                                          │          │          │
                                          ▼          ▼          ▼
                                       Datadog    Splunk      S3
                                       Splunk     BigQuery    Kafka
```

The `EventSource` class connects to the SSE stream and dispatches each event to one or more **destinations** you configure. Destinations buffer events and flush them in batches for efficiency.

## Event Types

| Event              | When it fires                                                   |
| ------------------ | --------------------------------------------------------------- |
| `grant.created`    | A new grant is issued after the user completes the consent flow |
| `grant.revoked`    | A grant is revoked (root or cascade revocation)                 |
| `token.issued`     | A grant token is issued (initial exchange or refresh)           |
| `budget.threshold` | A budget usage threshold is crossed (e.g., 80%)                 |
| `budget.exhausted` | A budget is fully consumed                                      |

Each event has the same envelope:

```json theme={null}
{
  "id": "evt_01JXYZ...",
  "type": "grant.created",
  "createdAt": "2026-03-01T12:00:00Z",
  "data": {
    "grantId": "grnt_01...",
    "agentId": "ag_01...",
    "principalId": "user-123",
    "scopes": ["calendar:read"]
  }
}
```

## SSE Endpoint

**`GET /v1/events/stream`** returns a Server-Sent Events stream. Authenticate with your API key as a Bearer token.

### curl

```bash theme={null}
curl -N -H "Authorization: Bearer $GRANTEX_API_KEY" \
  "https://api.grantex.dev/v1/events/stream"
```

Filter by event type with the `types` query parameter (comma-separated):

```bash theme={null}
curl -N -H "Authorization: Bearer $GRANTEX_API_KEY" \
  "https://api.grantex.dev/v1/events/stream?types=grant.created,grant.revoked"
```

### TypeScript

```typescript theme={null}
const res = await fetch('https://api.grantex.dev/v1/events/stream', {
  headers: { Authorization: `Bearer ${process.env.GRANTEX_API_KEY}` },
});

const reader = res.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  const text = decoder.decode(value, { stream: true });
  for (const line of text.split('\n')) {
    if (line.startsWith('data: ')) {
      const event = JSON.parse(line.slice(6));
      console.log(event.type, event.data);
    }
  }
}
```

### Python

```python theme={null}
import httpx
import json
import os

with httpx.stream(
    "GET",
    "https://api.grantex.dev/v1/events/stream",
    headers={"Authorization": f"Bearer {os.environ['GRANTEX_API_KEY']}"},
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            event = json.loads(line[6:])
            print(event["type"], event["data"])
```

## WebSocket Endpoint

**`GET /v1/events/ws`** upgrades to a WebSocket connection. Each message is a JSON-encoded event.

### TypeScript

```typescript theme={null}
const ws = new WebSocket('wss://api.grantex.dev/v1/events/ws', {
  headers: { Authorization: `Bearer ${process.env.GRANTEX_API_KEY}` },
});

ws.on('message', (raw) => {
  const event = JSON.parse(raw.toString());
  console.log(event.type, event.data);
});
```

### Python

```python theme={null}
import asyncio
import json
import os
import websockets

async def listen():
    uri = "wss://api.grantex.dev/v1/events/ws"
    headers = {"Authorization": f"Bearer {os.environ['GRANTEX_API_KEY']}"}

    async with websockets.connect(uri, additional_headers=headers) as ws:
        async for message in ws:
            event = json.loads(message)
            print(event["type"], event["data"])

asyncio.run(listen())
```

## @grantex/destinations Package

The `@grantex/destinations` package provides a high-level `EventSource` class that connects to the SSE stream and dispatches events to one or more destinations. Install it with:

```bash theme={null}
npm install @grantex/destinations
```

### Basic Usage

```typescript theme={null}
import { EventSource, DatadogDestination, SplunkDestination } from '@grantex/destinations';

const source = new EventSource({
  url: 'https://api.grantex.dev',
  apiKey: process.env.GRANTEX_API_KEY!,
  types: ['grant.created', 'grant.revoked', 'token.issued'],
});

// Add one or more destinations
source.addDestination(new DatadogDestination({
  apiKey: process.env.DD_API_KEY!,
}));

source.addDestination(new SplunkDestination({
  hecUrl: 'https://splunk.example.com:8088',
  hecToken: process.env.SPLUNK_HEC_TOKEN!,
}));

// Start streaming — runs until you call stop()
await source.start();
```

### Available Destinations

| Destination | Class                 | Target                      |
| ----------- | --------------------- | --------------------------- |
| Datadog     | `DatadogDestination`  | Datadog Logs API            |
| Splunk      | `SplunkDestination`   | Splunk HTTP Event Collector |
| Amazon S3   | `S3Destination`       | S3 bucket (NDJSON files)    |
| BigQuery    | `BigQueryDestination` | Google BigQuery table       |
| Kafka       | `KafkaDestination`    | Apache Kafka topic          |

### Shared Configuration

All destinations accept these base options:

| Option            | Type     | Default | Description                                               |
| ----------------- | -------- | ------- | --------------------------------------------------------- |
| `batchSize`       | `number` | Varies  | Number of events to buffer before flushing                |
| `flushIntervalMs` | `number` | —       | Flush on a timer (milliseconds) even if batch is not full |

### Graceful Shutdown

Always call `stop()` to flush pending events and close connections:

```typescript theme={null}
process.on('SIGTERM', async () => {
  await source.stop();
  process.exit(0);
});
```

### Custom Destinations

Implement the `EventDestination` interface to build your own:

```typescript theme={null}
import type { EventDestination, GrantexEvent } from '@grantex/destinations';

class MyDestination implements EventDestination {
  readonly name = 'my-destination';

  async send(events: GrantexEvent[]): Promise<void> {
    for (const event of events) {
      // Forward to your system
    }
  }

  async flush(): Promise<void> {
    // Flush any buffered events
  }

  async close(): Promise<void> {
    // Clean up resources
  }
}
```

## Next Steps

* [Datadog Integration](/guides/siem-datadog) — monitors and alerting
* [Splunk Integration](/guides/siem-splunk) — saved searches and dashboards
* [S3 & BigQuery Archival](/guides/siem-s3-bigquery) — compliance archival
* [Webhooks](/guides/webhooks) — push-based event delivery to your own endpoints
* [Metrics & Observability](/guides/metrics-observability) — Prometheus metrics and Grafana dashboards
