Pollen + Bloom

Event-driven AI jobs: Pollen ingests and persists signals; Bloom matches them to triggers and runs them through the supervisor.

Pollen + Bloom is the event-driven layer of Orchid — currently proposed and under active design (ADR-029, status: Proposed). The architecture is stable; implementation phases land incrementally.

Every standard Orchid run is synchronous: a user sends a message, the API resolves an OrchidAuthContext, the supervisor fans out, and the response streams back. Pollen + Bloom adds a complementary mode — AI work triggered by events — without duplicating the supervisor, auth model, or streaming infrastructure.

Two cooperating subsystems

Pollen is the event substrate. It ingests, persists, and dispatches immutable Signals. Every signal that enters the system is written to the signals table before any trigger runs — signals are replayable by definition.

Bloom is the execution layer. Triggers match signals using JMESPath filter expressions and build JobSpecs; the JobRunner then invokes the existing LangGraph supervisor with a synthesised OrchidAgentState. A JobRun is the unit of execution, persisted with status, result, and error.

              ┌──────── Pollen - ingest path (non-blocking) ──────────┐
   webhook ───┤                                                       │
   scheduler ─┼──> SignalProducer ──> Dispatcher.ingest() ──> Queue   │
   agent emit ┤                                                       │
              └───────────────────────────────────────────────────────┘

              ┌──────── Bloom - process path (async) ─────┐
              │  SignalProcessor                          │
              │    ├─ resolve identity                    │
              │    ├─ match triggers (JMESPath)           │
              │    ├─ create JobRun row                   │
              │    └─> JobRunner ──> LangGraph supervisor │
              └───────────────────────────────────────────┘
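The split between the two paths can be sketched with a toy in-memory dispatcher: ingest persists the signal, enqueues it, and returns, while a separate worker drains the queue. The `Dispatcher` and `processor` names here are illustrative stand-ins, not the framework's classes, and a dict plus an `asyncio.Queue` stand in for the signals table and the signal queue.

```python
import asyncio


class Dispatcher:
    """Toy dispatcher: persist first, then enqueue, never run triggers inline."""

    def __init__(self):
        self.store = {}                  # stand-in for the signals table
        self.queue = asyncio.Queue()     # stand-in for the signal queue

    async def ingest(self, signal_id: str, signal: dict) -> str:
        self.store[signal_id] = signal   # the write happens before any dispatch
        await self.queue.put(signal_id)  # then hand off to the queue
        return signal_id                 # the caller is done at this point


async def processor(dispatcher: Dispatcher, handled: list) -> None:
    """Drain the queue; trigger matching and JobRun creation would go here."""
    while True:
        signal_id = await dispatcher.queue.get()
        handled.append(dispatcher.store[signal_id]["type"])
        dispatcher.queue.task_done()


async def main() -> list:
    dispatcher, handled = Dispatcher(), []
    worker = asyncio.create_task(processor(dispatcher, handled))
    await dispatcher.ingest("sig-1", {"type": "cron"})
    await dispatcher.ingest("sig-2", {"type": "support.ticket.created"})
    await dispatcher.queue.join()        # wait until both signals are processed
    worker.cancel()
    return handled


handled = asyncio.run(main())
```

Because the signal is stored before it is queued, a crashed worker loses nothing: the stored row can be re-enqueued later.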

Signals, triggers, and job runs

Five objects, clear lifecycle:

| Object | Lifetime | Notes |
|---|---|---|
| Signal | Immutable | type, payload, identity claim, occurred_at, source, dedupe_key |
| Trigger | Versioned, soft-deletable | match expression + JobSpec template + retry policy |
| Schedule | Mutable, soft-deletable | cron/interval → emits synthetic cron signals |
| QueuedSignal | Transient | dropped on acknowledgement |
| JobRun | Mutable | status, result, error, retries; each retry is a new row |
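As a rough illustration of the Signal row, an immutable signal could be modelled as a frozen dataclass. The field names follow the table; the types, the defaults, the `signal_id` generation, and the shape of the identity claim are assumptions rather than the ADR's schema.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime, timezone
from typing import Any, Mapping, Optional
import uuid


@dataclass(frozen=True)
class Signal:
    """Hypothetical shape of a Pollen signal; field types are assumed."""
    type: str
    payload: Mapping[str, Any]
    identity: Mapping[str, Any]   # the identity claim, resolved by the processor
    source: str
    dedupe_key: Optional[str] = None
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    signal_id: str = field(default_factory=lambda: str(uuid.uuid4()))


sig = Signal(
    type="support.ticket.created",
    payload={"priority": "high", "summary": "Checkout is down"},
    identity={"mode": "act_as_user", "user_id": "u-123"},
    source="helpdesk-webhook",
    dedupe_key="ticket-42",
)

try:
    sig.type = "something.else"   # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that `frozen=True` only blocks attribute reassignment; a nested dict payload would still be mutable unless it is copied or wrapped.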

Scheduling is not a separate concept — Schedule rows produce synthetic cron signals on the same dispatch path as webhooks and internal emissions.

Configuring Pollen + Bloom

events:
  enabled: true

  store:
    class: orchid_ai.events.backends.postgres.PostgresEventStore

  queue:
    class: orchid_ai.events.queues.postgres.PostgresSignalQueue

  producers:
    - class: orchid_ai.events.producers.http.HTTPIngestionProducer
      mount: /signals
    - class: orchid_ai.events.producers.scheduler.SchedulerProducer

  processors:
    - class: orchid_ai.events.processors.asyncio_pool.AsyncioWorkerPoolProcessor
      concurrency: 4

  triggers:
    - id: morning-digest
      # Cron expression: 07:00, Monday to Friday
      on: { signal: cron, cron: "0 7 * * 1-5" }
      emits:
        agent: notifications
        prompt_template: "Build the morning digest for {{tenant_key}}"
        identity: { mode: service_account, name: digest-bot }
      retry: { max: 3, backoff: exponential }

    - id: high-priority-ticket
      on:
        signal: support.ticket.created
        # JMESPath filter evaluated against the signal
        when: "payload.priority == 'high'"
      emits:
        agent: helpdesk
        prompt_template: "New high-priority ticket: {{payload.summary}}"
        identity: { mode: act_as_user, user_id_from: signal.user_id }

Signal sources

All ingress paths normalise to a Signal and flow through the same dispatcher:

| Path | Producer | Notes |
|---|---|---|
| POST /signals (HTTP) | HTTPIngestionProducer | HMAC-SHA256 per source. Returns 202 Accepted with signal_id; never blocks on processing. |
| Cron schedule | SchedulerProducer | APScheduler-driven; synthetic signal.type = "cron". |
| MCP tool orchid_signal_emit | MCPIngestionProducer | Gateway forwards to POST /signals. |
| Agent self.emit_signal(...) | InternalEmissionProducer | Non-blocking; inherits the agent's OrchidAuthContext. |
| Custom bus | OrchidSignalProducer subclass | Kafka, SQS, Redis Streams, file watcher; wired via events.producers: in YAML. |
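For the HTTP path, per-source HMAC-SHA256 verification might look roughly like the sketch below. It uses only the standard library. How the signature travels (header name, encoding) and where the secrets are stored are not specified here, so the hex encoding and the `SOURCE_SECRETS` mapping are assumptions.

```python
import hashlib
import hmac


def sign_body(secret: bytes, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, received_hex: str) -> bool:
    """Compare expected and received digests in constant time."""
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected, received_hex)


# Hypothetical per-source secrets; a real deployment would load these from config.
SOURCE_SECRETS = {"helpdesk-webhook": b"example-secret"}

body = b'{"type": "support.ticket.created", "payload": {"priority": "high"}}'
signature = sign_body(SOURCE_SECRETS["helpdesk-webhook"], body)

ok = verify_signature(SOURCE_SECRETS["helpdesk-webhook"], body, signature)
tampered = verify_signature(SOURCE_SECRETS["helpdesk-webhook"], body + b" ", signature)
```

The digest is computed over the raw bytes as received; re-serialising the JSON before verifying would change the bytes and break the comparison.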

Identity and auth

Every signal carries an identity claim. The processor (not the ingest path) resolves it into an OrchidAuthContext before spawning a JobRun. Three flavours:

| Mode | Who acts | Auth path |
|---|---|---|
| service_account | Platform, on its own | Framework-managed named service identity |
| addressed_to_user | Platform, scoped to a user's data | Service account + user_id in JobSpec |
| act_as_user | The user (impersonation) | OrchidIdentityResolver.mint_for_user(tenant_key, user_id) |

mint_for_user is an optional extension point on OrchidIdentityResolver. The default raises OrchidIdentityNotMintableError; consumers implement it according to what their identity provider supports.
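A consumer-side override could be sketched as follows. The two base classes are local stand-ins that only mimic the behaviour described above (the default raises); `TokenExchangeResolver`, the `exchange` callable, and the dict returned in place of an OrchidAuthContext are hypothetical.

```python
class OrchidIdentityNotMintableError(Exception):
    """Stand-in for the framework exception named in the text."""


class OrchidIdentityResolver:
    """Stand-in base class; only the mint_for_user default is modelled."""

    def mint_for_user(self, tenant_key: str, user_id: str):
        raise OrchidIdentityNotMintableError(
            f"cannot mint credentials for {user_id!r} in tenant {tenant_key!r}"
        )


class TokenExchangeResolver(OrchidIdentityResolver):
    """Hypothetical consumer resolver backed by an IdP token-exchange call."""

    def __init__(self, exchange):
        self._exchange = exchange  # callable(tenant_key, user_id) -> token string

    def mint_for_user(self, tenant_key: str, user_id: str):
        token = self._exchange(tenant_key, user_id)
        # A real implementation would build an OrchidAuthContext here.
        return {"tenant_key": tenant_key, "user_id": user_id, "token": token}


def fake_exchange(tenant_key: str, user_id: str) -> str:
    """Placeholder for a call to the identity provider."""
    return f"token-for-{tenant_key}-{user_id}"


ctx = TokenExchangeResolver(fake_exchange).mint_for_user("acme", "u-123")

try:
    OrchidIdentityResolver().mint_for_user("acme", "u-123")
    default_raised = False
except OrchidIdentityNotMintableError:
    default_raised = True
```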

Extension points

The three layers between emitter and supervisor (producer, queue, and processor) are independently pluggable, and webhook auth and ingest middleware can be swapped as well:

| What to plug | Implement | Wire via |
|---|---|---|
| New queue backend (Redis, Kafka, SQS) | OrchidSignalQueue | events.queue.class: in orchid.yml |
| New event source (Kafka topic, file watcher) | OrchidSignalProducer | events.producers: list |
| Different worker model (Celery, Lambda) | OrchidSignalProcessor | events.processors: list |
| Custom webhook auth (mTLS, signed JWT) | SignalAuthValidator | per-source validator.class: |
| Cross-cutting enrichment (PII redaction) | SignalIngestMiddleware | events.middleware: ordered list |

Idempotency and retry

Signal-level: a UNIQUE (source, dedupe_key) constraint on the signals table. A duplicate ingest returns the original signal_id without re-enqueuing.
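The signal-level behaviour can be approximated with an in-memory SQLite table standing in for the Postgres store. The column set, the `ingest` helper, and the NOT NULL on `dedupe_key` are assumptions made for the sketch.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE signals (
        signal_id  TEXT PRIMARY KEY,
        source     TEXT NOT NULL,
        dedupe_key TEXT NOT NULL,
        type       TEXT NOT NULL,
        UNIQUE (source, dedupe_key)
    )
    """
)


def ingest(source: str, dedupe_key: str, type_: str):
    """Return (signal_id, enqueued). A duplicate maps to the original id."""
    new_id = str(uuid.uuid4())
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO signals (signal_id, source, dedupe_key, type) "
                "VALUES (?, ?, ?, ?)",
                (new_id, source, dedupe_key, type_),
            )
        return new_id, True
    except sqlite3.IntegrityError:
        # The UNIQUE constraint fired: look up the row that got there first.
        row = conn.execute(
            "SELECT signal_id FROM signals WHERE source = ? AND dedupe_key = ?",
            (source, dedupe_key),
        ).fetchone()
        return row[0], False


first_id, first_enqueued = ingest("helpdesk-webhook", "ticket-42", "support.ticket.created")
second_id, second_enqueued = ingest("helpdesk-webhook", "ticket-42", "support.ticket.created")
row_count = conn.execute("SELECT COUNT(*) FROM signals").fetchone()[0]
```

Letting the database enforce uniqueness avoids a check-then-insert race between concurrent ingests of the same signal.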

Queue-level retry: A failed processor calls queue.nack(...). Messages become visible again after a configurable delay; after max_attempts (default 5) the message moves to the dead-letter table.
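A toy in-memory queue shows the nack, visibility-delay, and dead-letter flow. The class and method names (`claim`, `ack`, `nack`) and the 30-second delay are illustrative; the sketch also assumes that `max_attempts` counts delivery attempts, which the text does not spell out.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueuedSignal:
    signal_id: str
    attempts: int = 0
    visible_at: float = 0.0  # epoch seconds; the message is hidden until then


class InMemorySignalQueue:
    """Toy queue illustrating nack-with-delay and dead-lettering."""

    def __init__(self, max_attempts: int = 5, retry_delay: float = 30.0):
        self.max_attempts = max_attempts
        self.retry_delay = retry_delay
        self.pending = []
        self.dead_letter = []

    def enqueue(self, signal_id: str) -> None:
        self.pending.append(QueuedSignal(signal_id))

    def claim(self, now: Optional[float] = None) -> Optional[QueuedSignal]:
        """Hand out the first visible message and count the attempt."""
        now = time.time() if now is None else now
        for msg in self.pending:
            if msg.visible_at <= now:
                self.pending.remove(msg)
                msg.attempts += 1
                return msg
        return None

    def ack(self, msg: QueuedSignal) -> None:
        """Acknowledged messages are simply dropped."""

    def nack(self, msg: QueuedSignal, now: Optional[float] = None) -> None:
        """Re-queue after a delay, or dead-letter once attempts run out."""
        now = time.time() if now is None else now
        if msg.attempts >= self.max_attempts:
            self.dead_letter.append(msg)
        else:
            msg.visible_at = now + self.retry_delay
            self.pending.append(msg)


queue = InMemorySignalQueue(max_attempts=5, retry_delay=30.0)
queue.enqueue("sig-1")

clock = 0.0
while (msg := queue.claim(now=clock)) is not None:
    queue.nack(msg, now=clock)   # simulate a processor that always fails
    clock += queue.retry_delay   # advance past the visibility delay
```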

Job-level retry: Declared per-trigger (retry.max, retry.backoff). Each retry is a new JobRun row — the audit trail is preserved. Job-level retry covers business-logic flakiness; queue-level retry covers infrastructure crashes.
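Job-level retry with one row per attempt could be sketched like this. The `JobRun` fields are reduced to what the example needs, the backoff base and cap are invented values, and `retry.max` is read here as the number of retries after the first attempt, which is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class JobRun:
    run_id: int
    attempt: int
    status: str                  # "succeeded" or "failed"
    error: Optional[str] = None


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Exponential backoff: base * 2**(attempt - 1), capped. Values are assumed."""
    return min(cap, base * 2 ** (attempt - 1))


def run_with_retry(job: Callable[[int], None], max_retries: int,
                   sleep: Callable[[float], None]) -> list:
    """Run the job, appending a new JobRun row for every attempt."""
    rows = []
    for attempt in range(1, max_retries + 2):   # first try + max_retries retries
        try:
            job(attempt)
            rows.append(JobRun(len(rows) + 1, attempt, "succeeded"))
            break
        except Exception as exc:
            rows.append(JobRun(len(rows) + 1, attempt, "failed", str(exc)))
            if attempt <= max_retries:
                sleep(backoff_delay(attempt))
    return rows


def flaky_job(attempt: int) -> None:
    """Fails twice, then succeeds, to exercise the retry path."""
    if attempt < 3:
        raise RuntimeError(f"transient failure on attempt {attempt}")


delays = []
rows = run_with_retry(flaky_job, max_retries=3, sleep=delays.append)
```

Passing `sleep` in as a parameter keeps the sketch testable; a real runner would schedule the retry rather than block a worker.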

Streaming

GET /runs/{run_id}/stream reuses the existing SSE infrastructure. It emits the same events the chat stream produces (agent.started, tool.called, tool.completed, agent.finished), plus four Bloom-specific events: bloom.signal.ingested, bloom.run.queued, bloom.run.started, bloom.run.finished.
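On the client side, the run stream can be consumed with a small Server-Sent Events parser. The sketch below works on text lines and leaves the HTTP call out. It assumes the event names listed above arrive in the SSE `event:` field with a JSON `data:` payload; the payload fields in the sample are invented.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple]:
    """Yield (event_name, data) pairs from Server-Sent Events text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\n")
        if line == "":                       # a blank line ends one event
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


# Hand-written sample stream; the payload fields are illustrative only.
sample = """\
event: bloom.run.started
data: {"run_id": "run-1"}

event: tool.called
data: {"tool": "search"}

event: bloom.run.finished
data: {"run_id": "run-1", "status": "succeeded"}

"""

events = list(parse_sse(sample.splitlines()))
bloom_events = [name for name, _ in events if name.startswith("bloom.")]
```

Filtering on the `bloom.` prefix lets a client separate run lifecycle events from the agent and tool events shared with the chat stream.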