If you’ve worked with REST APIs, you probably know OpenAPI (formerly Swagger). It’s the standard for documenting HTTP APIs—describing endpoints, request/response schemas, and authentication. Tools generate documentation, client libraries, and mock servers from OpenAPI specs.

But what about event-driven systems? When you have services communicating through Kafka, RabbitMQ, or SQS, how do you document:

  • Which topics/queues exist?
  • What events flow through them?
  • Who produces and consumes each event?
  • What’s the schema of each message?

Enter AsyncAPI.

What is AsyncAPI?

AsyncAPI is a specification for documenting asynchronous, event-driven APIs. Think of it as OpenAPI for message brokers. It provides a machine-readable format to describe:

  • Channels: Topics, queues, or exchanges where messages flow
  • Messages: The events themselves, including schemas
  • Operations: Whether a service publishes to or subscribes from a channel
  • Servers: Broker connection details (Kafka, RabbitMQ, MQTT, etc.)

The specification launched in 2017 and has matured significantly. Version 3.0 (released in late 2023) introduced clearer semantics around operations and better support for request-reply patterns.

Why Does This Matter?

In a microservices architecture, event-driven communication creates an “invisible” layer of integration. Unlike REST endpoints that you can discover through API gateways, message flows are hidden inside broker configurations and consumer group assignments.

This leads to common problems:

  1. Tribal knowledge: “Oh, the order service publishes to orders.created? Only Sarah knows that.”
  2. Schema drift: Producer changes the event format, consumers break silently
  3. Orphaned topics: Topics that nothing reads from anymore
  4. Duplicate events: Two services publishing similar events because they didn’t know the other existed

AsyncAPI addresses these by creating a single source of truth for your event-driven contracts.

Anatomy of an AsyncAPI Spec

Let’s look at a real example. Here’s a simplified spec for an Order Producer service:

asyncapi: 3.0.0

info:
  title: Order Producer Service
  version: 1.0.0
  description: |
    Service that generates order events and publishes them to Kafka.    

servers:
  production:
    host: kafka.prod.example.com:9092
    protocol: kafka
    description: Production Kafka cluster

channels:
  ordersCreated:
    address: orders.created
    description: Channel for new order events
    messages:
      orderCreated:
        $ref: '#/components/messages/OrderCreated'

operations:
  publishOrderCreated:
    action: send
    channel:
      $ref: '#/channels/ordersCreated'
    summary: Publish a new order event
    messages:
      - $ref: '#/channels/ordersCreated/messages/orderCreated'

components:
  messages:
    OrderCreated:
      name: OrderCreated
      title: Order Created Event
      summary: Event emitted when a new order is placed
      contentType: application/json
      payload:
        type: object
        required:
          - order_id
          - customer_id
          - items
          - total_amount
        properties:
          order_id:
            type: string
            description: Unique order identifier
          customer_id:
            type: string
            description: Customer who placed the order
          items:
            type: array
            items:
              type: object
              properties:
                product_id:
                  type: string
                quantity:
                  type: integer
                unit_price:
                  type: string
          total_amount:
            type: string
            description: Total as decimal string
          created_at:
            type: string
            format: date-time

Key Sections

info: Metadata about the API—title, version, description. This appears in generated documentation.

servers: Connection details for your message brokers. You can define multiple environments (dev, staging, prod).

channels: The topics or queues. Each channel has an address (the actual topic name) and defines which messages flow through it.

operations: What your service does. The action field is either send (publish) or receive (subscribe).

components: Reusable definitions for messages, schemas, and other elements.

A Complete Example: Order Fulfillment System

Let’s build something real. I’ve created a demo application with two FastStream services, Redpanda (Kafka-compatible), and Redpanda Connect bridging to SQS.

