Kafka & Event Bus
alquimia-runtime uses Apache Kafka as its sole event transport. The master publishes signed CloudEvents to a single Kafka topic; one or more worker instances consume them and dispatch handlers in-process. There is no separate broker service or HTTP loopback between components.
Architecture
    POST /event/infer/{assistant_id}
            │  (master)
            ▼
    kafka.producer.publish()
      - Signs CloudEvent with HMAC-SHA256
      - Publishes to alquimia.events topic (key = taskid)
            │
            ▼
      Kafka topic: alquimia.events
            │
            ▼  (worker)
    KafkaWorker._loop()
      - Deserialises CloudEvent
      - Verifies HMAC-SHA256 signature → drops on failure
            │
            ▼
    kafka.dispatcher.dispatch()
      - Routes by event type to in-process handler
      - Fan-out to webhook sinks (for eligible event types)
            │
            ├─ pre-handler (per event type)
            │    e.g. _handle_response, _handle_tool_process
            │
            ▼
    _handle_controller_process()
      - Loads/creates AssistantInferenceController from Redis
      - controller.process(event) → new events
      - Publishes follow-up events back to Kafka
      - Writes WorklogRecords to Redis SSE queue
      - Persists worklog to PostgreSQL (background task)

Event signing
Every CloudEvent written to Kafka is signed with HMAC-SHA256 using the shared KAFKA_SIGNING_KEY. The signature covers a canonical JSON serialisation of the event’s specversion, type, id, source, time, and data fields (keys sorted). It is stored in the alquimiasignature CloudEvent extension attribute.
Workers drop any event whose signature is missing or invalid. This prevents rogue producers from injecting events into the execution pipeline.
    # Generate a signing key (same value on master and all workers)
    python -c "import secrets; print(secrets.token_hex(32))"

Set the result as KAFKA_SIGNING_KEY. If this variable is not set, a startup warning is emitted and workers silently drop all events.
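The signing scheme described in this section can be sketched with the Python standard library. This is a minimal illustration, not the runtime's actual code: the function names, the compact JSON separators, and the choice to use the key string's UTF-8 bytes as the HMAC key are all assumptions. Only the covered fields, the sorted keys, HMAC-SHA256, and the alquimiasignature attribute come from the text above.

```python
import hashlib
import hmac
import json

# Fields covered by the signature, as listed in this section.
SIGNED_FIELDS = ("specversion", "type", "id", "source", "time", "data")


def canonical_bytes(event: dict) -> bytes:
    """Serialise the signed fields with sorted keys (compact separators are an assumption)."""
    subset = {name: event.get(name) for name in SIGNED_FIELDS}
    return json.dumps(subset, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_event(event: dict, key: str) -> str:
    """Return the hex HMAC-SHA256 digest of the canonical form.

    Assumption: the key string is used as UTF-8 bytes rather than hex-decoded.
    """
    return hmac.new(key.encode("utf-8"), canonical_bytes(event), hashlib.sha256).hexdigest()


def verify_event(event: dict, key: str) -> bool:
    """Return False for a missing or mismatching alquimiasignature attribute."""
    signature = event.get("alquimiasignature")
    if not isinstance(signature, str) or not signature:
        return False
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(signature, sign_event(event, key))
```

A worker following this sketch would call verify_event before dispatch and drop the event when it returns False. Because the signature covers only the six listed fields, changing any of them after signing invalidates it.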
Partition key
The partition key for every event is the taskid CloudEvent extension. This means all events for a single inference task land on the same partition and are consumed by the same worker replica, preserving ordered processing.
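Why a fixed key preserves ordering can be illustrated with a small model of keyed partitioning. The sketch below is an assumption-laden stand-in: it uses zlib.crc32 as the hash, whereas the real hash function depends on the Kafka client library in use, and pick_partition is a hypothetical name. The property it shows is the general one: the same key always maps to the same partition.

```python
import zlib


def pick_partition(task_id: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash (crc32 is a stand-in here)."""
    return zlib.crc32(task_id.encode("utf-8")) % num_partitions


# Hypothetical stream of (taskid, event type) pairs in publish order.
events = [
    ("task-a", "assistant.inference.v1"),
    ("task-b", "assistant.inference.v1"),
    ("task-a", "response.inference.v1"),
    ("task-a", "context.persistence.v1"),
]

# Group events by the partition their key maps to, keeping publish order.
partitions = {}
for task_id, event_type in events:
    partitions.setdefault(pick_partition(task_id, 4), []).append((task_id, event_type))
```

Every task-a event ends up in a single partition list, in the order it was published, which is the guarantee the worker relies on.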
Dispatch routing table
kafka/dispatcher.py maintains two routing tables.
Pre-handler table
Event types routed through a pre-handler before reaching the controller:
| Event type | Handler | Description |
|---|---|---|
| shield.inference.v1 | _handle_shield | Content moderation / guard model |
| response.inference.v1 | _handle_response | LLM response inference |
| context.flush.v1 | _handle_context_flush | Long-term memory summarization |
| server.tool.execution.v1 | _handle_tool_process | MCP / Llama Stack tool execution |
| builtin.tool.execution.v1 | _handle_tool_builtin | Profile-embedded built-in tool |
| tool.schema.v1 | _handle_tool_schema | Tool schema discovery |
| a2a.execution.v1 | _handle_a2a_inference | Agent-to-agent delegation |
| agent.discovery.v1 | _handle_agent_discover | Agent registry lookup |
| assistant.inference.response.v1 | _handle_a2a_response | A2A response routing |
Pre-handlers execute the side-effecting work (call the LLM, execute the tool, etc.) and return an intermediate response CloudEvent. That response is then passed to _handle_controller_process, which updates the controller state and publishes follow-up events.
Direct-to-controller events
These event types bypass pre-handlers and go directly to _handle_controller_process:
- assistant.inference.v1: starts a new inference run
- attachment.normalized.v1: file attachment ready for context
- controller.restart.v1: restart a task
- controller.state.change.v1: run / stop signal
- tool.human.approval.required.response.v1: human approval decision
Terminal event
context.persistence.v1 is handled by _handle_context_persistence and has no controller follow-up. It saves the conversation to Redis and returns.
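The three routing paths described above (pre-handler then controller, direct to controller, and terminal) can be sketched as a lookup followed by a branch. This is a simplified, synchronous illustration and not the contents of kafka/dispatcher.py: the stub handlers, the intermediate.response type, the controller and persist callables, and the decision to ignore unknown types are all hypothetical.

```python
from typing import Callable, Dict, List

Event = Dict[str, object]
Handler = Callable[[Event], Event]


def make_stub(name: str) -> Handler:
    """Build a placeholder pre-handler that returns an intermediate response event."""
    def handler(event: Event) -> Event:
        # "intermediate.response" is a made-up type used only in this sketch.
        return {"type": "intermediate.response", "handled_by": name, "taskid": event.get("taskid")}
    return handler


# Subset of the pre-handler table, with stubs in place of the real handlers.
PRE_HANDLERS: Dict[str, Handler] = {
    "shield.inference.v1": make_stub("_handle_shield"),
    "response.inference.v1": make_stub("_handle_response"),
    "server.tool.execution.v1": make_stub("_handle_tool_process"),
}

DIRECT_TO_CONTROLLER = {
    "assistant.inference.v1",
    "attachment.normalized.v1",
    "controller.restart.v1",
    "controller.state.change.v1",
    "tool.human.approval.required.response.v1",
}

TERMINAL_TYPE = "context.persistence.v1"


def dispatch(event: Event, controller: Callable[[Event], List[Event]],
             persist: Callable[[Event], None]) -> List[Event]:
    """Route one event and return the follow-up events to publish."""
    event_type = event["type"]
    if event_type == TERMINAL_TYPE:
        persist(event)          # terminal: save and stop, no controller follow-up
        return []
    pre_handler = PRE_HANDLERS.get(event_type)
    if pre_handler is not None:
        return controller(pre_handler(event))   # side effect first, then controller
    if event_type in DIRECT_TO_CONTROLLER:
        return controller(event)
    return []                   # assumption: unknown types are ignored
```

Keeping the tables as plain dictionaries and sets makes the routing decision a constant-time lookup and keeps the list of supported event types in one place.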
Webhook fan-out
Before in-process dispatch, the worker checks whether the event type is eligible for external webhook delivery. Eligible types:
| Event type | Depth filter |
|---|---|
| assistant.inference.response.v1 | depth=0 only |
| context.persistence.v1 | all depths |
| tool.human.approval.required.v1 | all depths |
| empathy.rule.matched.v1 | all depths |
| response.inference.response.v1 | all depths |
A2A sub-results (assistant.inference.response.v1 at depth > 0) are intentionally excluded to avoid flooding subscribers with intermediate signal.
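The eligibility table and its depth filter can be expressed as a small lookup. The sketch below is illustrative only: is_webhook_eligible and WEBHOOK_DEPTH_FILTER are hypothetical names, and the text does not say how the depth is carried on the event, so it is passed in as a plain integer.

```python
from typing import Dict, Optional

# Event type -> required depth, or None when every depth is eligible.
WEBHOOK_DEPTH_FILTER: Dict[str, Optional[int]] = {
    "assistant.inference.response.v1": 0,
    "context.persistence.v1": None,
    "tool.human.approval.required.v1": None,
    "empathy.rule.matched.v1": None,
    "response.inference.response.v1": None,
}


def is_webhook_eligible(event_type: str, depth: int) -> bool:
    """Return True when an event of this type and depth should be fanned out."""
    if event_type not in WEBHOOK_DEPTH_FILTER:
        return False
    required_depth = WEBHOOK_DEPTH_FILTER[event_type]
    return required_depth is None or depth == required_depth
```

With this shape, an A2A sub-result such as assistant.inference.response.v1 at depth 1 is rejected, while the top-level response at depth 0 is delivered.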
Delivery is fire-and-forget via asyncio.create_task. Each webhook POST includes an X-Webhook-Signature: sha256=<hex> header for verification. See Inference Endpoints and the Registry API docs for how to register webhook subscriptions.
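The fire-and-forget pattern with asyncio.create_task can be sketched as follows. This is a generic illustration rather than the runtime's delivery code: deliver is a hypothetical stand-in that records the call instead of sending an HTTP POST, and the URLs are placeholders. The one detail worth copying is that a reference to each task is kept until it finishes, since the event loop holds only weak references to tasks.

```python
import asyncio

# Strong references to in-flight tasks so they are not garbage-collected early.
_background_tasks = set()


async def deliver(url, payload, delivered):
    """Hypothetical stand-in for an HTTP POST to a webhook sink."""
    await asyncio.sleep(0)              # yield control, as real network I/O would
    delivered.append((url, payload))


def fan_out(urls, payload, delivered):
    """Schedule one delivery per sink and return without waiting for any of them."""
    for url in urls:
        task = asyncio.create_task(deliver(url, payload, delivered))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)


async def main():
    delivered = []
    fan_out(
        ["https://example.invalid/a", "https://example.invalid/b"],
        {"type": "context.persistence.v1"},
        delivered,
    )
    delivered_at_return = len(delivered)   # nothing has been sent yet at this point
    await asyncio.gather(*_background_tasks)
    return delivered_at_return, delivered


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because fan_out returns before any delivery completes, a slow or failing subscriber does not delay in-process dispatch. The trade-off is that delivery failures are not reported back to the caller.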
Kafka configuration
| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Broker address(es) |
| KAFKA_TOPIC | alquimia.events | Topic for all agent events |
| KAFKA_CONSUMER_GROUP | alquimia-workers | Shared consumer group across all worker replicas |
| KAFKA_SIGNING_KEY | (required) | 64-character hex key; must match across all instances |
| KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Set to SASL_SSL in production |
| KAFKA_SASL_MECHANISM | "" | e.g., SCRAM-SHA-256 |
| KAFKA_SASL_USERNAME | "" | SASL username |
| KAFKA_SASL_PASSWORD | "" | SASL password |
See Configuration Reference for the complete settings list.
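As an illustration of how the variables in the table fit together, the following sketch loads them from the environment with the documented defaults. KafkaSettings and load_kafka_settings are hypothetical names, not the runtime's settings class. The sketch also fails fast on a missing or malformed signing key, which is stricter than the startup warning described under Event signing.

```python
import os
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class KafkaSettings:
    bootstrap_servers: str
    topic: str
    consumer_group: str
    signing_key: str
    security_protocol: str
    sasl_mechanism: str
    sasl_username: str
    sasl_password: str


def load_kafka_settings(env=None) -> KafkaSettings:
    """Read KAFKA_* variables from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    signing_key = env.get("KAFKA_SIGNING_KEY", "")
    # Sketch-only choice: raise instead of warning when the key is unusable.
    if not re.fullmatch(r"[0-9a-fA-F]{64}", signing_key):
        raise ValueError("KAFKA_SIGNING_KEY must be a 64-character hex string")
    return KafkaSettings(
        bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        topic=env.get("KAFKA_TOPIC", "alquimia.events"),
        consumer_group=env.get("KAFKA_CONSUMER_GROUP", "alquimia-workers"),
        signing_key=signing_key,
        security_protocol=env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        sasl_mechanism=env.get("KAFKA_SASL_MECHANISM", ""),
        sasl_username=env.get("KAFKA_SASL_USERNAME", ""),
        sasl_password=env.get("KAFKA_SASL_PASSWORD", ""),
    )
```

Passing a plain dictionary instead of os.environ makes the loader easy to exercise in tests without touching the process environment.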
Worker scaling
All worker replicas share KAFKA_CONSUMER_GROUP. Kafka distributes partitions across the replicas in the group, so scale workers up to the number of Kafka partitions for maximum throughput. Workers beyond the partition count will be idle consumers.
    kubectl scale deployment alquimia-runtime-worker --replicas=4 -n alquimia-runtime

See Kubernetes & OpenShift for deployment details.
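The idle-consumer effect can be seen in a toy model of partition assignment. The sketch below uses a simple round-robin scheme, which is an assumption made for illustration: Kafka's actual assignment strategies are configurable and more involved, and the worker names are made up. The outcome it demonstrates holds regardless of strategy, because a partition is consumed by at most one member of a consumer group.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, replicas: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over replicas round-robin (a simplified model)."""
    assignment: Dict[str, List[int]] = {name: [] for name in replicas}
    for partition in range(num_partitions):
        owner = replicas[partition % len(replicas)]
        assignment[owner].append(partition)
    return assignment


# Four partitions shared by six hypothetical worker replicas.
workers = [f"worker-{i}" for i in range(6)]
assignment = assign_partitions(4, workers)
idle = [name for name, owned in assignment.items() if not owned]
```

Here worker-4 and worker-5 receive no partitions, so scaling past four replicas adds no throughput until the topic itself has more partitions.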
Related pages
- Inference Endpoints: POST /event/infer publishes to Kafka
- Controller API: the controller state machine driven by Kafka events
- Configuration Reference: KAFKA_* variables
- Kubernetes & OpenShift: deploying workers at scale
Source
Alquimia-ai/alquimia-runtime: runtime/src/kafka/ (producer, consumer, dispatcher, signing)