Pollen + Bloom
Event-driven AI jobs: Pollen ingests and persists signals; Bloom matches them to triggers and runs them through the supervisor.
Pollen + Bloom is the event-driven layer of Orchid — currently proposed and under active design (ADR-029, status: Proposed). The architecture is stable; implementation phases land incrementally.
Every standard Orchid run is synchronous: a user sends a message, the API resolves an OrchidAuthContext, the supervisor fans out, and the response streams back. Pollen + Bloom adds a complementary mode — AI work triggered by events — without duplicating the supervisor, auth model, or streaming infrastructure.
Two cooperating subsystems
Pollen is the event substrate. It ingests, persists, and dispatches immutable Signals. Every signal that enters the system is written to the signals table before any trigger runs — signals are replayable by definition.
Bloom is the execution layer. Triggers match signals using JMESPath filter expressions and build JobSpecs; the JobRunner then invokes the existing LangGraph supervisor with a synthesised OrchidAgentState. A JobRun is the unit of execution, persisted with status, result, and error.
           ┌──────── Pollen: ingest path (non-blocking) ───────────┐
webhook ───┤                                                       │
scheduler ─┼──> SignalProducer ──> Dispatcher.ingest() ──> Queue   │
agent emit ┤                                                       │
           └───────────────────────────────────────────────────────┘
                                                      │
           ┌──────── Bloom: process path (async) ─────┘
           │ SignalProcessor                          │
           │  ├─ resolve identity                     │
           │  ├─ match triggers (JMESPath)            │
           │  ├─ create JobRun row                    │
           │  └─> JobRunner ──> LangGraph supervisor  │
           └──────────────────────────────────────────┘
Signals, triggers, and job runs
Five objects, clear lifecycle:
| Object | Lifetime | Notes |
|---|---|---|
| Signal | Immutable | type, payload, identity claim, occurred_at, source, dedupe_key |
| Trigger | Versioned, soft-deletable | match expression + JobSpec template + retry policy |
| Schedule | Mutable, soft-deletable | cron/interval → emits synthetic cron signals |
| QueuedSignal | Transient | dropped on acknowledgement |
| JobRun | Mutable | status, result, error, retries; each retry is a new row |
Scheduling is not a separate concept — Schedule rows produce synthetic cron signals on the same dispatch path as webhooks and internal emissions.
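To make the relationship between these objects concrete, here is a minimal sketch of a signal being matched by a trigger and turned into a JobSpec. It is illustrative only: the field layout of `Trigger` and `JobSpec`, and the helpers `lookup`, `matches`, and `build_job_spec`, are assumptions rather than Orchid's API. The match step uses a tiny stand-in that understands only `<dotted.path> == '<literal>'`; Bloom itself is described as using full JMESPath expressions.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)  # frozen mirrors the "Immutable" lifetime of a Signal
class Signal:
    type: str
    payload: dict[str, Any]
    identity: dict[str, Any]  # identity claim; resolved later by the processor
    source: str
    dedupe_key: str
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass(frozen=True)
class Trigger:  # hypothetical shape: match expression + JobSpec template
    id: str
    signal_type: str
    agent: str
    prompt_template: str
    when: str | None = None  # a JMESPath expression in Orchid


@dataclass(frozen=True)
class JobSpec:  # hypothetical shape
    trigger_id: str
    agent: str
    prompt: str


def lookup(signal: Signal, dotted: str) -> Any:
    """Walk a dotted path such as 'payload.priority'; return None if absent."""
    node: Any = {"type": signal.type, "source": signal.source, "payload": signal.payload}
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def matches(trigger: Trigger, signal: Signal) -> bool:
    if trigger.signal_type != signal.type:
        return False
    if trigger.when is None:
        return True
    # Stand-in evaluator (NOT JMESPath): handles only "<dotted.path> == '<literal>'".
    path, _, literal = (part.strip() for part in trigger.when.partition("=="))
    return lookup(signal, path) == literal.strip("'")


def build_job_spec(trigger: Trigger, signal: Signal) -> JobSpec:
    # Replace each {{dotted.path}} placeholder with the value found on the signal.
    prompt = re.sub(
        r"\{\{\s*([\w.]+)\s*\}\}",
        lambda m: str(lookup(signal, m.group(1))),
        trigger.prompt_template,
    )
    return JobSpec(trigger_id=trigger.id, agent=trigger.agent, prompt=prompt)
```

In this sketch a signal of type `support.ticket.created` with `payload.priority` set to `high` would match a trigger whose `when` is `payload.priority == 'high'`, and the resulting prompt would have `{{payload.summary}}` filled in from the payload.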
Configuring Pollen + Bloom
events:
  enabled: true
  store:
    class: orchid_ai.events.backends.postgres.PostgresEventStore
  queue:
    class: orchid_ai.events.queues.postgres.PostgresSignalQueue
  producers:
    - class: orchid_ai.events.producers.http.HTTPIngestionProducer
      mount: /signals
    - class: orchid_ai.events.producers.scheduler.SchedulerProducer
  processors:
    - class: orchid_ai.events.processors.asyncio_pool.AsyncioWorkerPoolProcessor
      concurrency: 4
  triggers:
    - id: morning-digest
      on: { signal: cron, cron: "0 7 * * 1-5" }
      emits:
        agent: notifications
        prompt_template: "Build the morning digest for {{tenant_key}}"
        identity: { mode: service_account, name: digest-bot }
      retry: { max: 3, backoff: exponential }
    - id: high-priority-ticket
      on:
        signal: support.ticket.created
        when: "payload.priority == 'high'"
      emits:
        agent: helpdesk
        prompt_template: "New high-priority ticket: {{payload.summary}}"
        identity: { mode: act_as_user, user_id_from: signal.user_id }
Signal sources
All ingress paths normalise to a Signal and flow through the same dispatcher:
| Path | Producer | Notes |
|---|---|---|
| POST /signals (HTTP) | HTTPIngestionProducer | HMAC-SHA256 per source. Returns 202 Accepted with signal_id; never blocks on processing. |
| Cron schedule | SchedulerProducer | APScheduler-driven; synthetic signal.type = "cron". |
| MCP tool orchid_signal_emit | MCPIngestionProducer | Gateway forwards to POST /signals. |
| Agent self.emit_signal(...) | InternalEmissionProducer | Non-blocking; inherits the agent's OrchidAuthContext. |
| Custom bus | OrchidSignalProducer subclass | Kafka, SQS, Redis Streams, file watcher; wired via events.producers: YAML. |
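The per-source HMAC-SHA256 check on the HTTP path can be sketched with Python's standard library. This is a hedged illustration, not Orchid's validator: the `SOURCE_SECRETS` mapping, the function names, and the idea that the signature travels as a hex digest are all assumptions, since the source does not specify the header or encoding.

```python
import hashlib
import hmac

# Hypothetical per-source shared secrets; a real deployment would load these
# from a secret store rather than hard-coding them.
SOURCE_SECRETS = {"helpdesk": b"helpdesk-shared-secret"}


def sign(secret: bytes, body: bytes) -> str:
    """What a sender would compute over the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_request(source: str, body: bytes, received_signature: str) -> bool:
    """Return True only if the signature matches the secret for this source."""
    secret = SOURCE_SECRETS.get(source)
    if secret is None:
        return False  # unknown source: reject
    expected = sign(secret, body)
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of the signature were correct.
    return hmac.compare_digest(expected.encode(), received_signature.encode())
```

Signing the raw bytes rather than a re-serialised JSON object matters: any whitespace or key-order change between sender and receiver would otherwise invalidate the signature.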
Identity and auth
Every signal carries an identity claim. The processor (not the ingest path) resolves it into an OrchidAuthContext before spawning a JobRun. Three flavours:
| Mode | Who acts | Auth path |
|---|---|---|
| service_account | Platform, on its own | Framework-managed named service identity |
| addressed_to_user | Platform, scoped to a user's data | Service account + user_id in JobSpec |
| act_as_user | The user (impersonation) | OrchidIdentityResolver.mint_for_user(tenant_key, user_id) |
mint_for_user is an optional extension point on OrchidIdentityResolver. The default raises OrchidIdentityNotMintableError; consumers implement it according to what their identity provider supports.
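The shape of that extension point might look roughly like the following. The classes here are local stand-ins that reuse the names from the text so the example runs on its own; the fields of `OrchidAuthContext`, the `TokenExchangeResolver` subclass, and the `exchange` callable are hypothetical and only illustrate the override pattern.

```python
from dataclasses import dataclass
from typing import Callable


class OrchidIdentityNotMintableError(Exception):
    """Stand-in for the error the default resolver raises."""


@dataclass(frozen=True)
class OrchidAuthContext:  # stand-in; the real fields are not given in the source
    tenant_key: str
    user_id: str
    access_token: str


class OrchidIdentityResolver:  # stand-in base class
    def mint_for_user(self, tenant_key: str, user_id: str) -> OrchidAuthContext:
        # Default behaviour described in the text: minting is not supported.
        raise OrchidIdentityNotMintableError(
            f"cannot mint credentials for {user_id!r} in tenant {tenant_key!r}"
        )


class TokenExchangeResolver(OrchidIdentityResolver):
    """Hypothetical consumer implementation backed by an identity provider."""

    def __init__(self, exchange: Callable[[str, str], str]) -> None:
        # `exchange` wraps whatever the identity provider offers,
        # for example an OAuth token-exchange call.
        self._exchange = exchange

    def mint_for_user(self, tenant_key: str, user_id: str) -> OrchidAuthContext:
        token = self._exchange(tenant_key, user_id)
        return OrchidAuthContext(tenant_key=tenant_key, user_id=user_id, access_token=token)
```

With the default resolver, an `act_as_user` trigger would fail at identity resolution; only a deployment that supplies an override like the one above can run jobs under a user's own identity.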
Extension points
The three layers between emitter and supervisor (producer, queue, processor) are independently pluggable, as are webhook auth validation and ingest middleware:
| What to plug | Implement | Wire via |
|---|---|---|
| New queue backend (Redis, Kafka, SQS) | OrchidSignalQueue | events.queue.class: in orchid.yml |
| New event source (Kafka topic, file watcher) | OrchidSignalProducer | events.producers: list |
| Different worker model (Celery, Lambda) | OrchidSignalProcessor | events.processors: list |
| Custom webhook auth (mTLS, signed JWT) | SignalAuthValidator | per-source validator.class: |
| Cross-cutting enrichment (PII redaction) | SignalIngestMiddleware | events.middleware: ordered list |
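As one example of the last row, an ordered middleware chain with a PII-redaction step could be sketched as below. The `process(signal) -> signal` interface, the dict-shaped signal, and the `RedactEmails`, `DefaultSource`, and `run_middleware` names are assumptions made for illustration; the source names `SignalIngestMiddleware` but does not give its signature.

```python
from __future__ import annotations

import re
from typing import Any, Protocol


class SignalIngestMiddleware(Protocol):  # assumed interface
    def process(self, signal: dict[str, Any]) -> dict[str, Any]: ...


EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


class RedactEmails:
    """Replace email addresses in top-level string payload values."""

    def process(self, signal: dict[str, Any]) -> dict[str, Any]:
        payload = {
            key: EMAIL.sub("[redacted]", value) if isinstance(value, str) else value
            for key, value in signal["payload"].items()
        }
        return {**signal, "payload": payload}  # return a copy, never mutate


class DefaultSource:
    """Fill in a source label when the producer did not set one."""

    def __init__(self, source: str) -> None:
        self._source = source

    def process(self, signal: dict[str, Any]) -> dict[str, Any]:
        return {**signal, "source": signal.get("source") or self._source}


def run_middleware(chain: list[SignalIngestMiddleware], signal: dict[str, Any]) -> dict[str, Any]:
    # Order matters: each middleware sees the output of the previous one.
    for middleware in chain:
        signal = middleware.process(signal)
    return signal
```

Running redaction before the signal is persisted is what keeps raw PII out of the replayable signal log; placing it later in the chain would not give that guarantee.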
Idempotency and retry
Signal-level idempotency: a UNIQUE (source, dedupe_key) constraint. A duplicate ingest returns the original signal_id without re-enqueuing.
Queue-level retry: A failed processor calls queue.nack(...). Messages become visible again after a configurable delay; after max_attempts (default 5) the message moves to the dead-letter table.
Job-level retry: Declared per-trigger (retry.max, retry.backoff). Each retry is a new JobRun row — the audit trail is preserved. Job-level retry covers business-logic flakiness; queue-level retry covers infrastructure crashes.
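The signal-level and queue-level rules can be sketched together. This is a simplified model under stated assumptions: SQLite stands in for the Postgres store, the table has only the columns needed for deduplication, and `InMemoryQueue` omits the visibility delay entirely. The function and class names are hypothetical.

```python
import sqlite3
import uuid
from dataclasses import dataclass


def open_store() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE signals ("
        " signal_id TEXT PRIMARY KEY,"
        " source TEXT NOT NULL,"
        " dedupe_key TEXT NOT NULL,"
        " UNIQUE (source, dedupe_key))"
    )
    return db


def ingest(db: sqlite3.Connection, source: str, dedupe_key: str) -> tuple[str, bool]:
    """Return (signal_id, is_new). Only new signals should be enqueued."""
    new_id = str(uuid.uuid4())
    cur = db.execute(
        "INSERT OR IGNORE INTO signals (signal_id, source, dedupe_key) VALUES (?, ?, ?)",
        (new_id, source, dedupe_key),
    )
    if cur.rowcount == 1:
        return new_id, True
    # The UNIQUE constraint rejected the insert: hand back the original id.
    row = db.execute(
        "SELECT signal_id FROM signals WHERE source = ? AND dedupe_key = ?",
        (source, dedupe_key),
    ).fetchone()
    return row[0], False


@dataclass
class QueuedSignal:
    signal_id: str
    attempts: int = 0


class InMemoryQueue:
    def __init__(self, max_attempts: int = 5) -> None:
        self.max_attempts = max_attempts
        self.ready: list[QueuedSignal] = []
        self.dead_letter: list[QueuedSignal] = []

    def enqueue(self, message: QueuedSignal) -> None:
        self.ready.append(message)

    def receive(self) -> QueuedSignal:
        return self.ready.pop(0)

    def nack(self, message: QueuedSignal) -> None:
        message.attempts += 1
        if message.attempts >= self.max_attempts:
            self.dead_letter.append(message)  # give up: park for inspection
        else:
            self.ready.append(message)  # a real queue would delay visibility
```

Note that the dedupe key is scoped by source, so two producers can reuse the same key without colliding.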
Streaming
GET /runs/{run_id}/stream reuses the existing SSE infrastructure. It emits the same events the chat stream produces (agent.started, tool.called, tool.completed, agent.finished), plus four Bloom-specific events: bloom.signal.ingested, bloom.run.queued, bloom.run.started, bloom.run.finished.
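A client consuming this stream needs to split the SSE text into events. The parser below is a minimal sketch of the standard SSE framing (`event:` and `data:` fields, blank line as terminator). It assumes the event names listed above arrive in the `event:` field and that `data:` carries a JSON body; the source does not confirm either detail, and the sample payloads are invented.

```python
def parse_sse(text: str) -> list[tuple[str, str]]:
    """Split raw SSE text into (event_name, data) pairs."""
    events: list[tuple[str, str]] = []
    name, data = "message", []  # "message" is the SSE default event name
    for line in text.splitlines():
        if line == "":
            # A blank line terminates the current event.
            if data:
                events.append((name, "\n".join(data)))
            name, data = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Per the SSE format, drop a single leading space if present.
            data.append(value[1:] if value.startswith(" ") else value)
    return events


# Invented sample stream for illustration only.
sample = (
    "event: bloom.run.started\n"
    'data: {"run_id": "r-1"}\n'
    "\n"
    ": keep-alive\n"
    "event: bloom.run.finished\n"
    'data: {"run_id": "r-1", "status": "succeeded"}\n'
    "\n"
)
```

A caller would typically stop reading once it sees `bloom.run.finished`, since that is the last Bloom-specific event in the list above.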