
Kafka & Event Bus

alquimia-runtime uses Apache Kafka as its sole event transport. The master publishes signed CloudEvents to a single Kafka topic; one or more worker instances consume them and dispatch handlers in-process. There is no separate broker service or HTTP loopback between components.

Event flow

POST /event/infer/{assistant_id}
  │  (master)
  ▼
kafka.producer.publish()
  - Signs CloudEvent with HMAC-SHA256
  - Publishes to alquimia.events topic (key = taskid)
  │
  ▼
Kafka topic: alquimia.events
  │
  ▼  (worker)
KafkaWorker._loop()
  - Deserialises CloudEvent
  - Verifies HMAC-SHA256 signature → drops on failure
  │
  ▼
kafka.dispatcher.dispatch()
  - Routes by event type to in-process handler
  - Fan-out to webhook sinks (for eligible event types)
  │
  ├─ pre-handler (per event type)
  │    e.g. _handle_response, _handle_tool_process
  │
  ▼
_handle_controller_process()
  - Loads/creates AssistantInferenceController from Redis
  - controller.process(event) → new events
  - Publishes follow-up events back to Kafka
  - Writes WorklogRecords to Redis SSE queue
  - Persists worklog to PostgreSQL (background task)
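The worker half of this flow can be sketched as a small asyncio loop. This is a minimal illustration under stated assumptions, not the runtime's KafkaWorker._loop(). An asyncio.Queue stands in for the Kafka consumer, messages are assumed to be JSON-encoded CloudEvents, and `worker_loop`, `verify` and `dispatch` are hypothetical names supplied by the caller.

```python
import asyncio
import json
from typing import Awaitable, Callable

Event = dict
Verifier = Callable[[Event], bool]               # hypothetical: signature check
Dispatcher = Callable[[Event], Awaitable[None]]  # hypothetical: stands in for kafka.dispatcher.dispatch()


async def worker_loop(queue: asyncio.Queue, verify: Verifier, dispatch: Dispatcher) -> int:
    """Consume raw messages until a None sentinel arrives; return how many were dropped."""
    dropped = 0
    while True:
        raw = await queue.get()
        if raw is None:  # sentinel used only by this sketch to stop the loop
            return dropped
        try:
            event = json.loads(raw)  # deserialise (JSON structured mode assumed)
        except ValueError:  # covers JSONDecodeError and invalid UTF-8
            dropped += 1
            continue
        if not isinstance(event, dict) or not verify(event):
            dropped += 1  # missing or invalid signature: drop, never dispatch
            continue
        await dispatch(event)  # route to the in-process handler


async def run_demo() -> tuple:
    """Feed one signed, one malformed and one unsigned message through the loop."""
    seen: list = []

    async def dispatch(event: Event) -> None:
        seen.append(event["type"])

    queue: asyncio.Queue = asyncio.Queue()
    for raw in (
        b'{"type": "assistant.inference.v1", "alquimiasignature": "ok"}',
        b"not json",
        b'{"type": "assistant.inference.v1"}',  # no signature attribute
        None,
    ):
        queue.put_nowait(raw)

    # Toy verifier for the demo; a real check recomputes the HMAC-SHA256.
    dropped = await worker_loop(queue, lambda ev: ev.get("alquimiasignature") == "ok", dispatch)
    return seen, dropped


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```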

Every CloudEvent written to Kafka is signed with HMAC-SHA256 using the shared KAFKA_SIGNING_KEY. The signature covers a canonical JSON serialisation of the event’s specversion, type, id, source, time, and data fields (keys sorted). It is stored in the alquimiasignature CloudEvent extension attribute.

Workers drop any event whose signature is missing or invalid. This prevents rogue producers from injecting events into the execution pipeline.
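The signing scheme above can be sketched with the standard library alone. This is a minimal sketch, not the runtime's code. It assumes that events are plain dicts and that "canonical JSON" means `json.dumps(..., sort_keys=True, separators=(",", ":"))` over whichever of the six fields are present. It also assumes the signature is stored as a lowercase hex digest. The helper names are hypothetical.

```python
import hashlib
import hmac
import json

# Fields covered by the signature, as listed above.
SIGNED_FIELDS = ("specversion", "type", "id", "source", "time", "data")
SIGNATURE_ATTR = "alquimiasignature"


def canonical_payload(event: dict) -> bytes:
    """Sorted-key JSON of the signed fields (compact separators are an assumption)."""
    subset = {k: event[k] for k in SIGNED_FIELDS if k in event}  # absent fields skipped (assumption)
    return json.dumps(subset, sort_keys=True, separators=(",", ":")).encode("utf-8")


def _digest(event: dict, key: bytes) -> str:
    return hmac.new(key, canonical_payload(event), hashlib.sha256).hexdigest()


def sign_event(event: dict, key: bytes) -> dict:
    """Return a copy of the event carrying the hex HMAC-SHA256 signature."""
    return {**event, SIGNATURE_ATTR: _digest(event, key)}


def verify_event(event: dict, key: bytes) -> bool:
    """False if the signature is missing or wrong; compares in constant time."""
    received = event.get(SIGNATURE_ATTR)
    if not isinstance(received, str):
        return False
    return hmac.compare_digest(received.encode("utf-8"), _digest(event, key).encode("ascii"))


if __name__ == "__main__":
    key = b"example-key"  # how KAFKA_SIGNING_KEY becomes bytes is an assumption
    event = {"specversion": "1.0", "type": "assistant.inference.v1", "id": "1",
             "source": "/master", "time": "2024-01-01T00:00:00Z", "data": {"q": "hi"}}
    signed = sign_event(event, key)
    print(verify_event(signed, key))  # True
    signed["data"] = {"q": "tampered"}
    print(verify_event(signed, key))  # False
```

The comparison uses `hmac.compare_digest`, so the timing of a failed check does not reveal how many leading characters matched.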

# Generate a signing key (same value on master and all workers)
python -c "import secrets; print(secrets.token_hex(32))"

Set the result as KAFKA_SIGNING_KEY on the master and on every worker. If the variable is not set, a warning is emitted at startup and workers silently drop all events.
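Startup handling of the key might look like the following. `load_signing_key` and the logger name are hypothetical. The 64-hex-character format check is an extra assumption based on the `secrets.token_hex(32)` command above; this page only documents the warning emitted when the variable is unset.

```python
import logging
import os
import re
from typing import Mapping, Optional

log = logging.getLogger("kafka.signing")  # hypothetical logger name
HEX_KEY = re.compile(r"[0-9a-fA-F]{64}")


def load_signing_key(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Read KAFKA_SIGNING_KEY and warn at startup if it is missing or malformed."""
    key = env.get("KAFKA_SIGNING_KEY", "")
    if not key:
        # Per the docs, workers then drop every event.
        log.warning("KAFKA_SIGNING_KEY is not set; workers will drop all events")
        return None
    if not HEX_KEY.fullmatch(key):
        log.warning("KAFKA_SIGNING_KEY should be a 64-character hex string")
    return key


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    load_signing_key({})  # logs the "not set" warning
```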

The partition key for every event is the taskid CloudEvent extension. All events for a single inference task therefore land on the same partition and are consumed, in order, by whichever worker replica currently owns that partition. Kafka guarantees ordering only within a partition, so this is what preserves per-task ordering.
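A shared key preserves order because a keyed producer hashes the key and reduces it modulo the partition count, so equal taskid values always map to the same partition. The sketch below uses `zlib.crc32` purely as a stand-in hash. The real partitioner depends on the Kafka client library (the Java client uses murmur2, for instance), so actual partition numbers will differ. The function names are hypothetical.

```python
import zlib
from collections import defaultdict


def partition_for(task_id: str, num_partitions: int) -> int:
    """Deterministically map a taskid key to a partition.

    crc32 is a stand-in: real Kafka clients use their own partitioner,
    so actual partition numbers will differ.
    """
    return zlib.crc32(task_id.encode("utf-8")) % num_partitions


def group_by_partition(events: list, num_partitions: int) -> dict:
    """Group event ids by partition, preserving publish order within each partition."""
    groups = defaultdict(list)
    for event in events:
        groups[partition_for(event["taskid"], num_partitions)].append(event["id"])
    return dict(groups)


if __name__ == "__main__":
    events = [
        {"id": "e1", "taskid": "task-A"},
        {"id": "e2", "taskid": "task-B"},
        {"id": "e3", "taskid": "task-A"},
    ]
    # e1 and e3 share a taskid, so they always land together and stay in order.
    print(group_by_partition(events, num_partitions=6))
```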

Dispatcher routing

kafka/dispatcher.py maintains two routing tables.

Event types routed through a pre-handler before reaching the controller:

| Event type | Handler | Description |
|---|---|---|
| shield.inference.v1 | _handle_shield | Content moderation / guard model |
| response.inference.v1 | _handle_response | LLM response inference |
| context.flush.v1 | _handle_context_flush | Long-term memory summarization |
| server.tool.execution.v1 | _handle_tool_process | MCP / Llama Stack tool execution |
| builtin.tool.execution.v1 | _handle_tool_builtin | Profile-embedded built-in tool |
| tool.schema.v1 | _handle_tool_schema | Tool schema discovery |
| a2a.execution.v1 | _handle_a2a_inference | Agent-to-agent delegation |
| agent.discovery.v1 | _handle_agent_discover | Agent registry lookup |
| assistant.inference.response.v1 | _handle_a2a_response | A2A response routing |

Pre-handlers perform the side-effecting work (calling the LLM, executing the tool, and so on) and return an intermediate response CloudEvent. That response is then passed to _handle_controller_process, which updates the controller state and publishes follow-up events.
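The two-stage flow can be pictured as a pre-handler that returns an intermediate event, which the controller step then consumes. Everything here is a hypothetical stand-in: `fake_handle_response`, `fake_controller_process`, the echoed answer and the result type name. Only the shape of the pipeline follows the description above.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Event = dict
PreHandler = Callable[[Event], Awaitable[Event]]


async def fake_handle_response(event: Event) -> Event:
    """Hypothetical pre-handler: does the side-effecting work, returns a result event."""
    answer = f"echo: {event['data']['prompt']}"  # stands in for the real LLM call
    return {"type": "response.inference.response.v1",  # assumed result type
            "taskid": event["taskid"], "data": {"text": answer}}


async def fake_controller_process(event: Event, outbox: list) -> None:
    """Hypothetical controller step: a real one would update state and publish follow-ups."""
    outbox.append(event)


async def handle(event: Event, pre_handler: Optional[PreHandler], outbox: list) -> None:
    """Run the pre-handler if one is registered, then hand the result to the controller."""
    if pre_handler is not None:
        event = await pre_handler(event)
    await fake_controller_process(event, outbox)


if __name__ == "__main__":
    outbox: list = []
    request = {"type": "response.inference.v1", "taskid": "t1", "data": {"prompt": "hi"}}
    asyncio.run(handle(request, fake_handle_response, outbox))
    print(outbox)
```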

These event types bypass pre-handlers and go directly to _handle_controller_process:

  • assistant.inference.v1 — starts a new inference run
  • attachment.normalized.v1 — file attachment ready for context
  • controller.restart.v1 — restart a task
  • controller.state.change.v1 — run / stop signal
  • tool.human.approval.required.response.v1 — human approval decision

context.persistence.v1 is handled by _handle_context_persistence and has no controller follow-up. It saves the conversation to Redis and returns.
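The routing rules above amount to a lookup over two tables plus one special case. This sketch copies the event types and handler names from this page. `Route` and `route_for` are hypothetical, and so is the behaviour for unlisted types, because the page does not say what the dispatcher does with them.

```python
from enum import Enum
from typing import Optional, Tuple


class Route(Enum):
    PRE_HANDLER = "pre-handler, then controller"
    CONTROLLER = "controller only"
    PERSISTENCE = "context persistence, no controller follow-up"
    UNROUTED = "not listed on this page"


# Event type -> pre-handler name, copied from the table above.
PRE_HANDLERS = {
    "shield.inference.v1": "_handle_shield",
    "response.inference.v1": "_handle_response",
    "context.flush.v1": "_handle_context_flush",
    "server.tool.execution.v1": "_handle_tool_process",
    "builtin.tool.execution.v1": "_handle_tool_builtin",
    "tool.schema.v1": "_handle_tool_schema",
    "a2a.execution.v1": "_handle_a2a_inference",
    "agent.discovery.v1": "_handle_agent_discover",
    "assistant.inference.response.v1": "_handle_a2a_response",
}

# Event types that go straight to _handle_controller_process.
CONTROLLER_ONLY = frozenset({
    "assistant.inference.v1",
    "attachment.normalized.v1",
    "controller.restart.v1",
    "controller.state.change.v1",
    "tool.human.approval.required.response.v1",
})


def route_for(event_type: str) -> Tuple[Route, Optional[str]]:
    """Classify an event type and name the first handler it reaches."""
    if event_type in PRE_HANDLERS:
        return Route.PRE_HANDLER, PRE_HANDLERS[event_type]
    if event_type in CONTROLLER_ONLY:
        return Route.CONTROLLER, "_handle_controller_process"
    if event_type == "context.persistence.v1":
        return Route.PERSISTENCE, "_handle_context_persistence"
    return Route.UNROUTED, None  # real handling of unknown types is not documented


if __name__ == "__main__":
    for t in ("tool.schema.v1", "controller.restart.v1", "context.persistence.v1"):
        print(t, route_for(t))
```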

Webhook fan-out

Before in-process dispatch, the worker checks whether the event type is eligible for external webhook delivery. Eligible types:

| Event type | Depth filter |
|---|---|
| assistant.inference.response.v1 | depth = 0 only |
| context.persistence.v1 | all depths |
| tool.human.approval.required.v1 | all depths |
| empathy.rule.matched.v1 | all depths |
| response.inference.response.v1 | all depths |

A2A sub-results (assistant.inference.response.v1 at depth > 0) are intentionally excluded so that subscribers are not flooded with intermediate results from delegated agents.
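The eligibility table reduces to a membership test plus a depth check. `is_webhook_eligible` is a hypothetical helper. The sketch takes depth as a plain integer because this page does not say how depth is carried on the CloudEvent.

```python
# Webhook-eligible types -> True if only depth 0 is delivered (from the table above).
WEBHOOK_DEPTH0_ONLY = {
    "assistant.inference.response.v1": True,
    "context.persistence.v1": False,
    "tool.human.approval.required.v1": False,
    "empathy.rule.matched.v1": False,
    "response.inference.response.v1": False,
}


def is_webhook_eligible(event_type: str, depth: int) -> bool:
    """Decide whether an event should be fanned out to webhook subscribers."""
    if event_type not in WEBHOOK_DEPTH0_ONLY:
        return False
    # depth > 0 marks an A2A sub-result; only some types are filtered on it.
    return depth == 0 or not WEBHOOK_DEPTH0_ONLY[event_type]


if __name__ == "__main__":
    print(is_webhook_eligible("assistant.inference.response.v1", depth=1))  # False
```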

Delivery is fire-and-forget via asyncio.create_task. Each webhook POST includes an X-Webhook-Signature: sha256=<hex> header for verification. See Inference Endpoints and the Registry API docs for how to register webhook subscriptions.
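On the subscriber side, checking the header might look like this. The exact signing input is not documented on this page. The sketch assumes the `sha256=<hex>` value is an HMAC-SHA256 of the raw request body, keyed with the subscription's shared secret, which is a common webhook convention. `verify_webhook` and the secret are hypothetical, so confirm the scheme against the Registry API docs before relying on it.

```python
import hashlib
import hmac
from typing import Optional

PREFIX = "sha256="


def verify_webhook(body: bytes, header_value: Optional[str], secret: bytes) -> bool:
    """Check an X-Webhook-Signature header of the form 'sha256=<hex>'.

    Assumes HMAC-SHA256 over the exact raw request body, keyed with the
    subscription's shared secret (hypothetical; verify against the Registry API docs).
    """
    if not header_value or not header_value.startswith(PREFIX):
        return False
    received = header_value[len(PREFIX):].strip().lower()
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # Compare as bytes so unexpected header characters cannot raise TypeError.
    return hmac.compare_digest(received.encode("utf-8"), expected.encode("ascii"))


if __name__ == "__main__":
    secret = b"example-subscription-secret"  # hypothetical
    body = b'{"type": "context.persistence.v1"}'
    header = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    print(verify_webhook(body, header, secret))  # True
```

Verify against the raw bytes before parsing the JSON. Re-serialising the payload can change whitespace or key order and break the match.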

Configuration

| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Broker address(es) |
| KAFKA_TOPIC | alquimia.events | Topic for all agent events |
| KAFKA_CONSUMER_GROUP | alquimia-workers | Shared consumer group across all worker replicas |
| KAFKA_SIGNING_KEY | (required) | 64-character hex key; must match across all instances |
| KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Set to SASL_SSL in production |
| KAFKA_SASL_MECHANISM | "" | e.g., SCRAM-SHA-256 |
| KAFKA_SASL_USERNAME | "" | SASL username |
| KAFKA_SASL_PASSWORD | "" | SASL password |

See Configuration Reference for the complete settings list.
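The table maps naturally onto a small settings object. `KafkaSettings`, `from_env` and `uses_sasl` are hypothetical names, not the runtime's actual configuration class. The defaults are copied from the table, and the secret fields are kept out of the repr so they do not leak into logs.

```python
import os
from dataclasses import dataclass, field
from typing import Mapping


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical settings holder; defaults mirror the table above."""
    bootstrap_servers: str = "localhost:9092"
    topic: str = "alquimia.events"
    consumer_group: str = "alquimia-workers"
    signing_key: str = field(default="", repr=False)  # required in practice
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: str = ""
    sasl_username: str = ""
    sasl_password: str = field(default="", repr=False)

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "KafkaSettings":
        """Build settings from KAFKA_* variables, falling back to the defaults."""
        d = cls()
        return cls(
            bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", d.bootstrap_servers),
            topic=env.get("KAFKA_TOPIC", d.topic),
            consumer_group=env.get("KAFKA_CONSUMER_GROUP", d.consumer_group),
            signing_key=env.get("KAFKA_SIGNING_KEY", d.signing_key),
            security_protocol=env.get("KAFKA_SECURITY_PROTOCOL", d.security_protocol),
            sasl_mechanism=env.get("KAFKA_SASL_MECHANISM", d.sasl_mechanism),
            sasl_username=env.get("KAFKA_SASL_USERNAME", d.sasl_username),
            sasl_password=env.get("KAFKA_SASL_PASSWORD", d.sasl_password),
        )

    @property
    def uses_sasl(self) -> bool:
        """True for SASL_PLAINTEXT / SASL_SSL, where the SASL_* variables apply."""
        return self.security_protocol.upper().startswith("SASL_")


if __name__ == "__main__":
    print(KafkaSettings.from_env())
```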

Scaling workers

All worker replicas share KAFKA_CONSUMER_GROUP, and Kafka assigns each partition of the topic to exactly one replica in the group at a time. To maximise throughput, run as many workers as there are Kafka partitions; replicas beyond the partition count receive no partitions and sit idle.

kubectl scale deployment alquimia-runtime-worker --replicas=4 -n alquimia-runtime

See Kubernetes & OpenShift for deployment details.
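The replica-versus-partition trade-off is simple arithmetic. Each partition is read by at most one consumer in the group, so only min(replicas, partitions) replicas do any work. `consumer_utilisation` is a hypothetical helper, and the per-replica figure assumes the assignor spreads partitions as evenly as possible.

```python
def consumer_utilisation(replicas: int, partitions: int) -> dict:
    """Active vs idle consumers for one topic in one consumer group.

    Each partition is owned by at most one consumer in the group, so only
    min(replicas, partitions) replicas get work; ignores in-flight rebalances.
    """
    if replicas < 0 or partitions < 0:
        raise ValueError("counts must be non-negative")
    active = min(replicas, partitions)
    return {
        "active": active,
        "idle": replicas - active,
        # Busiest replica's share, rounded up (assumes an even assignor).
        "max_partitions_per_replica": -(-partitions // active) if active else 0,
    }


if __name__ == "__main__":
    # e.g. the 4-replica deployment above against a hypothetical 6-partition topic
    print(consumer_utilisation(replicas=4, partitions=6))
```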