After my last post, I figured now is a good time to step back and talk about Message Translation, how Content Enricher and Content Filter fit in, and the Canonical Data Model, with examples from my own work experience.

When systems talk to each other, they rarely agree on data formats. Your e-commerce platform calls it a “customer,” your CRM calls it a “contact,” and your billing system calls it an “account holder.” They all mean the same thing, but their schemas don’t match. Multiply this across dozens of integrations and you’ve got a translation nightmare.

Today’s patterns provide the solution: a Canonical Data Model that serves as a shared vocabulary, plus transformation patterns that convert messages to and from that common language.

The Integration Spaghetti Problem

I’m going to use a property data aggregation platform as our running example—think of systems that consolidate real estate information from multiple sources to provide a unified view. I’ve built systems like this, and the pattern applies broadly to any domain where you’re aggregating data from diverse external sources.

Data flows in from county records offices, title companies, inspection services, and MLS feeds. Most of these sources land via File Transfer—the pattern we covered on Day 1. County assessors aren’t calling your REST API; they’re dropping CSV files on an SFTP server overnight. Title companies send batched XML. The file transfer pattern gets data into your system, but then you need to transform it into something useful.

A modern implementation might look like this: an SFTP gateway (AWS Transfer Family or similar) in front of S3 buckets. Partners upload to what looks like a traditional SFTP server, but files land in s3://property-data-inbox/county/, s3://property-data-inbox/title/, etc. S3 event notifications fire when files arrive, publishing a message containing the bucket and key: a Claim Check. The actual 50MB CSV doesn’t travel through your message broker; only a reference to where it lives does. A worker picks up that claim check, pulls the file from S3, iterates through the rows, and emits individual PropertyEvent messages to the canonical stream.
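Here is a rough sketch of the claim check half of that flow. It assumes the standard S3 event notification layout (Records, then s3.bucket.name and s3.object.key). The ClaimCheck type and the fetch callable are hypothetical stand-ins for whatever download client your worker really uses.

```python
import csv
import io
from typing import Callable, Iterator, NamedTuple
from urllib.parse import unquote_plus


class ClaimCheck(NamedTuple):
    """A reference to a stored file, not the file itself."""
    bucket: str
    key: str


def claim_checks(notification: dict) -> list[ClaimCheck]:
    """Extract one claim check per record in an S3 event notification."""
    checks = []
    for record in notification.get("Records", []):
        s3 = record["s3"]
        # Object keys arrive URL-encoded in S3 event notifications.
        checks.append(ClaimCheck(s3["bucket"]["name"], unquote_plus(s3["object"]["key"])))
    return checks


def rows_for(check: ClaimCheck, fetch: Callable[[str, str], bytes]) -> Iterator[dict]:
    """Redeem the claim check and yield one dict per CSV row.

    `fetch` is a hypothetical stand-in for the real download call.
    """
    body = fetch(check.bucket, check.key).decode("utf-8")
    yield from csv.DictReader(io.StringIO(body))
```

Passing the download function in keeps the worker testable without a real bucket. Each yielded row is what a source-specific translator would then turn into a PropertyEvent.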

So let’s imagine the inbound data:

  • County assessors drop property records as CSVs on SFTP
  • Title companies send ownership transfers as batched XML files
  • Inspection services submit reports through a REST API
  • MLS feeds deliver listing updates in fixed-width format

Each source has its own schema, its own field names, its own conventions. A property identifier might be parcel_id, APN, property_id, or tax_lot_number. Addresses could be 123 Main St, 123 MAIN STREET, or split across five fields. Dates could be 2025-12-23, 12/23/2025, 23-Dec-2025, or a Unix timestamp.
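To make the date problem concrete, here is a minimal sketch that accepts the four formats listed above and returns a timezone-aware UTC datetime. The function name is hypothetical, and treating date-only values as midnight UTC is an assumption you would want to confirm per source.

```python
from datetime import datetime, timezone

# Formats taken from the examples above; a real feed may need more.
_DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%d-%b-%Y")


def parse_source_date(value: str) -> datetime:
    """Parse any of the known source date formats into an aware UTC datetime."""
    text = value.strip()
    if text.isdigit():
        # All digits: treat the value as a Unix timestamp in seconds.
        return datetime.fromtimestamp(int(text), tz=timezone.utc)
    for fmt in _DATE_FORMATS:
        try:
            # Assumption: date-only values are interpreted as midnight UTC.
            return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized date format: {value!r}")
```

In practice each translator should pin the one format its source actually uses rather than guess, since a value like 03/04/2025 is ambiguous between US and European conventions.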

The Canonical Data Model

The Canonical Data Model pattern cuts through this complexity:

“Design a Canonical Data Model that is independent from any specific application. Require each application to produce and consume messages in this common format.”

Instead of point-to-point translations, where n systems can need up to n(n-1) one-way translators, every system speaks one common language. Now you need only 2n transformations: one “to canonical” and one “from canonical” for each system.
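The arithmetic behind that claim is easy to check. In the worst case, point-to-point integration needs a translator for every ordered pair of systems, while the canonical approach needs two per system. The function names below are just illustrative.

```python
def point_to_point_translators(n: int) -> int:
    """One-way translators needed if every system talks to every other one."""
    return n * (n - 1)


def canonical_translators(n: int) -> int:
    """One 'to canonical' and one 'from canonical' translator per system."""
    return 2 * n
```

For six systems that is 30 translators versus 12. At three systems the two approaches tie at 6, which is one reason the canonical model tends to feel like overkill early on.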

Key characteristics:

  • Application-independent: The canonical model doesn’t belong to any single system
  • Domain-focused: Models the business concepts, not technical concerns
  • Stable: Changes slowly compared to individual system schemas
  • Complete enough: Covers the union of fields that any consumer might need

The Property Data Canonical Model

For our aggregation system, the canonical model captures what matters across all event types:

interface PropertyEvent {
  eventId: string;
  propertyId: string;         // Normalized parcel/APN
  address: NormalizedAddress; // Standardized format
  occurredAt: Date;           // Always UTC
  eventType: 'assessment' | 'transfer' | 'listing' | 'inspection' | 'permit';
  source: { system: string; originalId: string; };
  details: Assessment | Transfer | Listing | InspectionReport; // no 'permit' details type is shown in this post
}

interface Transfer {
  type: 'transfer';
  grantor?: PartyInfo;
  grantee: PartyInfo;
  salePrice?: Money;
  recordedAt: Date;
  documentNumber: string;
}

Every incoming message—regardless of source format—gets translated into this shape. Every downstream consumer reads from this same shape.
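The Python examples in this post build PropertyEvent with snake_case keyword arguments, so here is roughly how the same shape could look as dataclasses. Treat it as a sketch: the string-typed grantor and grantee, the integer-cents sale_price, and the loose Any fields are simplifying assumptions standing in for PartyInfo, Money, and NormalizedAddress.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional


@dataclass(frozen=True)
class Transfer:
    grantee: str                      # assumption: plain name instead of PartyInfo
    document_number: str
    recorded_at: datetime
    grantor: Optional[str] = None
    sale_price: Optional[int] = None  # assumption: integer cents instead of Money


@dataclass(frozen=True)
class PropertyEvent:
    event_id: str
    property_id: str
    address: Any                      # stands in for NormalizedAddress
    occurred_at: datetime
    event_type: str
    source: dict
    details: Any

    def __post_init__(self) -> None:
        # Enforce the "always UTC" rule: reject naive and non-UTC datetimes.
        if self.occurred_at.utcoffset() != timedelta(0):
            raise ValueError("occurred_at must be a UTC datetime")
```

Checking the UTC rule in the constructor means a translator that forgets the time zone fails loudly at the boundary instead of quietly storing local times.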

When Canonical Models Emerge Organically

Canonical data models rarely spring from elegant upfront design—they emerge from necessity.

I was working on a platform where four separate product verticals had been evolving independently for years. Each had its own codebase, database, and event system. Then came the mandate: integrate everything into a single management plane. Users needed to interact with resources from all four products seamlessly—organizing them into collections, trashing items, restoring from trash.

The challenge? Each system had a different relationship with events:

  • Two systems emitted CRUD events but with wildly different schemas
  • One had a partial soft-delete implementation with its own event vocabulary
  • The fourth had “emit events” somewhere on next quarter’s roadmap

The solution was a canonical ResourceLifecycle event schema—a shared vocabulary that all four domains could speak. Some systems adopted it natively; others got a message translation layer to convert their existing events.

What made this interesting is how the three transformation patterns naturally emerged:

  • Content Enricher: Some events lacked required fields like workspace_id, so the translation layer called APIs to look up the missing data before producing the canonical event.
  • Content Filter: Legacy events carried internal fields (audit metadata, system flags) that downstream consumers didn’t need. These were stripped at the boundary.
  • Message Translator: One system’s chatbot_enabled event mapped to our canonical resource_enabled—pure structural transformation.

The canonical model didn’t just enable a unified user experience—it unlocked future capabilities. New features could subscribe to ResourceLifecycle events without caring which product originated them.
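A stripped-down sketch of what that translation layer did, with all three patterns in one function. The field names, the event name mapping, and the lookup_workspace callable are hypothetical; the real schema had more to it.

```python
from typing import Callable

# Hypothetical mapping from legacy event names to canonical ones.
EVENT_NAME_MAP = {"chatbot_enabled": "resource_enabled"}

# Allow-list of fields in the (assumed) canonical ResourceLifecycle shape.
CANONICAL_FIELDS = ("event", "resource_id", "workspace_id", "occurred_at")


def to_resource_lifecycle(legacy: dict, lookup_workspace: Callable[[str], str]) -> dict:
    # Message Translator: rename the event into the canonical vocabulary.
    event = dict(legacy, event=EVENT_NAME_MAP.get(legacy["event"], legacy["event"]))
    # Content Enricher: fill in workspace_id when the source omitted it.
    if not event.get("workspace_id"):
        event["workspace_id"] = lookup_workspace(event["resource_id"])
    # Content Filter: keep only canonical fields, dropping audit metadata and flags.
    return {name: event[name] for name in CANONICAL_FIELDS if name in event}
```

In a real system the lookup would be an API call, so it is worth keeping the enrichment step separate enough that you can add caching or retries around it.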

Three Transformation Patterns

The Canonical Data Model is an architectural principle. To implement it, you need three transformation patterns:

| Pattern | Purpose | When to Use |
| --- | --- | --- |
| Message Translator | Convert between formats | Source → Canonical structure mapping |
| Content Enricher | Add missing data | Look up external data to complete message |
| Content Filter | Remove unnecessary data | Strip fields for specific consumers |


Message Translator

The Message Translator is the workhorse of data transformation:

“Use a special filter, a Message Translator, between other filters or applications to translate one data format into another.”

A translator’s job is pure: take a message in format A, output a message in format B. No side effects, no external calls, just structure mapping.

County Records CSV to Canonical (Python)

The county sends property assessments as CSV with their own conventions:

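import uuid
from datetime import datetime, timezone

# Input: PARCEL_NUM,ASSESS_DATE,OWNER_NAME,SITE_ADDR,ASSESSED_VAL,LAND_VAL

class CountyAssessorTranslator:
    def translate(self, row: dict) -> PropertyEvent:
        # County dates are MM/DD/YYYY with no time zone. Tagging them as UTC is
        # an assumption that keeps the canonical "always UTC" rule intact.
        assessed_on = datetime.strptime(row['ASSESS_DATE'], '%m/%d/%Y').replace(tzinfo=timezone.utc)
        return PropertyEvent(
            event_id=str(uuid.uuid4()),
            # _normalize_parcel and _normalize_address are helpers not shown here
            property_id=self._normalize_parcel(row['PARCEL_NUM']),
            address=self._normalize_address(row['SITE_ADDR']),
            occurred_at=assessed_on,
            event_type='assessment',
            source={'system': 'county', 'original_id': row['PARCEL_NUM']},
            details=Assessment(
                owner_name=row['OWNER_NAME'],
                assessed_value=int(row['ASSESSED_VAL']),
                land_value=int(row['LAND_VAL']),
            )
        )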

The translator handles all the county quirks: MM/DD/YYYY dates, PARCEL_NUM field names, address normalization. Downstream consumers only see the canonical shape.
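The parcel normalization helper is not shown, so here is one plausible version as a standalone function. Parcel number formats vary a lot between counties, so stripping punctuation and uppercasing is an assumption for illustration, not a universal rule.

```python
import re


def normalize_parcel(raw: str) -> str:
    """Reduce a parcel number to uppercase letters and digits only."""
    cleaned = re.sub(r"[^0-9A-Za-z]", "", raw).upper()
    if not cleaned:
        raise ValueError(f"Empty parcel number: {raw!r}")
    return cleaned
```

With this rule, 123-456-789 and 123 456 789 both normalize to 123456789, so the same parcel matches regardless of which source reported it. Leading zeros are preserved because the value stays a string.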

Title Company XML to Canonical

Title companies send ownership transfers as XML. Same pattern, different parser:

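import uuid
import xml.etree.ElementTree as ET
from datetime import datetime
from decimal import Decimal

class TitleTransferTranslator:
    def translate(self, xml: str) -> PropertyEvent:
        root = ET.fromstring(xml)
        recorded_at = datetime.fromisoformat(root.findtext('RecordedDate'))
        # Parse the price as Decimal: float arithmetic can drop a cent.
        # A missing price stays None rather than becoming a $0 sale.
        price = (root.findtext('SalePrice') or '').strip()
        return PropertyEvent(
            event_id=str(uuid.uuid4()),
            property_id=self._normalize_parcel(root.findtext('Property/APN')),
            address=self._parse_address(root.find('Property/Address')),
            occurred_at=recorded_at,
            event_type='transfer',
            source={'system': 'title', 'original_id': root.findtext('DocumentNumber')},
            details=Transfer(
                grantor=root.findtext('Grantor/Name'),
                grantee=root.findtext('Grantee/Name'),
                sale_price=int(Decimal(price) * 100) if price else None,  # cents
                recorded_at=recorded_at,
                document_number=root.findtext('DocumentNumber'),
            )
        )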

Two completely different source formats, one canonical output.
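One detail worth isolating from the title translator is the money conversion. Sale prices arrive as decimal strings, and binary floats cannot represent most decimal fractions exactly, so converting to integer cents is safer with Decimal. This is a small sketch; the to_cents name and the round-half-up choice are assumptions.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional


def to_cents(amount: Optional[str]) -> Optional[int]:
    """Convert a decimal string such as '19.99' to integer cents.

    Returns None for a missing or blank amount so that "no price recorded"
    stays distinguishable from a price of zero.
    """
    if amount is None or not amount.strip():
        return None
    cents = (Decimal(amount.strip()) * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(cents)
```

Returning None for a missing price matters for transfers like gifts or inheritances, where the canonical salePrice is optional and a zero would skew any analytics built on top.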


Content Enricher

The Content Enricher pattern augments a message with data from external sources:

“Use a specialized transformer, a Content Enricher, to access an external data source in order to augment a message with missing information.”

Sometimes the incoming message doesn’t have everything downstream consumers need. An address isn’t coordinates, but you can geocode it. A parcel number from one county might need cross-referencing to your canonical ID. The enricher fills in these gaps.

Address to Coordinates Lookup

Inspection services might submit reports with just an address. We enrich with geocoded coordinates and our canonical property ID:

import uuid

class InspectionEnricher:
    def __init__(self, geocoder: GeocodingService, property_lookup: PropertyLookup):
        self.geocoder = geocoder
        self.property_lookup = property_lookup
    
    async def enrich(self, report: InspectionReport) -> PropertyEvent:
        # Resolve the property first so an unknown address fails before we pay for geocoding
        property_id = await self.property_lookup.find_by_address(report.address)
        if not property_id:
            raise EnrichmentError(f"No property found for {report.address}")
        
        coords = await self.geocoder.geocode(report.address)
        
        return PropertyEvent(
            event_id=str(uuid.uuid4()),
            property_id=property_id,  # Enriched!
            address=report.address,
            coordinates=coords,       # Enriched! (assumes an optional coordinates field on PropertyEvent)
            occurred_at=report.inspection_date,
            event_type='inspection',
            source={'system': 'inspector', 'original_id': report.report_id},
            details=report.findings,
        )

Notice we’re calling two external services—the geocoder and our property lookup. The enricher is where that complexity lives, keeping the downstream consumers simple.

Enrichment Strategies

| Strategy | When | Trade-offs |
| --- | --- | --- |
| Synchronous | During message processing | Adds latency, always fresh |
| Async pre-fetch | Before message hits queue | Better throughput, may be stale |
| On-demand | When consumer needs it | Minimal storage, complex consumers |

For geocoding, synchronous makes sense: we need coordinates before storing, and addresses don’t move.
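Because addresses don’t move, synchronous geocoding also caches well. Below is a minimal sketch of a caching wrapper around any async geocode function. The CachingGeocoder class is hypothetical, and the in-memory dict is an assumption; a shared store would be the likelier choice in production.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

Coordinates = Tuple[float, float]


class CachingGeocoder:
    """Wraps an async geocode function and remembers results per address."""

    def __init__(self, geocode: Callable[[str], Awaitable[Coordinates]]):
        self._geocode = geocode
        self._cache: Dict[str, Coordinates] = {}

    async def geocode(self, address: str) -> Coordinates:
        # Collapse case and whitespace so trivially different spellings share an entry.
        key = " ".join(address.upper().split())
        if key not in self._cache:
            self._cache[key] = await self._geocode(address)
        return self._cache[key]
```

Two concurrent requests for the same uncached address would both hit the upstream service here. That is usually acceptable for a sketch, but worth closing with a per-key lock if the geocoder bills per call.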


Content Filter

The Content Filter is the opposite of enrichment—it removes data:

“Use a Content Filter to remove unimportant data items from a message, leaving only the important items.”

Why filter content?

  • Privacy: Remove PII before sending to analytics
  • Efficiency: Drop large fields that consumers don’t need
  • Security: Prevent sensitive data from crossing domain boundaries
  • Compliance: Ensure certain fields never leave a region

PII Removal for Analytics (TypeScript)

The analytics team wants property events but shouldn’t see owner names or SSNs:

interface AnalyticsEvent {
  eventId: string;
  propertyId: string;
  occurredAt: Date;
  eventType: string;
  sourceSystem: string;
  salePrice?: Money;        // same Money type the canonical Transfer uses
  assessedValue?: number;
  // No owner info, no addresses, no SSNs
}

class AnalyticsFilter {
  filter(event: PropertyEvent): AnalyticsEvent {
    const details = event.details;
    return {
      eventId: event.eventId,
      propertyId: event.propertyId,
      occurredAt: event.occurredAt,
      eventType: event.eventType,
      sourceSystem: event.source.system,
      // details is a union, so narrow before reading type-specific fields
      salePrice: 'salePrice' in details ? details.salePrice : undefined,
      assessedValue: 'assessedValue' in details ? details.assessedValue : undefined,
    };
  }
}

The filter is explicit about what it keeps. This is safer than enumerating what to remove—new PII fields won’t accidentally leak through.
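The difference between the two approaches shows up as soon as a source adds a field nobody planned for. This small comparison uses hypothetical field names and plain dicts rather than the full PropertyEvent.

```python
# Fields the analytics consumer is allowed to see.
ALLOWED = ("event_id", "property_id", "occurred_at", "event_type")

# Fields we knew were sensitive when the deny-list was written.
DENIED = ("owner_name", "ssn")


def allow_list_filter(event: dict) -> dict:
    """Keep only explicitly approved fields."""
    return {name: event[name] for name in ALLOWED if name in event}


def deny_list_filter(event: dict) -> dict:
    """Drop only the fields known to be sensitive."""
    return {name: value for name, value in event.items() if name not in DENIED}
```

If an upstream system later starts sending owner_phone, the deny-list version passes it straight through to analytics, while the allow-list version drops it until someone deliberately adds it.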


Putting It Together: The Property Event Pipeline

Here’s the complete flow:

import asyncio
from typing import Any

class PropertyEventPipeline:
    def __init__(self):
        self.translators = {
            'county': CountyAssessorTranslator(),
            'title': TitleTransferTranslator(),
            'mls': MLSListingTranslator(),
        }
        self.enricher = InspectionEnricher(GeocodingService(), PropertyLookup())
        # The filters are assumed to be Python ports of the TypeScript AnalyticsFilter
        self.analytics_filter = AnalyticsFilter()
        self.partner_filter = PartnerFilter()
    
    async def process(self, source: str, raw: Any) -> None:
        # Translate or enrich to canonical
        if source == 'inspection':
            canonical = await self.enricher.enrich(raw)
        elif source in self.translators:
            canonical = self.translators[source].translate(raw)
        else:
            raise ValueError(f"Unknown source: {source!r}")
        
        # Fan out with appropriate filtering (store/send methods not shown)
        await asyncio.gather(
            self.store_primary(canonical),
            self.send_analytics(self.analytics_filter.filter(canonical)),
            self.send_partner(self.partner_filter.filter(canonical)),
        )

Adding a new source? Write one translator. Adding a new consumer? Write one filter. The canonical model in the middle keeps things from exploding.


Signs You Need a Canonical Model

You rarely design a canonical model upfront; it emerges from pain:

  1. Point-to-Point: You integrate county records with your database. One translation, no big deal.

  2. Growing Complexity: You add title data. Now you have County → DB, Title → DB, County → Analytics. Each slightly different.

  3. Breaking Point: You add MLS feeds. And inspection data. And a partner API. Each new system multiplies the translation burden. Someone spends a sprint fixing a bug that existed in three different translators.

  4. Canonical Model: Someone says, “What if we defined a standard format?” You build it, migrate the translators, and suddenly adding new systems is easy again.

Warning signs:

  • Multiple teams writing similar translation code
  • The same bug appearing in different integrations
  • Adding a new source requires touching many systems
  • Different consumers have inconsistent views of the same entity

What’s Next

We’ve covered how to reshape data as it flows through your system. But so far, our messages have been one-way: fire and forget. What happens when you need a response?

Tomorrow we’ll explore Request-Reply & Correlation patterns:

  • Correlation Identifier — Matching responses to their originating requests
  • Return Address — Telling services where to send replies
  • Request-Reply — Synchronous vs asynchronous approaches
  • Message Expiration — Handling timeouts gracefully

These patterns are the foundation for multi-step workflows and distributed conversations.

See you on Day 7! 🎄


This post is part of the Advent of Enterprise Integration Patterns series. Check out the introduction or follow along with the enterprise-integration-patterns tag. All patterns referenced are from the classic book Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf.


A note on AI usage: I used Claude as a writing assistant for this series, particularly for generating code samples that illustrate the patterns. The patterns, architectural insights, and real-world experiences are mine—the AI helped me get the examples onto the page faster. I believe in transparency about these tools.
