If you’ve worked with REST APIs, you probably know OpenAPI (formerly Swagger). It’s the standard for documenting HTTP APIs—describing endpoints, request/response schemas, and authentication. Tools generate documentation, client libraries, and mock servers from OpenAPI specs.
But what about event-driven systems? When you have services communicating through Kafka, RabbitMQ, or SQS, how do you document:
- Which topics/queues exist?
- What events flow through them?
- Who produces and consumes each event?
- What’s the schema of each message?
Enter AsyncAPI.
What is AsyncAPI?
AsyncAPI is a specification for documenting asynchronous, event-driven APIs. Think of it as OpenAPI for message brokers. It provides a machine-readable format to describe:
- Channels: Topics, queues, or exchanges where messages flow
- Messages: The events themselves, including schemas
- Operations: Whether a service sends messages to or receives messages from a channel
- Servers: Broker connection details (Kafka, RabbitMQ, MQTT, etc.)
The specification launched in 2017 and has matured significantly. Version 3.0 (released in late 2023) introduced clearer semantics around operations and better support for request-reply patterns.
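To make the 3.0 change concrete, here's a simplified contrast (a sketch, not a complete spec). In 2.x, operations were written from the perspective of the client reading the spec, so a service that publishes order events described them under `subscribe`; 3.0 separates channels from operations and states the application's own role explicitly:

```yaml
# AsyncAPI 2.x: described from the client's perspective, so an app that
# PUBLISHES order events listed them under `subscribe`
channels:
  orders.created:
    subscribe:
      message:
        $ref: '#/components/messages/OrderCreated'
---
# AsyncAPI 3.0: channels and operations are separate, and `action`
# states what the application itself does
channels:
  ordersCreated:
    address: orders.created
operations:
  publishOrderCreated:
    action: send
    channel:
      $ref: '#/channels/ordersCreated'
```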
Why Does This Matter?
In a microservices architecture, event-driven communication creates an “invisible” layer of integration. Unlike REST endpoints that you can discover through API gateways, message flows are hidden inside broker configurations and consumer group assignments.
This leads to common problems:
- Tribal knowledge: “Oh, the order service publishes to orders.created? Only Sarah knows that.”
- Schema drift: Producer changes the event format, consumers break silently
- Orphaned topics: Topics that nothing reads from anymore
- Duplicate events: Two services publishing similar events because they didn’t know the other existed
AsyncAPI addresses these by creating a single source of truth for your event-driven contracts.
Anatomy of an AsyncAPI Spec
Let’s look at a real example. Here’s a simplified spec for an Order Producer service:
asyncapi: 3.0.0
info:
title: Order Producer Service
version: 1.0.0
description: |
Service that generates order events and publishes them to Kafka.
servers:
production:
host: kafka.prod.example.com:9092
protocol: kafka
description: Production Kafka cluster
channels:
ordersCreated:
address: orders.created
description: Channel for new order events
messages:
orderCreated:
$ref: '#/components/messages/OrderCreated'
operations:
publishOrderCreated:
action: send
channel:
$ref: '#/channels/ordersCreated'
summary: Publish a new order event
messages:
- $ref: '#/channels/ordersCreated/messages/orderCreated'
components:
messages:
OrderCreated:
name: OrderCreated
title: Order Created Event
summary: Event emitted when a new order is placed
contentType: application/json
payload:
type: object
required:
- order_id
- customer_id
- items
- total_amount
properties:
order_id:
type: string
description: Unique order identifier
customer_id:
type: string
description: Customer who placed the order
items:
type: array
items:
type: object
properties:
product_id:
type: string
quantity:
type: integer
unit_price:
type: string
total_amount:
type: string
description: Total as decimal string
created_at:
type: string
format: date-time
Key Sections
info: Metadata about the API—title, version, description. This appears in generated documentation.
servers: Connection details for your message brokers. You can define multiple environments (dev, staging, prod).
channels: The topics or queues. Each channel has an address (the actual topic name) and defines which messages flow through it.
operations: What your service does. The action field is either send (publish) or receive (subscribe).
components: Reusable definitions for messages, schemas, and other elements.
A Complete Example: Order Fulfillment System
Let’s build something real. I’ve created a demo application with two FastStream services, Redpanda (Kafka-compatible), and Redpanda Connect bridging to SQS.
Architecture Overview
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ Order Producer │────▶│ orders.created │────▶│ Order Fulfillment │
│ (FastStream) │ │ (Kafka) │ │ (FastStream) │
└─────────────────┘ └──────────────────┘ └──────────┬──────────┘
│
┌──────────────────┐ │
│ orders.accepted │◀──────────────┤
├──────────────────┤ │
│ orders.shipped │◀──────────────┤
├──────────────────┤ │
│ orders.fulfilled │◀──────────────┘
└────────┬─────────┘
│
┌────────▼─────────┐ ┌─────────────────────┐
│ Redpanda Connect │────▶│ AWS SQS │
└──────────────────┘ │ (LocalStack) │
└─────────────────────┘
The Order Producer generates random orders every few seconds. The Order Fulfillment service consumes these orders and publishes lifecycle events as orders progress through validation, shipping, and completion. Redpanda Connect bridges these events to SQS for downstream consumers.
The Producer Service
Using FastStream, a Python framework for async messaging, the producer is straightforward. FastStream 0.6+ uses a separate AsyncAPI specification object for metadata:
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.specification import AsyncAPI
broker = KafkaBroker("localhost:19092")
# AsyncAPI metadata is defined separately
spec = AsyncAPI(
title="Order Producer Service",
version="1.0.0",
description="Generates order events",
)
app = FastStream(broker, specification=spec)
# Type the publisher with a Pydantic model for schema generation
publisher = broker.publisher(
"orders.created",
title="Order Created Publisher",
description="Publishes new order events",
schema=OrderCreated, # This drives AsyncAPI schema generation
)
async def publish_order(order: OrderCreated):
await publisher.publish(
order.model_dump_json(),
key=order.customer_id.encode(),
headers={
"event_type": "OrderCreated",
"correlation_id": order.order_id,
},
)
FastStream uses Pydantic models for serialization and can auto-generate AsyncAPI specs from your code. That’s powerful—your documentation stays in sync with your implementation.
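The snippets above reference an `OrderCreated` Pydantic model without showing it. Here's a minimal sketch consistent with the payload schema from the spec (the demo's actual model may differ slightly):

```python
from datetime import datetime

from pydantic import BaseModel


class OrderItem(BaseModel):
    product_id: str
    quantity: int
    unit_price: str  # decimal as string, matching the spec


class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    items: list[OrderItem]
    total_amount: str  # total as decimal string
    created_at: datetime
```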
The Consumer Service
The fulfillment service subscribes to orders and publishes downstream events. Notice how publishers are typed with their schema for AsyncAPI generation:
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage
from faststream.specification import AsyncAPI

# The broker and the event models (OrderCreated, OrderAccepted, ...)
# are defined as in the producer example
broker = KafkaBroker("localhost:19092")
spec = AsyncAPI(
title="Order Fulfillment Service",
version="1.0.0",
description="Processes orders through the fulfillment pipeline.",
)
app = FastStream(broker, specification=spec)
# Publishers with typed schemas
accepted_publisher = broker.publisher(
"orders.accepted",
title="Order Accepted Publisher",
schema=OrderAccepted,
)
shipped_publisher = broker.publisher(
"orders.shipped",
title="Order Shipped Publisher",
schema=OrderShipped,
)
fulfilled_publisher = broker.publisher(
"orders.fulfilled",
title="Order Fulfilled Publisher",
schema=OrderFulfilled,
)
@broker.subscriber(
"orders.created",
group_id="order-fulfillment-service",
title="Order Created Subscriber",
description="Consumes new order events and initiates fulfillment",
)
async def handle_order_created(body: str, msg: KafkaMessage):
order = OrderCreated.model_validate_json(body)
# Validate and accept
accepted = OrderAccepted(
order_id=order.order_id,
customer_id=order.customer_id,
estimated_ship_date=calculate_ship_date(),
warehouse_id=assign_warehouse(),
correlation_id=order.order_id,
)
await accepted_publisher.publish(accepted.model_dump_json())
# Ship the order
shipped = OrderShipped(
order_id=order.order_id,
tracking_number=generate_tracking(),
carrier=select_carrier(),
# ...
)
await shipped_publisher.publish(shipped.model_dump_json())
# Mark fulfilled
fulfilled = OrderFulfilled(...)
await fulfilled_publisher.publish(fulfilled.model_dump_json())
The AsyncAPI Spec for the Consumer
Here’s where AsyncAPI shines. The consumer service both receives AND sends events. The spec documents both:
asyncapi: 3.0.0
info:
title: Order Fulfillment Service
version: 1.0.0
description: |
Processes orders through the fulfillment pipeline.
### Events Consumed
- **OrderCreated**: Triggers fulfillment
### Events Published
- **OrderAccepted**: Order validated
- **OrderShipped**: Order dispatched
- **OrderFulfilled**: Process complete
channels:
ordersCreated:
address: orders.created
messages:
orderCreated:
$ref: '#/components/messages/OrderCreated'
ordersAccepted:
address: orders.accepted
messages:
orderAccepted:
$ref: '#/components/messages/OrderAccepted'
ordersShipped:
address: orders.shipped
messages:
orderShipped:
$ref: '#/components/messages/OrderShipped'
ordersFulfilled:
address: orders.fulfilled
messages:
orderFulfilled:
$ref: '#/components/messages/OrderFulfilled'
operations:
consumeOrderCreated:
action: receive
channel:
$ref: '#/channels/ordersCreated'
summary: Consume new orders for fulfillment
publishOrderAccepted:
action: send
channel:
$ref: '#/channels/ordersAccepted'
summary: Publish order acceptance events
publishOrderShipped:
action: send
channel:
$ref: '#/channels/ordersShipped'
summary: Publish shipping events
publishOrderFulfilled:
action: send
channel:
$ref: '#/channels/ordersFulfilled'
summary: Publish fulfillment completion
Now anyone can look at this spec and understand:
- What this service does
- What events it consumes
- What events it produces
- The schema of each event
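And because the spec is machine-readable, tools can answer those questions too. As a quick sketch (assuming PyYAML and jsonschema are installed, and that the full spec file includes the components section shown earlier), you can validate a sample event against the payload schema straight from the spec:

```python
import yaml
from jsonschema import validate

with open("docs/asyncapi-consumer.yaml") as f:
    spec = yaml.safe_load(f)

# Pull the JSON Schema for the OrderCreated payload out of the spec
schema = spec["components"]["messages"]["OrderCreated"]["payload"]

event = {
    "order_id": "ord_abc123",
    "customer_id": "cust_67890",
    "items": [{"product_id": "prod_001", "quantity": 2, "unit_price": "29.99"}],
    "total_amount": "59.98",
}
validate(instance=event, schema=schema)  # raises ValidationError on mismatch
```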
Bridging with Redpanda Connect
Redpanda Connect (formerly Benthos) bridges our Kafka events to SQS. Its configuration is YAML-based:
input:
kafka_franz:
seed_brokers:
- redpanda:9092
topics:
- orders.accepted
- orders.shipped
- orders.fulfilled
consumer_group: redpanda-connect-sqs-bridge
pipeline:
  processors:
    - mapping: |
        # content() is the raw message bytes; parse them explicitly
        root = content().parse_json()
        root.bridge_metadata = {
          "source_topic": @kafka_topic,
          "bridged_at": now()
        }
output:
switch:
cases:
- check: meta("event_type") == "OrderFulfilled"
output:
aws_sqs:
url: http://localstack:4566/000000000000/order-fulfillment-events
- output:
aws_sqs:
url: http://localstack:4566/000000000000/order-notifications
This routes OrderFulfilled events to one queue and everything else to a notifications queue. The bridge itself could have an AsyncAPI spec documenting what it consumes and where it routes messages.
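Picking up that last point, here's a hedged sketch of what such a bridge spec could declare: one Kafka channel it receives from and the SQS queue it forwards to (names taken from the demo config above):

```yaml
asyncapi: 3.0.0
info:
  title: Kafka-to-SQS Bridge
  version: 1.0.0
channels:
  ordersFulfilled:
    address: orders.fulfilled
  fulfillmentQueue:
    address: order-fulfillment-events
operations:
  consumeFulfilled:
    action: receive
    channel:
      $ref: '#/channels/ordersFulfilled'
  forwardToSqs:
    action: send
    channel:
      $ref: '#/channels/fulfillmentQueue'
```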
Schema Registry Integration
The demo uses Avro schemas registered with Redpanda’s built-in Schema Registry. This provides:
- Schema evolution: Add fields without breaking consumers
- Compatibility checks: Registry rejects incompatible changes
- Efficient serialization: Binary format smaller than JSON
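For example, under backward compatibility, adding an optional field with a default is accepted, while removing a required field is rejected. A sketch of a compatible addition to the OrderCreated record shown next:

```json
{
  "name": "coupon_code",
  "type": ["null", "string"],
  "default": null,
  "doc": "Optional promo code; added later without breaking consumers"
}
```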
Here’s the Avro schema for OrderCreated:
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.orders.events",
"doc": "Event emitted when a new order is created",
"fields": [
{
"name": "order_id",
"type": "string",
"doc": "Unique identifier for the order"
},
{
"name": "customer_id",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unit_price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
]
}
}
},
{
"name": "total_amount",
"type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
},
{
"name": "created_at",
"type": {"type": "long", "logicalType": "timestamp-millis"}
}
]
}
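To register this schema, Redpanda's registry exposes the Confluent-compatible REST API. A minimal sketch (the registry URL, the schema file path, and the `<topic>-value` subject name are assumptions for this demo setup):

```python
import requests

with open("schemas/order_created.avsc") as f:
    avro_schema = f.read()

resp = requests.post(
    "http://localhost:8081/subjects/orders.created-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schemaType": "AVRO", "schema": avro_schema},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"id": 1}
```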
AsyncAPI can reference Schema Registry subjects directly, keeping your API docs in sync with registered schemas.
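In AsyncAPI 3.0 that's done with the multiformat schema object: the message payload declares a `schemaFormat` and `$ref`s the registered subject (registry URL again assumed):

```yaml
components:
  messages:
    OrderCreated:
      payload:
        schemaFormat: application/vnd.apache.avro;version=1.9.0
        schema:
          $ref: 'http://localhost:8081/subjects/orders.created-value/versions/latest/schema'
```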
Generating Specs from Code
Here’s where FastStream really shines: you can generate AsyncAPI specs directly from your code. No manual YAML writing required.
FastStream CLI
FastStream includes a built-in CLI that inspects your decorated publishers, subscribers, and Pydantic models to generate specs:
# Generate YAML spec from your FastStream app
cd producer
uv run faststream docs gen app.main:app --yaml
# This creates asyncapi.yaml with:
# - All publishers and subscribers as channels/operations
# - Pydantic models converted to JSON Schema
# - Examples from model's json_schema_extra
# - Descriptions from docstrings
The generated spec includes everything FastStream knows about your app:
# Auto-generated from code!
info:
title: Order Producer Service
version: 1.0.0
description: "## Order Producer Service\n\nThis service generates order events..."
asyncapi: 3.0.0
channels:
Order Created Publisher:
address: Order Created Publisher
description: Publishes new order events to the orders.created topic
bindings:
kafka:
topic: orders.created
operations:
Order Created Publisher:
action: send
channel:
$ref: '#/channels/Order Created Publisher'
components:
schemas:
OrderCreated:
description: 'Event published when a new order is created...'
properties:
order_id:
type: string
customer_id:
type: string
# ... all fields from your Pydantic model
examples:
- order_id: ord_12345
customer_id: cust_67890
# ... from json_schema_extra
Serve Interactive Docs
FastStream can also serve an interactive documentation viewer:
# Start the docs server
cd producer
uv run faststream docs serve app.main:app
# Opens browser with interactive spec viewer
AsyncAPI CLI Tools
The AsyncAPI ecosystem provides additional tooling:
# Install the AsyncAPI CLI
npm install -g @asyncapi/cli
# Validate specs
asyncapi validate docs/asyncapi-producer.yaml
# Generate HTML documentation (Swagger UI style)
asyncapi generate fromTemplate docs/asyncapi-producer.yaml @asyncapi/html-template -o docs/html
# Start interactive studio
asyncapi start studio docs/asyncapi-producer.yaml
AsyncAPI Studio
The AsyncAPI Studio is a web-based editor with:
- Live preview of documentation
- Schema validation
- Visual channel/operation mapping
Code Generation from Specs
Generate code from specs (the reverse direction):
# Generate TypeScript types
asyncapi generate fromTemplate spec.yaml @asyncapi/typescript-nats-template
# Generate Python models
asyncapi generate fromTemplate spec.yaml @asyncapi/python-paho-template
These templates generate typed client scaffolding from the spec, so hand-written producers and consumers stay aligned with the documented contract. (The HTML template shown earlier, by contrast, produces a static site with interactive schema exploration, similar to Swagger UI for REST APIs.)
Discoverability and Governance
AsyncAPI becomes truly powerful when combined with a central registry. Imagine:
- Event Catalog: All services register their AsyncAPI specs
- Search: “Show me all services that consume orders.created”
- Impact Analysis: “If I change this schema, who breaks?”
- Lineage: Visual graphs of event flows across services
Tools like EventCatalog and Backstage can aggregate AsyncAPI specs to provide this visibility.
Example: Tracking Producers and Consumers
From our two AsyncAPI specs, we can extract:
| Channel | Producers | Consumers |
|---|---|---|
| orders.created | Order Producer | Order Fulfillment |
| orders.accepted | Order Fulfillment | Redpanda Connect |
| orders.shipped | Order Fulfillment | Redpanda Connect |
| orders.fulfilled | Order Fulfillment | Redpanda Connect |
This mapping answers questions like:
- “Who do I contact about changes to orders.created?” → the Order Producer team
- “What breaks if Order Fulfillment goes down?” → downstream consumers stop receiving acceptance, shipping, and fulfillment events
- “Is anyone still consuming orders.legacy?” → check the registry
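Here's a minimal sketch of building that mapping mechanically from the spec files (assuming PyYAML and the spec filenames used in this demo):

```python
from collections import defaultdict

import yaml

SPECS = ["docs/asyncapi-producer.yaml", "docs/asyncapi-consumer.yaml"]

producers = defaultdict(set)
consumers = defaultdict(set)

for path in SPECS:
    with open(path) as f:
        spec = yaml.safe_load(f)
    service = spec["info"]["title"]
    channels = spec.get("channels", {})
    for op in spec.get("operations", {}).values():
        # '#/channels/ordersCreated' -> 'ordersCreated' -> 'orders.created'
        channel_key = op["channel"]["$ref"].split("/")[-1]
        address = channels[channel_key]["address"]
        if op["action"] == "send":
            producers[address].add(service)
        else:  # action == "receive"
            consumers[address].add(service)

for address in sorted(producers | consumers):
    print(address, sorted(producers[address]), sorted(consumers[address]))
```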
AsyncAPI and the Transactional Outbox Pattern
In the demo above, services publish directly to Kafka. But what happens if Kafka is down? Your order gets saved to the database, but the event never publishes. You end up with inconsistent state.
The Transactional Outbox pattern solves this by writing events to a database table in the same transaction as your business data, then having a separate relay publish to Kafka. This guarantees delivery without distributed transactions.
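A minimal sketch of the write side (table and column names are illustrative, not taken from the demo):

```python
import json
import sqlite3
import uuid


def place_order(conn: sqlite3.Connection, order: dict) -> None:
    """Persist the order and its OrderCreated event in one transaction."""
    with conn:  # commits both inserts atomically, or rolls back together
        conn.execute(
            "INSERT INTO orders (id, payload) VALUES (?, ?)",
            (order["order_id"], json.dumps(order)),
        )
        # A separate relay process polls this table and publishes to Kafka
        conn.execute(
            "INSERT INTO outbox (id, topic, payload) VALUES (?, ?, ?)",
            (str(uuid.uuid4()), "orders.created", json.dumps(order)),
        )
```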
When using the outbox pattern with AsyncAPI, you may have two APIs to document:
- OpenAPI: The HTTP API that receives events (if using an API-based outbox)
- AsyncAPI: The Kafka topics where events eventually land
The key is cross-referencing between specs. In your AsyncAPI description, note that events flow through an outbox:
info:
description: |
Events are delivered via the Transactional Outbox pattern with
at-least-once semantics. Consumers should handle duplicates.
See: [Events Ingestion API](/docs/openapi/events-api)
For a complete deep dive into implementing the outbox pattern, see my dedicated post: The Transactional Outbox Pattern: Reliable Event Publishing.
Running the Demo
The complete demo is available in the repository. To run it:
cd asyncapi-demo
# Start all services
make up
# View Redpanda Console at http://localhost:8080
# Watch logs
make logs
# Check SQS messages
make sqs-messages
# Stop everything
make down
You’ll see:
- Orders generated every 5 seconds
- Each order flows through: Created → Accepted → Shipped → Fulfilled
- Events bridged to SQS queues
Generate Specs from Running Code
# Generate AsyncAPI specs from your FastStream services
make generate-specs
# This runs:
# cd producer && uv run faststream docs gen app.main:app --yaml
# cd consumer && uv run faststream docs gen app.main:app --yaml
# Serve interactive docs
make serve-producer-docs
make serve-consumer-docs
Best Practices
From building this demo, some recommendations:
1. Generate from Code, Don’t Hand-Write
With FastStream, you can generate specs directly from your decorated publishers and subscribers. This keeps documentation in sync with implementation:
# Add schema= to publishers for automatic spec generation
publisher = broker.publisher(
"orders.created",
title="Order Created Publisher",
description="Publishes new order events",
schema=OrderCreated, # Pydantic model → JSON Schema
)
Then generate:
faststream docs gen app.main:app --yaml
2. Spec Per Service
Each service owns its AsyncAPI spec. The producer documents what it publishes; the consumer documents what it subscribes to AND publishes.
3. Version Your Specs
Use semantic versioning. Breaking changes (removing fields, changing types) bump the major version.
4. Include Examples
Add examples to your Pydantic models—FastStream includes them in the generated spec:
from pydantic import BaseModel, ConfigDict


class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    # ...

    model_config = ConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "order_id": "ord_abc123",
                    "customer_id": "cust_67890",
                    "items": [
                        {"product_id": "prod_001", "quantity": 2, "unit_price": "29.99"}
                    ],
                    "total_amount": "59.98",
                }
            ]
        }
    )
5. Add Titles and Descriptions
FastStream pulls titles and descriptions from your decorators:
@broker.subscriber(
"orders.created",
group_id="order-fulfillment-service",
title="Order Created Subscriber", # → operation title
description="Consumes new order events and initiates fulfillment", # → operation description
)
async def handle_order_created(body: str):
...
6. Automate Validation in CI/CD
# GitHub Actions example
- name: Generate and validate AsyncAPI
run: |
cd producer && uv run faststream docs gen app.main:app --yaml
asyncapi validate asyncapi.yaml
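You can take this further and catch breaking changes in review: the AsyncAPI CLI also ships a `diff` command that compares two spec versions and categorizes changes as breaking or non-breaking. A sketch, assuming the previously committed spec lives alongside the freshly generated one:

```bash
# Compare the committed spec against the newly generated one;
# the report flags breaking vs. non-breaking changes
asyncapi diff docs/asyncapi-producer.yaml producer/asyncapi.yaml
```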
Comparing AsyncAPI to Alternatives
| Approach | Pros | Cons |
|---|---|---|
| AsyncAPI | Standard spec, rich tooling, machine-readable | Learning curve, maintenance overhead |
| Confluence/Wiki | Easy to start, familiar | Goes stale, not machine-readable |
| Schema Registry only | Enforces schemas | Doesn’t capture operations/ownership |
| Code comments | Close to implementation | Scattered, hard to aggregate |
AsyncAPI works best when combined with Schema Registry—schemas in the registry, API contracts in AsyncAPI.
What’s Next
AsyncAPI is becoming the standard for event-driven documentation. Recent developments include:
- AsyncAPI 3.0: Clearer operation semantics, better request-reply support
- CloudEvents integration: Standardized event envelope format
- Protocol bindings: Better support for Kafka, AMQP, MQTT specifics
- AI/LLM tooling: Generate specs from code, code from specs
If you’re building event-driven systems, invest in AsyncAPI. The upfront documentation work pays dividends in discoverability, onboarding, and preventing integration bugs.
References
- AsyncAPI Specification
- AsyncAPI Tools
- FastStream Documentation
- Redpanda Connect
- EventCatalog
- Demo Repository
The complete demo code is available in the asyncapi-demo directory of this repository.