Architecture Overview

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────────┐
│  Order Producer │────▶│  orders.created  │────▶│  Order Fulfillment  │
│   (FastStream)  │     │     (Kafka)      │     │    (FastStream)     │
└─────────────────┘     └──────────────────┘     └──────────┬──────────┘
                        ┌──────────────────┐               │
                        │ orders.accepted  │◀──────────────┤
                        ├──────────────────┤               │
                        │ orders.shipped   │◀──────────────┤
                        ├──────────────────┤               │
                        │ orders.fulfilled │◀──────────────┘
                        └────────┬─────────┘
                        ┌────────▼─────────┐     ┌─────────────────────┐
                        │ Redpanda Connect │────▶│    AWS SQS          │
                        └──────────────────┘     │  (LocalStack)       │
                                                 └─────────────────────┘

The Order Producer generates random orders every few seconds. The Order Fulfillment service consumes these orders and publishes lifecycle events as orders progress through validation, shipping, and completion. Redpanda Connect bridges these events to SQS for downstream consumers.

The Producer Service

Using FastStream, a Python framework for async messaging, the producer is straightforward. FastStream 0.6+ uses a separate AsyncAPI specification object for metadata:

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.specification import AsyncAPI

broker = KafkaBroker("localhost:19092")

# AsyncAPI metadata is defined separately
spec = AsyncAPI(
    title="Order Producer Service",
    version="1.0.0",
    description="Generates order events",
)

app = FastStream(broker, specification=spec)

# Type the publisher with a Pydantic model for schema generation
publisher = broker.publisher(
    "orders.created",
    title="Order Created Publisher",
    description="Publishes new order events",
    schema=OrderCreated,  # This drives AsyncAPI schema generation
)

async def publish_order(order: OrderCreated):
    await publisher.publish(
        order.model_dump_json(),
        key=order.customer_id.encode(),
        headers={
            "event_type": "OrderCreated",
            "correlation_id": order.order_id,
        },
    )

FastStream uses Pydantic models for serialization and can auto-generate AsyncAPI specs from your code. That’s powerful—your documentation stays in sync with your implementation.

The Consumer Service

The fulfillment service subscribes to orders and publishes downstream events. Notice how publishers are typed with their schema for AsyncAPI generation:

from faststream.specification import AsyncAPI

spec = AsyncAPI(
    title="Order Fulfillment Service",
    version="1.0.0",
    description="Processes orders through the fulfillment pipeline.",
)

app = FastStream(broker, specification=spec)

# Publishers with typed schemas
accepted_publisher = broker.publisher(
    "orders.accepted",
    title="Order Accepted Publisher",
    schema=OrderAccepted,
)
shipped_publisher = broker.publisher(
    "orders.shipped",
    title="Order Shipped Publisher",
    schema=OrderShipped,
)
fulfilled_publisher = broker.publisher(
    "orders.fulfilled",
    title="Order Fulfilled Publisher",
    schema=OrderFulfilled,
)

@broker.subscriber(
    "orders.created",
    group_id="order-fulfillment-service",
    title="Order Created Subscriber",
    description="Consumes new order events and initiates fulfillment",
)
async def handle_order_created(body: str, msg: KafkaMessage):
    order = OrderCreated.model_validate_json(body)
    
    # Validate and accept
    accepted = OrderAccepted(
        order_id=order.order_id,
        customer_id=order.customer_id,
        estimated_ship_date=calculate_ship_date(),
        warehouse_id=assign_warehouse(),
        correlation_id=order.order_id,
    )
    await accepted_publisher.publish(accepted.model_dump_json())
    
    # Ship the order
    shipped = OrderShipped(
        order_id=order.order_id,
        tracking_number=generate_tracking(),
        carrier=select_carrier(),
        # ...
    )
    await shipped_publisher.publish(shipped.model_dump_json())
    
    # Mark fulfilled
    fulfilled = OrderFulfilled(...)
    await fulfilled_publisher.publish(fulfilled.model_dump_json())

The AsyncAPI Spec for the Consumer

Here’s where AsyncAPI shines. The consumer service both receives AND sends events. The spec documents both:

