# Stream Functions

> Event sourcing and message streaming with subject-based routing

Stream functions provide persistent, ordered message storage with event sourcing capabilities. Use streams to build audit logs, maintain event-driven state, and implement replay-based systems with subject-based message routing.

<Info>
  **What are Streams?** Streams are persistent logs that listen to subject patterns and store messages in order. Messages are identified by subjects and can be retrieved by sequence number. Streams support aggregation through folding, where messages are reduced to build current state.
</Info>

***

## Function List

<CardGroup cols={2}>
  <Card title="stream-create" icon="plus" href="#stream-create">
    Create a new stream
  </Card>

  <Card title="stream-publish" icon="paper-plane" href="#stream-publish">
    Publish message to subject
  </Card>

  <Card title="stream-get" icon="download" href="#stream-get">
    Retrieve messages by subject
  </Card>

  <Card title="stream-search" icon="magnifying-glass" href="#stream-search">
    Search messages in stream
  </Card>

  <Card title="stream-aggregate" icon="chart-line" href="#stream-aggregate">
    Fold messages into current state
  </Card>

  <Card title="stream-list" icon="list" href="#stream-list">
    List all streams
  </Card>

  <Card title="stream-unset" icon="eraser" href="#stream-unset">
    Remove properties from aggregate
  </Card>

  <Card title="stream-tombstone" icon="tombstone" href="#stream-tombstone">
    Mark subject as deleted
  </Card>

  <Card title="stream-poison-pill" icon="skull" href="#stream-poison-pill">
    Reset aggregate state
  </Card>
</CardGroup>

***

## stream-create

Create a new stream that listens to specified subject patterns and stores messages persistently.

### Parameters

<ParamField path="name" type="string" required>
  Name of the stream. Must be unique and contain only alphanumeric characters, dashes, and underscores.

  **Naming conventions:**

  * Use descriptive names: `order-events`, `user-activity`
  * Indicate purpose: `audit-log`, `state-changes`
</ParamField>

<ParamField path="subject" type="array" required>
  Array of subject patterns the stream listens to. Supports wildcards (`*` for single token, `>` for multiple tokens).

  **Examples:**

  * `["orders.>"]` - All order-related subjects
  * `["users.*.created", "users.*.updated"]` - Specific user events
  * `["events.production.>"]` - All production events
</ParamField>

<ParamField path="description" type="string">
  Optional description of the stream's purpose.
</ParamField>

<ParamField path="storage" type="string" default="File">
  Storage backend type.

  **Options:**

  * `File` - Persistent file storage (recommended)
  * `Memory` - In-memory storage (faster, not persistent)
</ParamField>

<ParamField path="retention" type="string" default="Limits">
  Message retention policy.

  **Options:**

  * `Limits` - Retain until configured limits reached (default)
  * `Interest` - Retain while consumers are interested
  * `WorkQueue` - Remove messages after acknowledgment
</ParamField>

<ParamField path="max_msgs" type="integer" default="-1">
  Maximum number of messages to store. `-1` for unlimited.
</ParamField>

<ParamField path="max_bytes" type="integer" default="-1">
  Maximum total size in bytes. `-1` for unlimited.
</ParamField>

<ParamField path="max_age" type="integer">
  Maximum age of messages in nanoseconds. Older messages are automatically deleted.
</ParamField>

<ParamField path="max_msgs_per_subject" type="integer">
  Maximum messages per subject. Useful for keeping only recent events per entity.
</ParamField>

<ParamField path="discard" type="string" default="Old">
  Policy when stream reaches limits.

  **Options:**

  * `Old` - Discard oldest messages (default)
  * `New` - Reject new messages
</ParamField>

<ParamField path="num_replicas" type="integer" default="1">
  Number of stream replicas for high availability (1-5).
</ParamField>

<ParamField path="duplicate_window" type="integer">
  Window in nanoseconds to detect duplicate messages. Default is 2 minutes.
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "message": "Stream created successfully"
  }
}
```

### Example Usage

<CodeGroup>
  ```json Basic Stream theme={null}
  {
    "name": "order-events",
    "subject": ["orders.>"],
    "description": "All order-related events",
    "max_msgs": 1000000,
    "max_age": 2592000000000000
  }
  ```

  ```json High-Volume Stream theme={null}
  {
    "name": "analytics-events",
    "subject": ["analytics.pageview", "analytics.click"],
    "storage": "File",
    "retention": "Limits",
    "max_bytes": 10737418240,
    "discard": "Old",
    "num_replicas": 3
  }
  ```
</CodeGroup>
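
Because `max_age` and `duplicate_window` are expressed in nanoseconds, the raw numbers are easy to mistype. The following is a minimal sketch of deriving them from `timedelta` values. The helper name `to_nanoseconds` is hypothetical and not part of the platform; only the parameter names come from the reference above.

```python
from datetime import timedelta

NANOSECONDS_PER_SECOND = 1_000_000_000


def to_nanoseconds(duration: timedelta) -> int:
    """Convert a timedelta to whole nanoseconds (hypothetical helper)."""
    # Integer arithmetic on the timedelta components avoids float rounding.
    whole_seconds = duration.days * 86_400 + duration.seconds
    return whole_seconds * NANOSECONDS_PER_SECOND + duration.microseconds * 1_000


# Parameters equivalent to the "Basic Stream" example: 30 days of retention.
payload = {
    "name": "order-events",
    "subject": ["orders.>"],
    "max_msgs": 1_000_000,
    "max_age": to_nanoseconds(timedelta(days=30)),  # 2592000000000000
    "duplicate_window": to_nanoseconds(timedelta(minutes=2)),
}
```

Computing the value from a named duration keeps the intent (30 days, 2 minutes) visible next to the number that is actually sent.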

***

## stream-publish

Publish a message to a subject. If a stream is configured to listen to this subject pattern, it will store the message.

### Parameters

<ParamField path="subject" type="string" required>
  Subject to publish the message to. This determines which streams receive the message.

  **Subject patterns:**

  * `orders.123.created` - Specific order event
  * `users.456.updated` - User update event
  * `events.production.error` - Production error event
</ParamField>

<ParamField path="value" type="object | string" required>
  Message payload. Can be a JSON object or string.

  **Best practices:**

  * Include event type: `{"event": "order_created"}`
  * Include timestamp: `{"timestamp": "2025-10-16T10:30:00Z"}`
  * Include relevant IDs: `{"order_id": "123", "user_id": "456"}`
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "message": "Message published successfully"
  }
}
```

### Example Usage

<CodeGroup>
  ```json Order Event theme={null}
  {
    "subject": "orders.ORD-001.created",
    "value": {
      "event": "order_created",
      "order_id": "ORD-001",
      "customer_id": "CUST-123",
      "amount": 150.00,
      "items": 3,
      "timestamp": "2025-10-16T10:30:00Z"
    }
  }
  ```

  ```json User Activity theme={null}
  {
    "subject": "users.user-456.activity",
    "value": {
      "event": "page_view",
      "user_id": "user-456",
      "page": "/products/widget",
      "session_id": "sess_abc123",
      "timestamp": "2025-10-16T10:30:00Z"
    }
  }
  ```
</CodeGroup>
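
The best practices listed for `value` (event type, timestamp, relevant IDs) can be applied consistently with a small builder. This is a sketch only: `build_event` is a hypothetical helper, and how the resulting parameters are sent to `stream-publish` depends on your flow setup.

```python
from datetime import datetime, timezone


def build_event(subject: str, event: str, **fields) -> dict:
    """Build stream-publish parameters with an event type and a UTC timestamp."""
    # ISO 8601 in UTC, matching the format used in the examples above.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Extra fields come last, so an explicit `timestamp=` argument overrides the default.
    return {"subject": subject, "value": {"event": event, "timestamp": timestamp, **fields}}


message = build_event(
    "orders.ORD-001.created",
    "order_created",
    order_id="ORD-001",
    customer_id="CUST-123",
    amount=150.00,
)
```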

### Common Patterns

<AccordionGroup>
  <Accordion title="Entity State Changes" icon="rotate">
    Track all changes to an entity

    ```text theme={null}
    Subject: "orders.{order_id}.{event}"
    Examples:
      - orders.ORD-001.created
      - orders.ORD-001.paid
      - orders.ORD-001.shipped
      - orders.ORD-001.delivered
    ```
  </Accordion>

  <Accordion title="User Activity Tracking" icon="user">
    Track user actions

    ```text theme={null}
    Subject: "users.{user_id}.activity"
    Value: {
      "event": "action_name",
      "details": {...}
    }
    ```
  </Accordion>
</AccordionGroup>

***

## stream-get

Retrieve messages from a stream by subject. Can retrieve all messages for a subject or a specific sequence range.

### Parameters

<ParamField path="stream" type="string" required>
  Name of the stream to retrieve messages from.
</ParamField>

<ParamField path="subject" type="string" required>
  Subject to retrieve messages for. Must be an exact subject; wildcards are not supported.
</ParamField>

<ParamField path="from_sequence" type="integer">
  Start from this sequence number (inclusive). If provided, enables sequence-based retrieval.
</ParamField>

<ParamField path="to_sequence" type="integer">
  End at this sequence number (inclusive). Requires `from_sequence`.
</ParamField>

<ParamField path="limit" type="integer">
  Maximum number of messages to return. Requires `from_sequence`.
</ParamField>

<Info>
  If any of `from_sequence`, `to_sequence`, or `limit` is provided, the function uses sequence-based retrieval. Otherwise, it retrieves all messages for the subject.
</Info>
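
The parameter dependencies above can be checked before a request is sent. The sketch below assumes the rules exactly as written in the parameter descriptions (`to_sequence` and `limit` require `from_sequence`, and the subject must not contain wildcards). `build_get_request` is a hypothetical helper, not a platform function.

```python
from typing import Optional


def build_get_request(
    stream: str,
    subject: str,
    from_sequence: Optional[int] = None,
    to_sequence: Optional[int] = None,
    limit: Optional[int] = None,
) -> dict:
    """Assemble stream-get parameters, enforcing the documented dependencies."""
    if "*" in subject or ">" in subject:
        raise ValueError("stream-get needs an exact subject, not a wildcard pattern")
    if from_sequence is None and (to_sequence is not None or limit is not None):
        raise ValueError("to_sequence and limit require from_sequence")
    request = {"stream": stream, "subject": subject}
    optional = {"from_sequence": from_sequence, "to_sequence": to_sequence, "limit": limit}
    # Only include optional fields that were set; with none set, all messages are retrieved.
    request.update({key: value for key, value in optional.items() if value is not None})
    return request


request = build_get_request("order-events", "orders.ORD-001.created", from_sequence=100, to_sequence=200)
```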

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 3,
      "results": [
        {
          "created": 1697458200000,
          "subject": "orders.ORD-001.created",
          "value": "{\"event\":\"order_created\",\"amount\":150}"
        }
      ]
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.ORD-001.created"
    }
  }
}
```
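
In the sample response, each result's `value` is a JSON-encoded string rather than a nested object. The sketch below decodes results from a response that has already been parsed into a dict shaped like the sample. Two assumptions are made here and not confirmed by this page: `created` is treated as a Unix timestamp in milliseconds, and payloads that are not valid JSON are passed through unchanged.

```python
import json
from datetime import datetime, timezone


def decode_results(response: dict) -> list:
    """Return results with `value` parsed and `created` as a UTC datetime."""
    decoded = []
    for item in response["body"]["body"]["results"]:
        try:
            value = json.loads(item["value"])
        except (TypeError, json.JSONDecodeError):
            value = item["value"]  # Not JSON: keep the raw payload.
        decoded.append({
            "subject": item["subject"],
            # Assumption: `created` is a Unix timestamp in milliseconds.
            "created": datetime.fromtimestamp(item["created"] / 1000, tz=timezone.utc),
            "value": value,
        })
    return decoded


sample = {
    "status_code": 200,
    "body": {"body": {"results_total": 1, "results": [{
        "created": 1697458200000,
        "subject": "orders.ORD-001.created",
        "value": "{\"event\":\"order_created\",\"amount\":150}",
    }]}},
}
events = decode_results(sample)
```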

### Example Usage

<CodeGroup>
  ```json Get All Messages theme={null}
  {
    "stream": "order-events",
    "subject": "orders.ORD-001.created"
  }
  ```

  ```json Get Sequence Range theme={null}
  {
    "stream": "order-events",
    "subject": "orders.ORD-001.created",
    "from_sequence": 100,
    "to_sequence": 200
  }
  ```

  ```json Get Page From Sequence theme={null}
  {
    "stream": "order-events",
    "subject": "orders.ORD-001.created",
    "from_sequence": 500,
    "limit": 50
  }
  ```
</CodeGroup>

***

## stream-search

Search for messages within a stream using a subject pattern and return matching results.

### Parameters

<ParamField path="stream" type="string" required>
  Name of the stream to search.
</ParamField>

<ParamField path="search" type="string" required>
  Subject pattern to search for. Supports wildcards.

  **Examples:**

  * `orders.*` - All direct order subjects
  * `orders.>` - All order subjects (including nested)
  * `users.123.*` - All events for user 123
</ParamField>

<ParamField path="limit" type="integer" required>
  Maximum number of results to return (1-1000).
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 45,
      "results": [
        {
          "created": 1697458200000,
          "subject": "orders.ORD-001.created",
          "value": "{\"event\":\"order_created\"}"
        },
        {
          "created": 1697458260000,
          "subject": "orders.ORD-001.paid",
          "value": "{\"event\":\"order_paid\"}"
        }
      ]
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.>"
    }
  }
}
```

### Example Usage

<CodeGroup>
  ```json Search Order Events theme={null}
  {
    "stream": "order-events",
    "search": "orders.ORD-001.>",
    "limit": 100
  }
  ```

  ```json Search User Activity theme={null}
  {
    "stream": "user-activity",
    "search": "users.user-456.*",
    "limit": 50
  }
  ```
</CodeGroup>
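
To make the difference between `*` and `>` concrete, here is a minimal matcher sketch. It assumes NATS-style semantics: subjects are dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens and may only appear last. The function `subject_matches` is hypothetical and may differ from the platform's matching in edge cases.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a concrete subject matches a wildcard pattern."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # `>` must be the last pattern token and needs at least one token to match.
            return index == len(pattern_tokens) - 1 and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[index]:
            return False
    # Without a trailing `>`, both sides must have the same number of tokens.
    return len(subject_tokens) == len(pattern_tokens)


direct_only = subject_matches("orders.*", "orders.ORD-001.created")   # False: two tokens follow "orders"
nested_too = subject_matches("orders.>", "orders.ORD-001.created")    # True
```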

***

## stream-aggregate

Fold messages for a subject into current state using event sourcing. Messages are processed in order and reduced using lodash merge, with special control messages for state manipulation.

<Info>
  **Event Sourcing with Folding:** Aggregate rebuilds current state by replaying all messages for a subject in order. Normal messages are merged, while control messages (unset, tombstone, poison-pill) modify the fold behavior.
</Info>

### Parameters

<ParamField path="stream" type="string" required>
  Name of the stream to aggregate from.
</ParamField>

<ParamField path="subject" type="string" required>
  Subject pattern to search and aggregate. Supports wildcards.
</ParamField>

### Folding Logic

The aggregate function processes messages in order with this logic:

1. **Normal messages**: Merged into aggregate using lodash `merge()`
2. **`type: 'unset'`**: Removes specified paths from aggregate
3. **`type: 'poison-pilled'`**: Resets aggregate to empty object `{}`
4. **`type: 'tombstoned'`**: Stops processing (ignores subsequent messages)

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "body": {
      "order_id": "ORD-001",
      "customer_id": "CUST-123",
      "amount": 150.00,
      "status": "shipped",
      "tracking": "1Z999AA"
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.ORD-001.>"
    }
  }
}
```

### Example Usage

<CodeGroup>
  ```json Aggregate Order State theme={null}
  {
    "stream": "order-events",
    "subject": "orders.ORD-001.>"
  }
  ```

  ```json Aggregate User Profile theme={null}
  {
    "stream": "user-events",
    "subject": "users.user-456.>"
  }
  ```
</CodeGroup>

### Folding Example

Given these messages in order:

```text theme={null}
Message 1: {"order_id": "ORD-001", "status": "created", "amount": 150}
Message 2: {"status": "paid", "payment_id": "PAY-123"}
Message 3: {"status": "shipped", "tracking": "1Z999AA"}
Message 4: {"type": "unset", "path": "payment_id"}
```

Result after folding (the `unset` message removes `payment_id`):

```json theme={null}
{
  "order_id": "ORD-001",
  "status": "shipped",
  "amount": 150,
  "tracking": "1Z999AA"
}
```
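
The folding rules can be sketched in a few lines. This is an illustration under stated assumptions, not the platform's implementation: `deep_merge` is a simplified stand-in for lodash `merge()` (it replaces arrays instead of merging them by index), control messages are assumed not to be merged into the state themselves, and all function names are hypothetical.

```python
import copy


def deep_merge(target: dict, source: dict) -> dict:
    """Recursively merge source into target (simplified stand-in for lodash merge)."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            deep_merge(target[key], value)
        else:
            target[key] = copy.deepcopy(value)
    return target


def unset_path(state: dict, path: str) -> None:
    """Remove a dot-notation path from state; missing paths are ignored."""
    *parents, leaf = path.split(".")
    node = state
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return
    node.pop(leaf, None)


def fold(messages: list) -> dict:
    """Fold messages, in stream order, into the current state."""
    state = {}
    for message in messages:
        kind = message.get("type")
        if kind == "unset":
            unset_path(state, message["path"])
        elif kind == "poison-pilled":
            state = {}  # Reset: everything before this message is discarded.
        elif kind == "tombstoned":
            break  # Stop: everything after this message is ignored.
        else:
            deep_merge(state, message)
    return state


messages = [
    {"order_id": "ORD-001", "status": "created", "amount": 150},
    {"status": "paid", "payment_id": "PAY-123"},
    {"status": "shipped", "tracking": "1Z999AA"},
    {"type": "unset", "path": "payment_id"},
]
state = fold(messages)
# state == {"order_id": "ORD-001", "status": "shipped", "amount": 150, "tracking": "1Z999AA"}
```

Because the fold walks messages in order, the position of a control message matters: an `unset` only affects properties merged before it, and later messages can set the same property again.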

***

## stream-list

List all streams in your account with their metadata and configuration.

### Parameters

No parameters required.

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 3,
      "results": [
        {
          "created": 1694170800000,
          "description": "Order lifecycle events",
          "messages_total": 12458,
          "metadata": {},
          "name": "order-events",
          "subjects": ["orders.>"]
        },
        {
          "created": 1694257200000,
          "description": "User activity tracking",
          "messages_total": 98234,
          "metadata": {},
          "name": "user-activity",
          "subjects": ["users.*.activity"]
        }
      ]
    }
  }
}
```

### Example Usage

```json theme={null}
{
  "function": "stream-list"
}
```

***

## stream-unset

Publish an unset control message that removes specified properties from the aggregate when folded.

<Info>
  **Use Case:** Remove sensitive data, correct mistakes, or clean up deprecated fields from the current state without affecting message history.
</Info>

### Parameters

<ParamField path="subject" type="string" required>
  Subject to publish the unset message to. Must match the subject used in aggregation.
</ParamField>

<ParamField path="path" type="string" required>
  Property path to remove from aggregate. Supports dot notation for nested properties.

  **Examples:**

  * `"email"` - Remove top-level property
  * `"address.street"` - Remove nested property
  * `"metadata.temporary"` - Remove from nested object
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "message": "Unset message published successfully"
  }
}
```

### Example Usage

<CodeGroup>
  ```json Remove Property theme={null}
  {
    "subject": "users.user-456.state",
    "path": "temporary_token"
  }
  ```

  ```json Remove Nested Property theme={null}
  {
    "subject": "orders.ORD-001.state",
    "path": "payment.card_number"
  }
  ```
</CodeGroup>

### How It Works

```text theme={null}
Initial aggregate: {"name": "John", "email": "john@example.com", "temp": "data"}
↓
Publish unset: {"subject": "users.123.state", "path": "temp"}
↓
After aggregation: {"name": "John", "email": "john@example.com"}
```

***

## stream-tombstone

Publish a tombstone control message that stops processing further messages when encountered during aggregation. Use to mark a subject as deleted while preserving history.

<Info>
  **Tombstone Pattern:** Marks an entity as deleted without removing history. Aggregation stops at the tombstone, ignoring all subsequent messages.
</Info>

### Parameters

<ParamField path="subject" type="string" required>
  Subject to publish the tombstone message to. Future aggregations will stop at this message.
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "message": "Tombstone message published successfully"
  }
}
```

### Example Usage

```json theme={null}
{
  "subject": "users.user-789.state"
}
```

### How It Works

```text theme={null}
Message 1: {"name": "Alice", "status": "active"}
Message 2: {"email": "alice@example.com"}
Message 3: {"type": "tombstoned"}
Message 4: {"status": "reactivated"}  ← This is ignored
↓
Aggregate stops at tombstone: {"name": "Alice", "status": "active", "email": "alice@example.com"}
```

***

## stream-poison-pill

Publish a poison pill control message that resets the aggregate to an empty object when encountered during folding. Use to start fresh or correct corrupted state.

<Info>
  **Reset Pattern:** Clears all previous state and starts fresh from this point. Useful for major state corrections or entity resets.
</Info>

### Parameters

<ParamField path="subject" type="string" required>
  Subject to publish the poison pill message to. Aggregate will reset to `{}` at this message.
</ParamField>

### Response

```json theme={null}
{
  "status_code": 200,
  "body": {
    "message": "Poison pill message published successfully"
  }
}
```

### Example Usage

```json theme={null}
{
  "subject": "orders.ORD-001.state"
}
```

### How It Works

```text theme={null}
Message 1: {"order_id": "ORD-001", "status": "created", "amount": 150}
Message 2: {"status": "paid"}
Message 3: {"type": "poison-pilled"}  ← Resets to {}
Message 4: {"order_id": "ORD-001", "status": "created", "amount": 200}
Message 5: {"status": "paid"}
↓
Final aggregate: {"order_id": "ORD-001", "status": "paid", "amount": 200}
```

***

## Best Practices

<CardGroup cols={2}>
  <Card title="Use Meaningful Subjects" icon="tag">
    Structure subjects hierarchically: `entity.id.event` (e.g., `orders.123.created`)
  </Card>

  <Card title="Include Event Types" icon="file-lines">
    Always include event type in value: `{"event": "order_created"}`
  </Card>

  <Card title="Add Timestamps" icon="clock">
    Include timestamps in message payloads for debugging and analytics
  </Card>

  <Card title="Aggregate by Entity" icon="folder-tree">
    Use wildcards to aggregate all events for an entity: `orders.123.>`
  </Card>

  <Card title="Control Message Ordering" icon="list-ol">
    Messages fold in stream order, so each control message (unset, poison-pill, tombstone) takes effect at its position in the sequence
  </Card>

  <Card title="Preserve History" icon="archive">
    Control messages don't delete history, only affect aggregation
  </Card>
</CardGroup>

***

## Event Sourcing Patterns

<AccordionGroup>
  <Accordion title="Entity State Management" icon="database">
    Track all changes to an entity and rebuild current state

    ```text theme={null}
    1. Publish events: orders.123.created, orders.123.paid
    2. Aggregate: Get current order state
    3. Control: Use unset to remove fields, poison-pill to reset
    ```
  </Accordion>

  <Accordion title="Audit Log" icon="clipboard-list">
    Maintain complete audit trail

    ```text theme={null}
    1. Every state change is a message
    2. Search by subject to see full history
    3. Tombstone when entity deleted (preserves history)
    ```
  </Accordion>

  <Accordion title="Temporal Queries" icon="timeline">
    Query state at any point in time

    ```text theme={null}
    1. Get messages up to specific sequence
    2. Aggregate to rebuild state at that moment
    3. Compare states across time
    ```
  </Accordion>
</AccordionGroup>

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Key-Value Storage" icon="database" href="/flows/functions/key-value">
    Simple key-value data storage
  </Card>

  <Card title="Object Storage" icon="box" href="/flows/functions/object-storage">
    Store large files and media
  </Card>

  <Card title="Functions Step" icon="function" href="/flows/steps/functions">
    Using Functions in flows
  </Card>

  <Card title="Data Transformation" icon="shuffle" href="/flows/functions/utility">
    Transform and process data
  </Card>
</CardGroup>
