# Stream Triggers

> Real-time message processing with stream-based triggers (Advanced Mode)

Stream triggers enable real-time, event-driven flow execution by listening to message streams. They are well suited to high-throughput data processing, real-time monitoring, event-driven architectures, and system integration.

Stream triggers are only available in Advanced Mode and require streams to be configured in the QuivaWorks hosting dashboard.

***

## Overview

Streams are persistent message channels that allow high-throughput, low-latency communication between systems. Stream triggers listen to these streams and execute flows when messages arrive.

**Key Features:**

* Real-time message processing
* High throughput (thousands of messages per second)
* Low latency (milliseconds)
* Message ordering guarantees
* Built-in retry and error handling
* Message filtering by subject
* Scalable architecture

***

## How It Works

1. Create a stream in the QuivaWorks hosting dashboard.
2. Configure gateway mappings to write messages to the stream.
3. Add a Stream trigger to your flow.
4. Messages written to the stream automatically trigger your flow.
5. The flow receives the message data and processes it in real time.

***

## Stream Types

### Stream Message Trigger

Triggers on every message written to the selected stream. No filtering is applied, so every message triggers the flow. Best for processing every event.

**Configuration:**

* Select stream from dropdown
* Messages arrive in order
* Flow executes for each message

**Use when:** You need to process all messages on a stream.

### Message Subject Trigger

Triggers only when the message subject matches a specified pattern. Messages are filtered before the flow is triggered, so only matching messages start an execution. This reduces unnecessary flow executions.

**Configuration:**

* Select stream from dropdown
* Specify subject pattern to match
* Supports wildcards and patterns

**Use when:** You want to filter messages by subject before processing.

***

## Configuration

### Stream Selection

Choose from available streams configured in your hosting dashboard. Streams must be created before adding stream triggers. Stream name identifies the message channel.

**Stream naming convention:**

* Use descriptive names (e.g., "orders", "user-events", "analytics")
* Avoid special characters
* Use lowercase and hyphens

### Subject Filtering (Message Subject Trigger)

Filter messages by subject pattern:

**Exact match:**

```
order.created
```

**Wildcard patterns:**

```
order.*
user.*.updated
*.critical
```
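To reason about which subjects a pattern would select, the following sketch may help. It assumes that `*` matches exactly one dot-separated token, which is a common convention for subject hierarchies; this page does not state the platform's exact wildcard semantics, so verify against your own streams. `subjectMatches` is a hypothetical helper, not a platform API.

```javascript
// Hypothetical helper: checks a subject against a pattern, ASSUMING that
// `*` matches exactly one dot-separated token. Actual platform semantics may differ.
function subjectMatches(pattern, subject) {
  const patternTokens = pattern.split('.');
  const subjectTokens = subject.split('.');

  // Under this assumption, a pattern only matches subjects with the same token count.
  if (patternTokens.length !== subjectTokens.length) {
    return false;
  }

  return patternTokens.every(
    (token, i) => token === '*' || token === subjectTokens[i]
  );
}

console.log(subjectMatches('order.*', 'order.created'));           // true
console.log(subjectMatches('user.*.updated', 'user.42.updated'));  // true
console.log(subjectMatches('*.critical', 'system.critical'));      // true
console.log(subjectMatches('order.*', 'order.payment.completed')); // false
```

Under this single-token assumption, `order.*` would not match a three-level subject such as `order.payment.completed`, so deeper hierarchies would need their own pattern.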

**Multiple subjects:**
Configure multiple triggers for different subjects on the same stream.

### Processing Mode

**Sequential Processing:**
Messages are processed in order, one at a time. This guarantees message ordering at the cost of lower throughput.

**Parallel Processing:**
Multiple messages are processed simultaneously. This gives higher throughput but no ordering guarantees.

Choose based on whether message order matters for your use case.
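The difference between the two modes can be illustrated with plain JavaScript. This is a conceptual sketch only, not how the platform schedules flows; `handle` is a hypothetical stand-in for whatever your flow does with one message.

```javascript
// Conceptual sketch: `handle` is a hypothetical async function that processes one message.
async function processSequentially(messages, handle) {
  const results = [];
  for (const message of messages) {
    // Each message waits for the previous one to finish, so order is preserved.
    results.push(await handle(message));
  }
  return results;
}

async function processInParallel(messages, handle) {
  // All handlers start immediately, so they may finish in any order.
  // Promise.all still returns the results in input order.
  return Promise.all(messages.map((message) => handle(message)));
}
```

If a slow message is followed by a fast one, the sequential version finishes them in arrival order, while the parallel version lets the fast one complete first. That is why parallel mode suits independent messages.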

***

## Accessing Message Data

### Message Structure

All message data is available under `$.trigger.message`:

```javascript theme={null}
// Message content
const data = $.trigger.message.data;

// Message metadata
const subject = $.trigger.message.subject;
const timestamp = $.trigger.message.timestamp;
const messageId = $.trigger.message.id;
const stream = $.trigger.message.stream;

// Example values
// data: {"orderId": "ORD-123", "status": "created"}
// subject: "order.created"
// timestamp: "2025-10-14T10:30:00.123Z"
// messageId: "msg_abc123"
// stream: "orders"
```

### Message Data

Message data is typically JSON:

```javascript theme={null}
const messageData = $.trigger.message.data;

// Access properties
const orderId = messageData.orderId;
const customerId = messageData.customerId;
const amount = messageData.amount;

// For nested data (optional chaining avoids a TypeError if `customer` is missing)
const address = messageData.customer?.address;
const city = address?.city;
```

### Message Metadata

Access message metadata for tracking and debugging:

```javascript theme={null}
// Message ID (unique identifier)
const messageId = $.trigger.message.id;

// Timestamp (when message was written)
const timestamp = $.trigger.message.timestamp;

// Stream name
const streamName = $.trigger.message.stream;

// Subject (message topic)
const subject = $.trigger.message.subject;

// Sequence number (message order in stream)
const sequence = $.trigger.message.sequence;
```

***

## Use Cases

### Real-Time Order Processing

**Scenario:** Process orders as they are created

**Stream:** orders

**Message Subjects:**

* order.created
* order.updated
* order.cancelled

**Flow:**

* Trigger: Message Subject (order.created)
* Agent: Validate order details
* HTTP Request: Check inventory
* Condition: In stock?
  * Yes: Process payment
  * No: Notify customer
* HTTP Request: Update order management system

**Benefits:** Instant order processing, real-time inventory checks, immediate customer notifications.

### System Event Monitoring

**Scenario:** Monitor system events for alerts

**Stream:** system-events

**Message Subjects:**

* system.error
* system.warning
* system.critical

**Flow:**

* Trigger: Message Subject (system.critical)
* Agent: Analyze error details
* Condition: Requires immediate attention?
  * Yes: Page on-call engineer
  * No: Log for review
* HTTP Request: Create incident ticket

**Benefits:** Real-time alerting, automatic escalation, comprehensive logging.

### User Activity Tracking

**Scenario:** Track and respond to user activities

**Stream:** user-events

**Message Subjects:**

* user.signup
* user.login
* user.purchase
* user.churn-risk

**Flow:**

* Trigger: Message Subject (user.signup)
* Agent: Analyze user profile
* HTTP Request: Send welcome email
* HTTP Request: Create onboarding tasks
* HTTP Request: Notify account manager

**Benefits:** Immediate engagement, personalized onboarding, activity tracking.

### IoT Data Processing

**Scenario:** Process sensor data in real-time

**Stream:** sensor-data

**Message Subjects:**

* sensor.temperature
* sensor.humidity
* sensor.pressure

**Flow:**

* Trigger: Stream Message (sensor-data)
* Agent: Analyze sensor readings
* Condition: Threshold exceeded?
  * Yes: Send alert
  * No: Log data
* HTTP Request: Update dashboard

**Benefits:** Real-time monitoring, instant alerts, data aggregation.

### Financial Transaction Processing

**Scenario:** Process financial transactions

**Stream:** transactions

**Message Subjects:**

* transaction.initiated
* transaction.completed
* transaction.failed
* transaction.fraudulent

**Flow:**

* Trigger: Message Subject (transaction.initiated)
* Agent: Fraud detection analysis
* Condition: Suspicious activity?
  * Yes: Flag for review
  * No: Process transaction
* HTTP Request: Update account balance
* HTTP Request: Send receipt

**Benefits:** Real-time fraud detection, instant processing, audit trail.

### Content Moderation

**Scenario:** Moderate user-generated content

**Stream:** content-submissions

**Message Subjects:**

* content.submitted
* content.flagged
* content.reported

**Flow:**

* Trigger: Message Subject (content.submitted)
* Agent: Content moderation analysis
* Condition: Contains violations?
  * Yes: Remove and notify user
  * No: Publish content
* HTTP Request: Update content database

**Benefits:** Real-time moderation, automatic filtering, user safety.

***

## Gateway Configuration

Streams require gateway mappings to write messages. Configure them in the QuivaWorks hosting dashboard.

### Creating Gateway Mapping

Navigate to the hosting dashboard, go to the Gateway section, and click "Add Mapping". Then configure:

**Endpoint Path:** URL path that accepts messages (e.g., /api/events)

**Target Stream:** Stream to write messages to

**Subject Template:** Subject pattern for messages

**Authentication:** API key or other auth method

### Writing Messages

Once the gateway is configured, write messages via HTTP:

```bash theme={null}
curl -X POST https://gateway.quiva.ai/api/events \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -d '{
    "subject": "order.created",
    "data": {
      "orderId": "ORD-123",
      "customerId": "CUST-456",
      "amount": 99.99
    }
  }'
```

**From application:**

```javascript theme={null}
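// API_KEY is assumed to be supplied by your application's configuration.
async function publishEvent(subject, data) {
  const response = await fetch('https://gateway.quiva.ai/api/events', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${API_KEY}`
    },
    body: JSON.stringify({ subject, data })
  });

  // fetch only rejects on network failure, so check the HTTP status explicitly.
  if (!response.ok) {
    throw new Error(`Publish failed with status ${response.status}`);
  }
}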

// Usage
await publishEvent('order.created', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  amount: 99.99
});
```

***

## Best Practices

### Message Design

**Keep messages small:** Aim for under 1 KB per message. Large messages slow processing.

**Use clear subjects:** Subject should indicate message type. Use hierarchical naming (e.g., order.created, order.updated).

**Include timestamps:** Always include event timestamp in message data.

**Add message IDs:** Include unique identifier for tracking and deduplication.

**Structure data consistently:** Use consistent JSON schema across message types.
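Because failed messages are retried, a flow may see the same message more than once, which is where message IDs pay off. The sketch below shows one minimal way to deduplicate by ID. It assumes each message carries a unique `id` and keeps state in memory only; `createDeduplicator` is a hypothetical helper, and a real deployment would more likely use a shared store with expiry.

```javascript
// Minimal in-memory deduplication sketch. ASSUMPTION: every message carries a
// unique `id`. A real deployment would likely need a shared store with expiry.
function createDeduplicator(maxEntries = 10000) {
  const seen = new Set();

  return function isDuplicate(messageId) {
    if (seen.has(messageId)) {
      return true;
    }
    seen.add(messageId);
    // Evict the oldest ID once the cap is exceeded (Sets iterate in insertion order).
    if (seen.size > maxEntries) {
      seen.delete(seen.values().next().value);
    }
    return false;
  };
}

const isDuplicate = createDeduplicator();
console.log(isDuplicate('msg_abc123')); // false (first time seen)
console.log(isDuplicate('msg_abc123')); // true  (redelivery)
```

The cap keeps memory bounded, at the cost of forgetting very old IDs.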

### Subject Naming

**Use hierarchical structure:**

```
resource.action
user.created
user.updated
user.deleted

order.created
order.shipped
order.delivered
```

**Use dots for hierarchy:**

```
system.error.database
system.error.api
system.warning.performance
```

**Be specific:**

```
Good: order.payment.completed
Bad: order.done
```

### Error Handling

**Implement retry logic:** Messages that fail processing will be retried automatically.

**Handle poison messages:** Messages that consistently fail should be moved to a dead-letter queue.

**Log failures:** Track failed messages for debugging.

**Set timeout limits:** Prevent long-running flows from blocking the stream.

```javascript theme={null}
// processMessage and moveToDeadLetter are placeholders for your own logic.
try {
  await processMessage($.trigger.message.data);
} catch (error) {
  console.error('Message processing failed:', {
    messageId: $.trigger.message.id,
    subject: $.trigger.message.subject,
    error: error.message
  });
  
  // Decide whether to retry
  if (error.retryable) {
    throw error; // Will retry
  } else {
    // Move to dead-letter queue
    await moveToDeadLetter($.trigger.message);
  }
}
```
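For the timeout advice above, one option inside a code step is to race the work against a timer. This page does not say whether the platform offers its own flow-level timeout setting, so treat this as a fallback sketch; `withTimeout` is a hypothetical helper.

```javascript
// Hypothetical helper: rejects if `promise` does not settle within `ms` milliseconds.
// Note: this stops waiting; it does not cancel the underlying work.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Example usage inside a flow step (processMessage is a placeholder):
// await withTimeout(processMessage($.trigger.message.data), 5000);
```

A timeout error can then be handled like any other failure, for example by rethrowing it so the message is retried.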

### Performance Optimization

**Batch when possible:** Group related operations together.

**Use parallel processing:** For independent messages, enable parallel processing.

**Minimize external calls:** Reduce API calls to external services.

**Cache frequently used data:** Cache reference data to reduce lookups.

**Monitor throughput:** Track messages processed per second.
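Caching reference data can be as simple as a small time-to-live cache, sketched below. `createTtlCache` is a hypothetical helper. Whether in-memory state survives between flow executions depends on the runtime, which this page does not specify, so an external cache may be needed in practice.

```javascript
// Minimal TTL cache sketch. `now` is injectable so expiry can be tested deterministically.
function createTtlCache(ttlMs, now = () => Date.now()) {
  const entries = new Map();

  return {
    get(key) {
      const entry = entries.get(key);
      if (!entry) return undefined;
      if (now() >= entry.expiresAt) {
        entries.delete(key); // expired: drop it and report a miss
        return undefined;
      }
      return entry.value;
    },
    set(key, value) {
      entries.set(key, { value, expiresAt: now() + ttlMs });
    },
  };
}

const cache = createTtlCache(60000); // keep reference data for one minute
cache.set('currency:USD', { symbol: '$' });
console.log(cache.get('currency:USD')); // { symbol: '$' }
```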

### Monitoring and Observability

**Track message metrics:**

* Messages received per second
* Processing time per message
* Error rate
* Retry rate
* Queue depth

**Set up alerts:**

* High error rate
* Processing delays
* Queue backup
* Failed messages

**Log message processing:**

```javascript theme={null}
const startTime = Date.now();

console.log('Processing message', {
  messageId: $.trigger.message.id,
  subject: $.trigger.message.subject,
  timestamp: $.trigger.message.timestamp,
  processingStarted: new Date().toISOString()
});

// ... process message ...

console.log('Message processed', {
  messageId: $.trigger.message.id,
  duration: Date.now() - startTime,
  success: true
});
```

***

## Message Ordering

### Sequential Processing

Messages are processed in the order they were written to the stream. The next message waits until the current message completes. This guarantees ordering but lowers throughput.

**Use when:**

* Order matters (e.g., account balance updates)
* State depends on sequence (e.g., status transitions)
* Dependencies between messages

### Parallel Processing

Multiple messages are processed simultaneously. There are no ordering guarantees, but throughput is higher.

**Use when:**

* Messages are independent
* Order doesn't matter
* High throughput required
* Idempotent operations

### Ordering Guarantees

**Within a stream:** Messages ordered by write time.

**Across streams:** No ordering guarantees.

**Subject filtering:** Order maintained within filtered subject.
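When running in parallel mode, a flow can use the message `sequence` value to notice out-of-order arrivals. The sketch below assumes `sequence` is an integer that increases by one per message in the stream, which this page does not confirm. With subject filtering, gaps may be normal, because non-matching messages presumably still consume sequence numbers. `createSequenceTracker` is a hypothetical helper.

```javascript
// ASSUMPTION: `sequence` is an integer that increases by one per message in a stream.
function createSequenceTracker() {
  let lastSequence = null;

  return function check(sequence) {
    let status;
    if (lastSequence === null || sequence === lastSequence + 1) {
      status = 'in-order';
    } else if (sequence <= lastSequence) {
      status = 'stale'; // older than (or the same as) something already seen
    } else {
      status = 'gap'; // one or more messages were skipped or are still in flight
    }
    // Only ever move the high-water mark forward.
    if (lastSequence === null || sequence > lastSequence) {
      lastSequence = sequence;
    }
    return status;
  };
}

const check = createSequenceTracker();
console.log(check(1)); // in-order
console.log(check(2)); // in-order
console.log(check(4)); // gap
console.log(check(3)); // stale
```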

***

## Advanced Features

### Dead Letter Queue

Messages that fail repeatedly are moved to a dead letter queue. This prevents poison messages from blocking the stream and allows manual inspection and reprocessing.

**Configuration:**

* Set max retry attempts (default: 3)
* Configure dead letter stream
* Set retention period

**Access dead letter messages:**

* View in hosting dashboard
* Reprocess manually
* Analyze for patterns
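The platform applies retries and dead-lettering automatically; the sketch below only models the decision so the retry setting is easier to reason about. It assumes "max retry attempts" counts retries after the first attempt, so the default of 3 allows four attempts in total. That interpretation is not confirmed by this page. `processWithRetries` is a hypothetical helper and uses a synchronous handler for brevity.

```javascript
// Sketch of the retry-then-dead-letter decision. ASSUMPTION: "max retry attempts"
// counts retries after the first attempt, so maxRetries = 3 allows 4 attempts in total.
function processWithRetries(message, handler, maxRetries = 3) {
  let lastError = null;
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      handler(message);
      return { status: 'processed', attempts: attempt };
    } catch (error) {
      lastError = error;
    }
  }
  // Every attempt failed: the message would be routed to the dead letter stream.
  return { status: 'dead-lettered', attempts: maxRetries + 1, error: lastError.message };
}

let calls = 0;
const flaky = () => {
  calls += 1;
  if (calls < 3) throw new Error('temporary failure');
};
console.log(processWithRetries({ id: 'msg_abc123' }, flaky));
// { status: 'processed', attempts: 3 }

const alwaysFails = () => { throw new Error('bad payload'); };
console.log(processWithRetries({ id: 'msg_def456' }, alwaysFails));
// { status: 'dead-lettered', attempts: 4, error: 'bad payload' }
```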

### Message Replay

Replay historical messages from stream. Useful for:

* Reprocessing after bug fixes
* Backfilling data
* Testing with production data
* Disaster recovery

**Replay from timestamp:**

```
Replay from: 2025-10-14T00:00:00Z
Replay to: 2025-10-14T23:59:59Z
```
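To see which messages a replay window like the one above would select, the following sketch filters by timestamp. It assumes ISO 8601 timestamps, as in the message examples on this page, and treats both bounds as inclusive; how the platform treats the bounds is not stated. `selectForReplay` is a hypothetical helper.

```javascript
// Hypothetical helper: selects messages whose ISO 8601 timestamp falls inside
// the replay window (ASSUMED inclusive at both ends).
function selectForReplay(messages, from, to) {
  const start = Date.parse(from);
  const end = Date.parse(to);
  return messages.filter((message) => {
    const written = Date.parse(message.timestamp);
    return written >= start && written <= end;
  });
}

const storedMessages = [
  { id: 'msg_1', timestamp: '2025-10-13T23:59:59.000Z' },
  { id: 'msg_2', timestamp: '2025-10-14T10:30:00.123Z' },
  { id: 'msg_3', timestamp: '2025-10-15T00:00:00.000Z' },
];
const replayed = selectForReplay(storedMessages, '2025-10-14T00:00:00Z', '2025-10-14T23:59:59Z');
console.log(replayed.map((m) => m.id)); // [ 'msg_2' ]
```

With millisecond timestamps, an end bound of `23:59:59Z` would exclude a message written at `23:59:59.500Z` under this reading, so it may be safer to end the window at the start of the next day.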

### Stream Analytics

Monitor stream health and performance:

* Message rate (per second/minute/hour)
* Processing latency (p50, p95, p99)
* Error rate
* Consumer lag
* Queue depth

**Dashboard metrics:**

* Real-time message rate chart
* Latency distribution
* Error rate over time
* Consumer performance

### Multi-Consumer Patterns

Multiple flows can consume from the same stream. Each consumer processes independently. Useful for:

* Different processing logic per consumer
* Separation of concerns
* Parallel processing pipelines

**Example:**

**Stream:** user-events

* Consumer 1: Analytics processing
* Consumer 2: Email notifications
* Consumer 3: Database updates

***

## Troubleshooting

### Messages Not Triggering Flow

**Check the stream exists:** Verify that the stream was created in the hosting dashboard.

**Check gateway mapping:** Ensure the gateway is configured to write to the stream.

**Check flow is active:** Verify that the flow is published and the trigger is enabled.

**Check subject filter:** Ensure the message subject matches the filter pattern.

**Review stream logs:** Check whether messages are arriving at the stream.

### Processing Delays

**High message volume:** Too many messages for current processing capacity.

**Slow flow execution:** Flow steps taking too long to complete.

**External API delays:** Third-party services responding slowly.

**Resource limits:** Hitting compute or memory limits.

**Solutions:**

* Enable parallel processing
* Optimize flow steps
* Increase resource allocation
* Add more consumers

### Message Loss

**Check retention settings:** Messages may have expired.

**Check consumer acknowledgment:** Messages not acknowledged may be lost.

**Check error logs:** Processing errors may cause message loss.

**Review dead letter queue:** Failed messages moved to DLQ.

### High Error Rates

**Validate message format:** Ensure messages match expected schema.

**Check external services:** Verify APIs and services are accessible.

**Review error logs:** Identify common error patterns.

**Test with sample messages:** Validate flow with known good messages.
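A lightweight format check at the start of a flow can catch malformed messages before they cause downstream errors. The sketch below validates the order payload used in this page's examples. The field rules are illustrative assumptions, and `validateOrderMessage` is a hypothetical helper.

```javascript
// Hypothetical validator for the order payload used in this page's examples.
// The rules below are illustrative assumptions, not a published schema.
function validateOrderMessage(data) {
  const errors = [];
  if (data === null || typeof data !== 'object') {
    return ['message data must be a JSON object'];
  }
  if (typeof data.orderId !== 'string' || data.orderId === '') {
    errors.push('orderId must be a non-empty string');
  }
  if (typeof data.customerId !== 'string' || data.customerId === '') {
    errors.push('customerId must be a non-empty string');
  }
  if (typeof data.amount !== 'number' || !Number.isFinite(data.amount) || data.amount < 0) {
    errors.push('amount must be a non-negative number');
  }
  return errors;
}

console.log(validateOrderMessage({ orderId: 'ORD-123', customerId: 'CUST-456', amount: 99.99 }));
// []
console.log(validateOrderMessage({ orderId: 'ORD-123' }));
// [ 'customerId must be a non-empty string', 'amount must be a non-negative number' ]
```

An empty array means the message passed. A non-empty array can be logged alongside the message ID to make recurring schema problems easy to spot.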

***

## Stream Limits by Plan

**Free Tier:**

* Not available (requires Pro or higher)

**Starter Plan:**

* Not available (requires Pro or higher)

**Pro Plan:**

* 5 streams
* 10,000 messages per hour
* 7-day message retention
* Sequential processing only

**Team Plan:**

* 25 streams
* 100,000 messages per hour
* 30-day message retention
* Parallel processing enabled
* Dead letter queue

**Enterprise Plan:**

* Unlimited streams
* Custom message limits
* Custom retention
* Advanced features
* Dedicated infrastructure
* Priority support

***

## Performance Considerations

### Throughput

**Sequential processing:**

* Depends on flow execution time
* Typically 10-100 messages per second
* Guaranteed ordering

**Parallel processing:**

* Much higher throughput
* Typically 100-1,000 messages per second
* No ordering guarantees
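The sequential figures above follow from simple arithmetic: with one message in flight at a time, throughput is roughly the inverse of the average flow execution time. The sketch below also converts the hourly plan limits into sustained per-second rates for comparison. Both functions are hypothetical helpers and the results are estimates, not guarantees.

```javascript
// Rough estimate: with one message in flight at a time, throughput is bounded
// by how long each flow execution takes.
function sequentialThroughputPerSecond(avgFlowMs) {
  return 1000 / avgFlowMs;
}

// Converts a plan's hourly message allowance into a sustained per-second rate.
function sustainedRatePerSecond(messagesPerHour) {
  return messagesPerHour / 3600;
}

console.log(sequentialThroughputPerSecond(10));         // 100
console.log(sequentialThroughputPerSecond(100));        // 10
console.log(sustainedRatePerSecond(10000).toFixed(2));  // 2.78
console.log(sustainedRatePerSecond(100000).toFixed(2)); // 27.78
```

So the 10-100 messages per second range corresponds to flows that take roughly 10 to 100 ms each. The Pro and Team hourly limits work out to about 2.8 and 27.8 messages per second sustained, so the plan quota rather than processing speed may be the binding constraint.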

### Latency

**End-to-end latency:**

* Message write to stream: Less than 10ms
* Trigger activation: Less than 50ms
* Flow execution: Depends on flow complexity

**Total latency:** Typically under 1 second for simple flows.

### Scalability

**Vertical scaling:** Increase resources per consumer.

**Horizontal scaling:** Add more consumers (parallel processing).

**Stream partitioning:** Split stream by subject for parallel processing.

***

## Migration Guide

### From Webhooks to Streams

**Why migrate:**

* Higher throughput
* Lower latency
* Better ordering guarantees
* Built-in retry logic

**Migration steps:**

* Create stream in hosting dashboard
* Configure gateway mapping
* Update webhook sender to use gateway
* Add stream trigger to flow
* Test with sample messages
* Gradually migrate traffic

### From Polling to Streams

**Why migrate:**

* Real-time processing (no polling delay)
* More efficient (event-driven)
* Lower costs (no wasted polls)
* Better resource usage

**Migration steps:**

* Identify polling endpoints
* Create streams for events
* Configure gateway mappings
* Replace schedule triggers with stream triggers
* Remove polling logic

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Steps Overview" icon="list" href="/flows/steps/overview">
    Learn about flow steps
  </Card>

  <Card title="Agent Steps" icon="robot" href="/flows/steps/agents">
    Process stream messages with AI agents
  </Card>

  <Card title="HTTP Request Step" icon="globe" href="/flows/steps/http-request">
    Call external services from stream flows
  </Card>

  <Card title="Condition Step" icon="code-branch" href="/flows/steps/condition">
    Route messages based on content
  </Card>
</CardGroup>