asyncapi: 3.0.0

info:
  title: Order Fulfillment Service
  version: 1.0.0
  description: |
    Processes orders through the fulfillment pipeline.
    
    ### Events Consumed
    - **OrderCreated**: Triggers fulfillment
    
    ### Events Published  
    - **OrderAccepted**: Order validated
    - **OrderShipped**: Order dispatched
    - **OrderFulfilled**: Process complete    

channels:
  ordersCreated:
    address: orders.created
    messages:
      orderCreated:
        $ref: '#/components/messages/OrderCreated'

  ordersAccepted:
    address: orders.accepted
    messages:
      orderAccepted:
        $ref: '#/components/messages/OrderAccepted'

  ordersShipped:
    address: orders.shipped
    messages:
      orderShipped:
        $ref: '#/components/messages/OrderShipped'

  ordersFulfilled:
    address: orders.fulfilled
    messages:
      orderFulfilled:
        $ref: '#/components/messages/OrderFulfilled'

operations:
  consumeOrderCreated:
    action: receive
    channel:
      $ref: '#/channels/ordersCreated'
    summary: Consume new orders for fulfillment
    
  publishOrderAccepted:
    action: send
    channel:
      $ref: '#/channels/ordersAccepted'
    summary: Publish order acceptance events
    
  publishOrderShipped:
    action: send
    channel:
      $ref: '#/channels/ordersShipped'
    summary: Publish shipping events
    
  publishOrderFulfilled:
    action: send
    channel:
      $ref: '#/channels/ordersFulfilled'
    summary: Publish fulfillment completion

Now anyone can look at this spec and understand:

  • What this service does
  • What events it consumes
  • What events it produces
  • The schema of each event

Bridging with Redpanda Connect

Redpanda Connect (formerly Benthos) bridges our Kafka events to SQS. Its configuration is YAML-based:

input:
  kafka_franz:
    seed_brokers:
      - redpanda:9092
    topics:
      - orders.accepted
      - orders.shipped
      - orders.fulfilled
    consumer_group: redpanda-connect-sqs-bridge

pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        root.bridge_metadata = {
          "source_topic": @kafka_topic,
          "bridged_at": now()
        }        

output:
  switch:
    cases:
      - check: meta("event_type") == "OrderFulfilled"
        output:
          aws_sqs:
            url: http://localstack:4566/000000000000/order-fulfillment-events
            
      - output:
          aws_sqs:
            url: http://localstack:4566/000000000000/order-notifications

This routes OrderFulfilled events to one queue and everything else to a notifications queue. The bridge itself could have an AsyncAPI spec documenting what it consumes and where it routes messages.

Schema Registry Integration

The demo uses Avro schemas registered with Redpanda’s built-in Schema Registry. This provides:

  1. Schema evolution: Add fields without breaking consumers
  2. Compatibility checks: Registry rejects incompatible changes
  3. Efficient serialization: Binary format smaller than JSON

Here’s the Avro schema for OrderCreated:

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders.events",
  "doc": "Event emitted when a new order is created",
  "fields": [
    {
      "name": "order_id",
      "type": "string",
      "doc": "Unique identifier for the order"
    },
    {
      "name": "customer_id",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "product_id", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unit_price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
          ]
        }
      }
    },
    {
      "name": "total_amount",
      "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    }
  ]
}

AsyncAPI can reference Schema Registry subjects directly, keeping your API docs in sync with registered schemas.

Generating Specs from Code

Here’s where FastStream really shines: you can generate AsyncAPI specs directly from your code. No manual YAML writing required.

FastStream CLI

FastStream includes a built-in CLI that inspects your decorated publishers, subscribers, and Pydantic models to generate specs:

# Generate YAML spec from your FastStream app
cd producer
uv run faststream docs gen app.main:app --yaml

# This creates asyncapi.yaml with:
# - All publishers and subscribers as channels/operations
# - Pydantic models converted to JSON Schema
# - Examples from model's json_schema_extra
# - Descriptions from docstrings

The generated spec includes everything FastStream knows about your app:

# Auto-generated from code!
info:
  title: Order Producer Service
  version: 1.0.0
  description: "## Order Producer Service\n\nThis service generates order events..."
asyncapi: 3.0.0
channels:
  Order Created Publisher:
    address: Order Created Publisher
    description: Publishes new order events to the orders.created topic
    bindings:
      kafka:
        topic: orders.created
operations:
  Order Created Publisher:
    action: send
    channel:
      $ref: '#/channels/Order Created Publisher'
components:
  schemas:
    OrderCreated:
      description: 'Event published when a new order is created...'
      properties:
        order_id:
          type: string
        customer_id:
          type: string
        # ... all fields from your Pydantic model
      examples:
        - order_id: ord_12345
          customer_id: cust_67890
          # ... from json_schema_extra

Serve Interactive Docs

FastStream can also serve an interactive documentation viewer:

# Start the docs server
cd producer
uv run faststream docs serve app.main:app

# Opens browser with interactive spec viewer

AsyncAPI CLI Tools

The AsyncAPI ecosystem provides additional tooling:

# Install the AsyncAPI CLI
npm install -g @asyncapi/cli

# Validate specs
asyncapi validate docs/asyncapi-producer.yaml

# Generate HTML documentation (Swagger UI style)
asyncapi generate fromTemplate docs/asyncapi-producer.yaml @asyncapi/html-template -o docs/html

# Start interactive studio
asyncapi start studio docs/asyncapi-producer.yaml

AsyncAPI Studio

The AsyncAPI Studio is a web-based editor with:

  • Live preview of documentation
  • Schema validation
  • Visual channel/operation mapping

Code Generation from Specs

Generate code from specs (the reverse direction):

# Generate TypeScript types
asyncapi generate fromTemplate spec.yaml @asyncapi/typescript-nats-template

# Generate Python models
asyncapi generate fromTemplate spec.yaml @asyncapi/python-paho-template

This creates a static site with interactive schema exploration, similar to Swagger UI for REST APIs.

Discoverability and Governance

AsyncAPI becomes truly powerful when combined with a central registry. Imagine:

  1. Event Catalog: All services register their AsyncAPI specs
  2. Search: “Show me all services that consume orders.created
  3. Impact Analysis: “If I change this schema, who breaks?”
  4. Lineage: Visual graphs of event flows across services

Tools like EventCatalog and Backstage can aggregate AsyncAPI specs to provide this visibility.

Example: Tracking Producers and Consumers

From our two AsyncAPI specs, we can extract:

ChannelProducersConsumers
orders.createdOrder ProducerOrder Fulfillment
orders.acceptedOrder FulfillmentRedpanda Connect
orders.shippedOrder FulfillmentRedpanda Connect
orders.fulfilledOrder FulfillmentRedpanda Connect

This mapping answers questions like:

  • “Who do I contact about changes to orders.created?” → Order Producer team
  • “What breaks if Order Fulfillment goes down?” → Nothing receives fulfillment events
  • “Is anyone still consuming orders.legacy?” → Check the registry

AsyncAPI and the Transactional Outbox Pattern

In the demo above, services publish directly to Kafka. But what happens if Kafka is down? Your order gets saved to the database, but the event never publishes. You end up with inconsistent state.

The Transactional Outbox pattern solves this by writing events to a database table in the same transaction as your business data, then having a separate relay publish to Kafka. This guarantees delivery without distributed transactions.

When using the outbox pattern with AsyncAPI, you may have two APIs to document:

  1. OpenAPI: The HTTP API that receives events (if using an API-based outbox)
  2. AsyncAPI: The Kafka topics where events eventually land

The key is cross-referencing between specs. In your AsyncAPI description, note that events flow through an outbox:

info:
  description: |
    Events are delivered via the Transactional Outbox pattern with
    at-least-once semantics. Consumers should handle duplicates.
    
    See: [Events Ingestion API](/docs/openapi/events-api)    

For a complete deep dive into implementing the outbox pattern, see my dedicated post: The Transactional Outbox Pattern: Reliable Event Publishing.

Running the Demo

The complete demo is available in the repository. To run it:

cd asyncapi-demo

# Start all services
make up

# View Redpanda Console at http://localhost:8080
# Watch logs
make logs

# Check SQS messages
make sqs-messages

# Stop everything
make down

You’ll see:

  1. Orders generated every 5 seconds
  2. Each order flows through: Created → Accepted → Shipped → Fulfilled
  3. Events bridged to SQS queues

Generate Specs from Running Code

# Generate AsyncAPI specs from your FastStream services
make generate-specs

# This runs:
#   cd producer && uv run faststream docs gen app.main:app --yaml
#   cd consumer && uv run faststream docs gen app.main:app --yaml

# Serve interactive docs
make serve-producer-docs
make serve-consumer-docs

Best Practices

From building this demo, some recommendations:

1. Generate from Code, Don’t Hand-Write

With FastStream, you can generate specs directly from your decorated publishers and subscribers. This keeps documentation in sync with implementation:

# Add schema= to publishers for automatic spec generation
publisher = broker.publisher(
    "orders.created",
    title="Order Created Publisher",
    description="Publishes new order events",
    schema=OrderCreated,  # Pydantic model → JSON Schema
)

Then generate:

faststream docs gen app.main:app --yaml

2. Spec Per Service

Each service owns its AsyncAPI spec. The producer documents what it publishes; the consumer documents what it subscribes to AND publishes.

3. Version Your Specs

Use semantic versioning. Breaking changes (removing fields, changing types) bump the major version.

4. Include Examples

Add examples to your Pydantic models—FastStream includes them in the generated spec:

class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    # ...
    
    class Config:
        json_schema_extra = {
            "examples": [
                {
                    "order_id": "ord_abc123",
                    "customer_id": "cust_67890",
                    "items": [
                        {"product_id": "prod_001", "quantity": 2, "unit_price": "29.99"}
                    ],
                    "total_amount": "59.98"
                }
            ]
        }

5. Add Titles and Descriptions

FastStream pulls titles and descriptions from your decorators:

@broker.subscriber(
    "orders.created",
    group_id="order-fulfillment-service",
    title="Order Created Subscriber",  # → operation title
    description="Consumes new order events and initiates fulfillment",  # → operation description
)
async def handle_order_created(body: str):
    ...

6. Automate Validation in CI/CD

# GitHub Actions example
- name: Generate and validate AsyncAPI
  run: |
    cd producer && uv run faststream docs gen app.main:app --yaml
    asyncapi validate asyncapi.yaml    

Comparing AsyncAPI to Alternatives

ApproachProsCons
AsyncAPIStandard spec, rich tooling, machine-readableLearning curve, maintenance overhead
Confluence/WikiEasy to start, familiarGoes stale, not machine-readable
Schema Registry onlyEnforces schemasDoesn’t capture operations/ownership
Code commentsClose to implementationScattered, hard to aggregate

AsyncAPI works best when combined with Schema Registry—schemas in the registry, API contracts in AsyncAPI.

What’s Next

AsyncAPI is becoming the standard for event-driven documentation. Recent developments include:

  • AsyncAPI 3.0: Clearer operation semantics, better request-reply support
  • CloudEvents integration: Standardized event envelope format
  • Protocol bindings: Better support for Kafka, AMQP, MQTT specifics
  • AI/LLM tooling: Generate specs from code, code from specs

If you’re building event-driven systems, invest in AsyncAPI. The upfront documentation work pays dividends in discoverability, onboarding, and preventing integration bugs.

References


The complete demo code is available in the asyncapi-demo directory of this repository.