目次

EventBridge Schemas 完全ガイド v2.0

イベント駆動アーキテクチャのスキーマ管理・型安全開発プラットフォーム


概要

Amazon EventBridge Schema Registry は 「EventBridge を流れるイベントのスキーマを自動検出・登録・バージョン管理し、言語別の型安全なコードバインディングを自動生成するサービス」 である。イベント駆動アーキテクチャにおけるプロデューサーとコンシューマー間のイベント構造契約(スキーマコントラクト) を一元管理し、API Gateway・gRPC のような Specification 駆動開発を EventBridge 環境で実現する。

本質的な価値:

  1. イベント構造の暗黙的知識をコード化: マイクロサービス間で「どんなフィールドを含むイベント」かを明示化し、破壊的変更を事前に検知
  2. 型安全な開発体験: Java・Python・TypeScript・Go のコードバインディングを自動生成、IDE 補完が効いた開発
  3. AWS サービスイベントの参照: S3・EC2・ECS・Lambda が発行するイベント構造をスキーマから即座に参照
  4. スキーマの自動検出: 実際のイベントバスからイベントをサンプリング → JSON Schema を自動推論 → スキーマレジストリに登録
  5. チーム間のコミュニケーション: スキーマバージョン変更時に「破壊的変更か後方互換か」を可視化

課題と特徴

EventBridge Schemas が解決する課題

❌ 課題:イベント駆動システムが複雑・エラーが多い
  → プロデューサー(注文サービス)が「orderId: string」をイベントに含めた
  → コンシューマー(在庫サービス)が「orderId: number」と仮定して処理
  → 本番環境でデータ型ミスマッチエラー発生
  → 問題の原因追跡に数日かかる
  
  → 新チームメンバーが「このイベントはどんな構造?」を聞く度に
    Slack / ドキュメントで説明必要
  → ドキュメントが古くなってさらに混乱

✅ 解決:EventBridge Schemas で型安全・スキーマ駆動開発
  → スキーマから Java / Python / TypeScript / Go コード自動生成
  → IDE の自動補完でイベント構造を即座に確認・アクセス
  → スキーマバージョン管理で「v1 → v2」の破壊的変更を事前検知
  → チーム全体で「Single Source of Truth(SSOT)」としてスキーマを共有
  → マイクロサービス間の契約違反を本番前に検出

コアの特徴

特徴 説明
AWS サービスイベントスキーマ S3 / EC2 / ECS / Lambda・30+ AWS サービスのイベント定義を事前提供
カスタムスキーマ 手動で JSON Schema / OpenAPI 3.0 を登録、バージョン管理
スキーマ自動検出 イベントバスを監視、実際のイベントから JSON Schema を推論・自動登録
コードバインディング生成 Java / Python / TypeScript / Go のコードを自動生成
IDE 統合 VS Code / JetBrains 環境でスキーマ参照・コード生成が可能
バージョン管理 スキーマ変更時にバージョンアップ、破壊的変更を検知
Schema Registry API DescribeSchema / GetDiscoveredSchema で プログラムからスキーマ取得
EventBridge Rules との統合 スキーマをルールのターゲットに指定可能
無料スキーマ検出 最初 500 万イベント/月は無料、以降 $0.10/100 万

アーキテクチャ(Mermaid 図 1)

graph TB
    subgraph "Event Sources"
        A["Producer 1<br/>Order Service<br/>PutEvents"]
        B["Producer 2<br/>Payment Service<br/>PutEvents"]
        C["AWS Service<br/>S3 / EC2<br/>native events"]
    end
    
    subgraph "EventBridge Bus"
        D["Custom Event Bus<br/>com.company.orders"]
        E["Default Bus<br/>aws.ec2 events"]
    end
    
    subgraph "Schema Detection & Registry"
        F["Schema Discoverer<br/>Sample Events<br/>JSON Schema Inference"]
        G["Schema Registry<br/>aws.ec2@StartInstances v1<br/>com.company.orders@OrderCreated v2"]
    end
    
    subgraph "Code Generation & IDE Integration"
        H["Code Bindings<br/>Java / Python<br/>TypeScript / Go"]
        I["IDE Plugins<br/>VS Code<br/>JetBrains"]
    end
    
    subgraph "Consumer Development"
        J["Lambda / ECS<br/>Type-safe Event Handler<br/>IDE Autocomplete"]
    end
    
    A --> D
    B --> D
    C --> E
    D --> F
    E --> G
    F --> G
    G --> H
    G --> I
    H --> J
    I --> J

スキーマの種類

1. AWS 管理スキーマ(aws.* namespace)

概要: AWS サービスが発行するイベントのスキーマを AWS が提供

AWS サービスイベントスキーマの例:

aws.ec2:
    StartInstances
    StopInstances
    RebootInstances
    TerminateInstances
    詳細フィールド: instance-id, region, account-id, time

aws.s3:
    ObjectCreated:Put
    ObjectCreated:Post
    ObjectCreated:Copy
    ObjectRemoved:Delete
    詳細フィールド: bucket, key, object-size, etag

aws.ecs:
    ECS Task State Change
    ECS Service Action
    詳細フィールド: task-id, service-name, cluster-arn, status

aws.dynamodb:
    Streams Record(INSERT / MODIFY / REMOVE)
    詳細フィールド: table-name, keys, new-image, old-image

aws.lambda:
    Function Invocation Completed
    詳細フィールド: function-name, request-id, duration, error-message

API: DescribeSchema で取得
aws events describe-schema \
  --registry-name 'aws.events' \
  --schema-name 'aws.ec2@EC2 Instance State-change Notification'

スキーマ登録先: aws.events レジストリに事前登録(削除・変更不可)

2. カスタムスキーマ(手動登録)

概要: 自社アプリケーションが発行するカスタムイベント用スキーマ

カスタムスキーマの例:

スキーマ登録(JSON Schema Draft 4 形式):
{
  "Name": "com.company.orders@OrderCreated",
  "Type": "OpenAPI3",
  "Content": {
    "openapi": "3.0.0",
    "info": {"title": "Order Events", "version": "1.0"},
    "paths": {},
    "components": {
      "schemas": {
        "OrderCreated": {
          "type": "object",
          "properties": {
            "orderId": {"type": "string", "format": "uuid"},
            "customerId": {"type": "string"},
            "amount": {"type": "number", "format": "double"},
            "currency": {"type": "string", "enum": ["USD", "JPY", "EUR"]},
            "items": {
              "type": "array",
              "items": {
                "type": "object",
                "properties": {
                  "sku": {"type": "string"},
                  "quantity": {"type": "integer", "minimum": 1},
                  "price": {"type": "number"}
                },
                "required": ["sku", "quantity", "price"]
              }
            },
            "createdAt": {"type": "string", "format": "date-time"},
            "source": {"type": "string", "enum": ["web", "mobile", "api"]}
          },
          "required": ["orderId", "customerId", "amount", "items", "createdAt"]
        }
      }
    }
  }
}

API: CreateSchema で登録
aws events create-schema \
  --registry-name 'company-registry' \
  --schema-name 'com.company.orders@OrderCreated' \
  --type OPENAPI3 \
  --content file://order-event-schema.json

登録先: company-registry など カスタムレジストリに登録

3. スキーマ自動検出(Schema Discovery)

概要: イベントバスを監視 → 実際のイベントから JSON Schema を自動推論

スキーマ自動検出の流れ:

1. EventBridge コンソール → Custom Event Bus
   → 「Enable schema discovery」をオン

2. アプリケーションが PutEvents でイベント送信:
   {
     "source": "com.myapp.orders",
     "detail-type": "OrderCreated",
     "detail": {
       "orderId": "ORD-001",
       "customerId": "CUST-123",
       "amount": 1500.50,
       "items": [
         {"sku": "SKU-A", "quantity": 2, "price": 750.25}
       ],
       "timestamp": "2026-04-26T10:30:00Z"
     }
   }

3. EventBridge が自動的にイベントをサンプリング
   → JSON Schema を推論
   → Schema Registry に登録:
     レジストリ: discovered-schemas
     スキーマ名: com.myapp.orders@OrderCreated
     バージョン: 1

4. 新しいイベント形式が到着(currency フィールド追加):
   {
     "orderId": "ORD-002",
     "customerId": "CUST-456",
     "amount": 2000,
     "currency": "JPY",  ← 新規フィールド
     "items": [...]
   }

5. EventBridge がスキーマ変更を検知
   → 新バージョン「v2」として登録
   → バージョン差分を表示

コスト:
    最初 500 万イベント/月:無料
    500 万超:$0.10/100 万イベント

使用例: CLI で自動検出を有効化

# カスタムイベントバスでスキーマ検出を有効化
aws events put-event-bus-policy \
  --name custom-event-bus \
  --policy '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": "*",
      "Action": "events:PutEvents",
      "Resource": "arn:aws:events:*:*:event-bus/custom-event-bus"
    }]
  }'

# Schema Discovery をトリガー(イベント送信時に自動実行)
aws events put-events \
  --entries '[{
    "Source": "com.myapp.orders",
    "DetailType": "OrderCreated",
    "Detail": "{\"orderId\":\"ORD-001\",\"customerId\":\"CUST-123\",\"amount\":1500.50}"
  }]'

# 検出されたスキーマを確認
aws events describe-schema \
  --registry-name discovered-schemas \
  --schema-name 'com.myapp.orders@OrderCreated'

コードバインディング生成

生成フロー

Marketplace での流れ:

1. EventBridge コンソール → Schema Registry
2. スキーマを選択(例:com.company.orders@OrderCreated)
3. 「Download code binding」をクリック
4. 言語を選択:Java / Python / TypeScript / Go
5. ZIP ファイルをダウンロード
6. プロジェクトに解凍・統合

各言語の生成コード例

Python(dataclass)

# 自動生成ファイル: generated/order_event.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime

@dataclass
class OrderItem:
    sku: str
    quantity: int
    price: float

@dataclass
class OrderCreated:
    orderId: str
    customerId: str
    amount: float
    currency: str
    items: List[OrderItem]
    createdAt: str
    source: str

# Lambda ハンドラー(型安全)
from aws_lambda_powertools import Logger
from generated.order_event import OrderCreated, Marshaller

logger = Logger()

def lambda_handler(event, context):
    # イベント → OrderCreated オブジェクトに自動変換
    order = Marshaller.unmarshal_event(event, OrderCreated)
    
    # IDE 補完が効く!
    logger.info(f"Order ID: {order.orderId}")
    logger.info(f"Customer: {order.customerId}")
    logger.info(f"Amount: {order.amount} {order.currency}")
    
    # 個別アイテムにアクセス
    for item in order.items:
        logger.info(f"  SKU: {item.sku}, Qty: {item.quantity}, Price: {item.price}")
    
    return {"statusCode": 200, "message": f"Order {order.orderId} processed"}

Java(POJO + Jackson)

// 自動生成ファイル: OrderCreated.java
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.OffsetDateTime;
import java.util.List;

public class OrderCreated {
    @JsonProperty("orderId")
    private String orderId;
    
    @JsonProperty("customerId")
    private String customerId;
    
    @JsonProperty("amount")
    private Double amount;
    
    @JsonProperty("currency")
    private String currency;
    
    @JsonProperty("items")
    private List<OrderItem> items;
    
    @JsonProperty("createdAt")
    private String createdAt;
    
    @JsonProperty("source")
    private String source;
    
    // Getter / Setter 自動生成
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    // ...
}

// Lambda ハンドラー(型安全)
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderEventHandler implements RequestHandler<Map<String, Object>, String> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public String handleRequest(Map<String, Object> event, Context context) {
        try {
            // イベント → OrderCreated オブジェクトに自動変換
            OrderCreated order = objectMapper.convertValue(
                event.get("detail"), 
                OrderCreated.class
            );
            
            // IDE 補完が効く!
            context.getLogger().log("Order ID: " + order.getOrderId());
            context.getLogger().log("Amount: " + order.getAmount() + " " + order.getCurrency());
            
            // 個別アイテムにアクセス
            for (OrderItem item : order.getItems()) {
                context.getLogger().log("  SKU: " + item.getSku() + 
                    ", Qty: " + item.getQuantity());
            }
            
            return "OK";
        } catch (Exception e) {
            context.getLogger().log("Error: " + e.getMessage());
            return "ERROR";
        }
    }
}

TypeScript(interface)

// 自動生成ファイル: order-event.ts
export interface OrderItem {
  sku: string;
  quantity: number;
  price: number;
}

export interface OrderCreated {
  orderId: string;
  customerId: string;
  amount: number;
  currency: string;
  items: OrderItem[];
  createdAt: string;
  source: string;
}

// Lambda ハンドラー(型安全)
import { CloudWatchLogsEvent } from 'aws-lambda';
import { OrderCreated } from './order-event';

export async function handler(event: any): Promise<string> {
  // イベント → OrderCreated に自動キャスト
  const order: OrderCreated = event.detail;
  
  // IDE の autocomplete が効く!
  console.log(`Order ID: ${order.orderId}`);
  console.log(`Customer: ${order.customerId}`);
  console.log(`Amount: ${order.amount} ${order.currency}`);
  
  // 個別アイテムにアクセス(型チェック)
  for (const item of order.items) {
    console.log(`  SKU: ${item.sku}, Qty: ${item.quantity}`);
  }
  
  return 'OK';
}

Go(struct)

// 自動生成ファイル: order_event.go
package models

type OrderItem struct {
    Sku      string  `json:"sku"`
    Quantity int     `json:"quantity"`
    Price    float64 `json:"price"`
}

type OrderCreated struct {
    OrderID   string      `json:"orderId"`
    CustomerID string     `json:"customerId"`
    Amount    float64     `json:"amount"`
    Currency  string      `json:"currency"`
    Items     []OrderItem `json:"items"`
    CreatedAt string      `json:"createdAt"`
    Source    string      `json:"source"`
}

// Lambda ハンドラー(型安全)
package main

import (
    "context"
    "encoding/json"
    "fmt"
    
    "github.com/aws/aws-lambda-go/lambda"
    "myapp/models"
)

func handleOrderCreated(ctx context.Context, event models.OrderCreated) (string, error) {
    // IDE の autocomplete が効く!
    fmt.Printf("Order ID: %s\n", event.OrderID)
    fmt.Printf("Customer: %s\n", event.CustomerID)
    fmt.Printf("Amount: %.2f %s\n", event.Amount, event.Currency)
    
    // 個別アイテムにアクセス(型チェック)
    for _, item := range event.Items {
        fmt.Printf("  SKU: %s, Qty: %d\n", item.Sku, item.Quantity)
    }
    
    return "OK", nil
}

func main() {
    lambda.Start(handleOrderCreated)
}

スキーマバージョン管理

バージョンアップのシナリオ

スキーマ v1:
{
  "orderId": "string",
  "customerId": "string",
  "amount": "number"
}

【後方互換な変更】

スキーマ v2(新フィールド追加):
{
  "orderId": "string",
  "customerId": "string",
  "amount": "number",
  "currency": "string"    ← 新規フィールド(オプション)
}

既存の v1 ハンドラーもそのまま動作
新しい v2 ハンドラーは currency を活用可能
✅ 後方互換性あり


【破壊的変更】

スキーマ v3(フィールド削除・型変更):
{
  "orderId": "string",
  "customerId": "string",
  "unitPrice": "number"   ← amount を削除して unitPrice に変更(破壊的)
}

既存の v1 ハンドラーが v3 イベントを処理しようとする
→ amount フィールドが見つからない
→ ランタイムエラー発生

❌ 破壊的変更・後方互換性なし
→ 全コンシューマーを更新する必要あり

API でバージョン管理

# 現在のスキーマ一覧(全バージョン含む)
aws events list-schema-versions \
  --registry-name company-registry \
  --schema-name 'com.company.orders@OrderCreated' \
  --query 'SchemaVersions[].{Version: Version, CreatedDate: CreatedDate}'

# 特定バージョンのスキーマを取得
aws events get-schema \
  --registry-name company-registry \
  --schema-name 'com.company.orders@OrderCreated' \
  --schema-version '2'

# スキーマを更新して新バージョンを作成
aws events update-schema \
  --registry-name company-registry \
  --schema-name 'com.company.orders@OrderCreated' \
  --type OPENAPI3 \
  --content file://updated-schema.json

# バージョン間の差分を確認(カスタム処理)
aws events describe-schema \
  --registry-name company-registry \
  --schema-name 'com.company.orders@OrderCreated' \
  | jq '.Content | fromjson | .components.schemas.OrderCreated.properties | keys'

イベントコントラクト管理

プロデューサー・コンシューマーの契約

【イベント駆動アーキテクチャの課題】

従来:
  プロデューサー(Orders サービス)
    ↓ PutEvents
  OrderCreated イベント: {orderId, customerId, amount}
    ↓
  コンシューマー(Inventory サービス)
    → orderId, customerId, amount を使用
  
  【問題】
  - Inventory が実は「currency」フィールドを期待していた
    (複数通貨対応が必要)
  - Orders がフィールド追加したら Inventory に通知忘れ
  - 本番環境で初めてエラー発見

✅ Schema Registry で解決:

  1. Orders チームが「OrderCreated スキーマ」をレジストリに登録
     {
       orderId: string,
       customerId: string,
       amount: number,
       currency: string,
       items: [{sku, quantity, price}]
     }
  
  2. Inventory チームが同じスキーマから Java コードを生成
     → OrderCreated クラスを利用
     → IDE 補完で all フィールドを確認
     → currency への依存が明示化
  
  3. Orders チームがスキーマ v2 にアップグレード
     → "unit_price" を追加、"amount" は廃止予定
     → Schema Registry が v2 変更を検知
     → Inventory チームに「破壊的変更」を通知
     → Inventory チームが v2 対応コードに更新
     → 一斉アップグレード実施
  
  ✅ イベントコントラクト違反を本番前に検知できた

チーム間の運用

Week 1:
  Orders チーム:「OrderCreated スキーマ v1」発表
  Inventory チーム:スキーマ v1 からコード生成、実装開始

Week 2:
  Orders チーム:「currency フィールド追加予定」を通知
  Schema Registry:新規 currency フィールドを追加(v2)
  Inventory チーム:currency への対応を実装

Week 3:
  全チーム:新スキーマ v2 で本番デプロイ
  → イベント・コンシューマー間の不一致ゼロ

主要ユースケース(10+)

1. マイクロサービス間のイベントコントラクト

課題: 注文・在庫・配送の 3 マイクロサービス間で「OrderCreated」イベントの定義が曖昧 解決:

  • Schema Registry に「OrderCreated」スキーマを登録
  • 3 チームが同じスキーマから型安全なコード生成
  • コンシューマー間のイベント型ミスマッチゼロ

2. マイクロサービス間の進化的スキーマ変更

課題: 新しい通貨対応が必要で「currency」フィールド追加 → 既存コンシューマーとの互換性が不透明 解決:

  • スキーマ v2 で「currency」(オプション)を追加
  • 後方互換性を保証
  • 旧コンシューマーもそのまま動作

3. AWS サービスイベントの型安全な処理

課題: S3 イベント・ECS イベント・Lambda イベント → Lambda で処理する時に「どのフィールドを使えるか」を毎回確認 解決:

  • AWS 提供スキーマから Java / Python / TypeScript コードを生成
  • IDE 補完で S3 イベント → bucket, key, object-size を即座に確認
  • 型チェック:存在しないフィールドアクセスは開発時に検出

4. イベント構造の自動ドキュメント化

課題: イベント駆動システムのドキュメントが古い・不正確 解決:

  • スキーマから OpenAPI 3.0 仕様自動生成
  • 各チームが同じドキュメントを参照
  • スキーマ更新 → ドキュメント自動更新

5. 運用環境でのスキーマ検出・ドキュメント化

課題: 本番環境で実際に流れるイベント構造が不透明 解決:

  • Event Bus でスキーマ検出を有効化
  • 実際のイベントから JSON Schema を自動推論
  • 本番の実データに基づいたスキーマが自動生成される

6. チーム別・環境別のスキーマガバナンス

課題: Dev・Staging・Prod で異なるスキーマ定義が混在 解決:

  • 環境別レジストリを作成
  • Dev → Staging → Prod へのスキーマプロモーション管理
  • 各環境で一貫したスキーマ定義

7. クロスチーム・クロスプロジェクトのスキーマ共有

課題: 複数プロジェクト間で「Order イベント」を共有したい 解決:

  • Shared Schema Registry を作成
  • 複数プロジェクト・複数チームが同じスキーマを参照
  • スキーマバージョン管理で互換性を保証

8. データレイク・分析パイプラインのスキーマ管理

課題: EventBridge イベント → S3 に保存 → Athena で分析する際、スキーマが不確定 解決:

  • EventBridge Schemas から Parquet / CSV スキーマを生成
  • Athena DDL を自動生成
  • イベント → S3 → Athena の全パイプラインで型安全性を確保

9. API Gateway + EventBridge の統合イベント管理

課題: API Gateway で受け取ったリクエスト → EventBridge に転送する際、スキーマ検証が必要 解決:

  • API Request Schema と EventBridge Event Schema を統一管理
  • API → Event 変換時に型チェック
  • API と Event のコントラクト一元化

10. SaaS・第三者システムとの統合イベント

課題: Salesforce / Shopify イベント → EventBridge で処理する際、スキーマが不明確 解決:

  • カスタムスキーマで Salesforce / Shopify イベント定義を登録
  • 内部システムとの型安全な変換
  • スキーマバージョン管理で API 変更に対応

設定・操作の具体例

AWS Console でのスキーマ登録

1. EventBridge コンソール → Schema Registries
2. 「Create registry」で新規レジストリ作成
   名前: company-events
   説明: Internal event schemas
3. 「Create schema」で新規スキーマ作成
   レジストリ: company-events
   スキーマ名: com.company.orders@OrderCreated
   タイプ: OpenAPI3 or JSONSchema Draft 4
4. スキーマ定義をペースト
5. 「Create」で登録完了

CLI でのスキーマ管理

# スキーマを JSON ファイルから登録
cat > order-schema.json << 'EOF'
{
  "openapi": "3.0.0",
  "info": {"title": "Order Events", "version": "1.0"},
  "components": {
    "schemas": {
      "OrderCreated": {
        "type": "object",
        "properties": {
          "orderId": {"type": "string"},
          "customerId": {"type": "string"},
          "amount": {"type": "number"},
          "currency": {"type": "string"},
          "items": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "sku": {"type": "string"},
                "quantity": {"type": "integer"},
                "price": {"type": "number"}
              }
            }
          }
        },
        "required": ["orderId", "customerId", "amount"]
      }
    }
  }
}
EOF

# スキーマ登録
aws events create-schema \
  --registry-name company-events \
  --schema-name 'com.company.orders@OrderCreated' \
  --type OPENAPI3 \
  --content file://order-schema.json

# スキーマ確認
aws events describe-schema \
  --registry-name company-events \
  --schema-name 'com.company.orders@OrderCreated'

# コードバインディング生成
aws events get-schema \
  --registry-name company-events \
  --schema-name 'com.company.orders@OrderCreated' \
  --schema-version 1

Terraform でスキーマを IaC 管理

provider "aws" {
  region = "ap-northeast-1"
}

# スキーマレジストリ作成
resource "aws_schemas_registry" "company" {
  name = "company-events"
  description = "Company-wide event schemas"
}

# スキーマ作成
resource "aws_schemas_schema" "order_created" {
  registry_name = aws_schemas_registry.company.name
  name = "com.company.orders@OrderCreated"
  type = "OpenAPI3"
  content = jsonencode({
    openapi = "3.0.0"
    info = {
      title = "Order Events"
      version = "1.0"
    }
    components = {
      schemas = {
        OrderCreated = {
          type = "object"
          properties = {
            orderId = {type = "string"}
            customerId = {type = "string"}
            amount = {type = "number"}
            currency = {type = "string", enum = ["USD", "JPY", "EUR"]}
            items = {
              type = "array"
              items = {
                type = "object"
                properties = {
                  sku = {type = "string"}
                  quantity = {type = "integer"}
                  price = {type = "number"}
                }
                required = ["sku", "quantity", "price"]
              }
            }
          }
          required = ["orderId", "customerId", "amount"]
        }
      }
    }
  })
}

# 出力:スキーマ ARN
output "schema_arn" {
  value = aws_schemas_schema.order_created.arn
}

Lambda での型安全なイベント処理

import json
from typing import Any, Dict
from dataclasses import dataclass, asdict
from datetime import datetime

# スキーマから自動生成されたクラス
@dataclass
class OrderItem:
    sku: str
    quantity: int
    price: float

@dataclass
class OrderCreated:
    orderId: str
    customerId: str
    amount: float
    currency: str
    items: list[OrderItem]
    timestamp: str

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """型安全なイベント処理"""
    
    try:
        # イベント detail を OrderCreated に変換
        detail = event.get('detail', {})
        
        order = OrderCreated(
            orderId=detail['orderId'],
            customerId=detail['customerId'],
            amount=detail['amount'],
            currency=detail['currency'],
            items=[
                OrderItem(
                    sku=item['sku'],
                    quantity=item['quantity'],
                    price=item['price']
                )
                for item in detail.get('items', [])
            ],
            timestamp=detail['timestamp']
        )
        
        # 型安全なアクセス
        print(f"Order {order.orderId} for customer {order.customerId}")
        print(f"Total: {order.amount} {order.currency}")
        
        # ビジネスロジック
        total_items = sum(item.quantity for item in order.items)
        print(f"Items: {total_items}")
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Order {order.orderId} processed',
                'items_count': total_items
            })
        }
        
    except KeyError as e:
        # スキーマ定義と異なるイベントが到着
        return {
            'statusCode': 400,
            'body': json.dumps({'error': f'Missing field: {str(e)}'})
        }

類似サービス比較表

特徴 EventBridge Schemas Apache Avro Protocol Buffers Confluent Schema Registry Apicurio Registry AsyncAPI
主な用途 AWS Event Registry Kafka / BigData gRPC / Services Kafka スキーマ管理 マルチクラウド イベント仕様
言語サポート Java / Python / TS / Go 多言語 Java / Go / Python / C++ / Ruby Kafka Client 言語 多言語 ドキュメント
AWS 統合 ✅ ネイティブ ⭕ 部分的
スキーマ自動検出 ✅ Event Bus から推論 ✅ Kafka から推論 ⭕ 手動
バージョン管理 ✅ 完全対応
IDE 統合 ✅ VS Code / JetBrains
セットアップ複雑性 ✅ 簡単(AWS Console) ⭕ 中程度 ⭕ 中程度 ⭕ 中程度 ⭕ 中程度 ⭕ 中程度
コスト ✅ 無料-安い ✅ 無料(OSS) ✅ 無料(OSS) $$(Enterprise) ✅ 無料(OSS) ✅ 無料

ベストプラクティス

✅ 推奨される設定・運用

  1. スキーマレジストリを組織の中央リポジトリに

    • 全マイクロサービスが同じスキーマレジストリを参照
    • スキーマ変更時に全チームに通知・同期
  2. スキーマ自動検出を本番環境で常時有効化

    • Event Bus からイベント自動サンプリング
    • 実運用データに基づいたスキーマを自動保守
    • コスト:最初 500 万イベント/月は無料
  3. スキーマバージョン管理で後方互換性を強制

    • 新フィールド追加は OK(オプション化)
    • フィールド削除・型変更は新メジャーバージョン
    • コンシューマー一斉アップグレードが必要なときは通知
  4. IDE プラグインで開発効率向上

    • VS Code / JetBrains から直接スキーマ参照・コード生成
    • autocomplete で IDE 補完を活用
    • 開発時間・テスト工数削減
  5. コードバインディングで型安全性を確保

    • 言語ごとに自動生成クラス・インターフェースを使用
    • IDE 補完・型チェックで ランタイムエラー削減
    • テスト工数削減

❌ よくある失敗パターン

  1. スキーマを登録したまま管理・更新しない

    • 結果:スキーマが実際のイベント構造と乖離
    • 対策:月 1 回スキーマ検出を確認、変更があれば反映
  2. スキーマバージョン管理を無視して破壊的変更

    • 結果:既存コンシューマーが突然エラー
    • 対策:新フィールド追加は v_minor、フィールド削除は v_major で通知
  3. AWS サービスイベントのスキーマを確認せず実装

    • 結果:存在しないフィールドへのアクセス → ランタイムエラー
    • 対策:S3 / ECS イベントは事前にスキーマを確認
  4. コードバインディングを一度生成した後、更新しない

    • 結果:新フィールドが追加されても古いコード使用
    • 対策:スキーマバージョン更新時に再生成

トラブルシューティング表

症状 原因 解決策
スキーマが自動検出されない Event Bus のスキーマ検出が有効化されていない EventBridge コンソール → Event Bus → 「Enable schema discovery」をオン
コードバインディング生成時にエラー スキーマの JSON 形式が不正 スキーマを JSON Schema / OpenAPI 3.0 バリデーター で確認
生成されたコードが IDE で補完されない ダウンロードしたバインディングが正しくプロジェクトに統合されていない ZIP 解凍後、正しいパッケージパスに配置(Python: src/, Java: src/main/java/)
スキーマバージョン 1 のイベント処理が失敗 バージョン 2 でフィールド削除(破壊的変更)後、v1 イベント形式が送信されている v1 互換性用のコンシューマーを保守、または両バージョンに対応
AWS サービスイベント(S3)のスキーマが表示されない デフォルトレジストリ「aws.events」にアクセスしていない EventBridge コンソール → Schema Registries → 「aws.events」を選択

2025-2026 最新動向

AsyncAPI 仕様との統合

  • EventBridge Schemas が AsyncAPI 3.0 形式に対応予定(2025 年下半期)
  • より詳細なイベント仕様(request-reply パターン)の定義が可能に

AI による スキーマ提案機能

  • 2026 年:生成 AI が「イベント構造から最適なスキーマ」を提案
  • サンプルイベント入力 → スキーマ自動最適化

マルチリージョン・マルチクラウドレジストリ

  • 現在:AWS リージョン内のみ
  • 将来:複数 AWS リージョン間・GCP / Azure への同期検討中

Confluent Schema Registry との統合

  • 現在:EventBridge ← → Confluent 間でスキーマを手動同期
  • 将来:ネイティブ連携検討中

学習リソース・参考文献

公式ドキュメント(8+)

  1. EventBridge Schema Registry Guide
  2. EventBridge Schemas API Reference
  3. Schema Discovery
  4. Code Bindings
  5. AWS Service Event Schemas
  6. EventBridge Best Practices
  7. CloudFormation Events

OSS / 関連規格(5+)

  1. Apache Avro - Kafka のデフォルトスキーマ形式
  2. Protocol Buffers - gRPC・Google Cloud Pub/Sub のスキーマ
  3. Confluent Schema Registry - Kafka スキーマ管理
  4. Apicurio Registry - マルチクラウドスキーマレジストリ
  5. AsyncAPI - イベント駆動 API 仕様

実装例・チェックリスト

実装の段階(フェーズごと)

Phase 1: 基礎(週 1)

  • [ ] EventBridge で最初のカスタムイベントバスを作成
  • [ ] 簡単なカスタムスキーマを手動作成・登録
  • [ ] Python / Java でコードバインディング生成・テスト

Phase 2: 自動検出(週 2-3)

  • [ ] Event Bus でスキーマ自動検出を有効化
  • [ ] 実イベント → 自動スキーマ推論を確認
  • [ ] スキーマバージョン管理の基本操作

Phase 3: 統合・マルチサービス(月 1-2)

  • [ ] チーム間で共有スキーマレジストリ構築
  • [ ] マイクロサービス 3+ が同じスキーマで開発
  • [ ] スキーマ変更通知・バージョン管理プロセス構築

採用判断チェックリスト

  • [ ] イベント駆動アーキテクチャ(3+ マイクロサービス)を構築している
  • [ ] マイクロサービス間でイベント構造の不一致エラーが発生している
  • [ ] 開発効率の向上(型安全・IDE 補完)を望んでいる
  • [ ] API / スキーマのバージョン管理を一元化したい

まとめ

Amazon EventBridge Schema Registry は 「イベント駆動アーキテクチャのスキーマ管理・型安全開発を実現する統合プラットフォーム」。AWS サービスイベント・カスタムイベント・自動検出スキーマを一元管理し、Java / Python / TypeScript / Go の型安全なコードを自動生成。IDE 補完で開発効率を向上させ、スキーマバージョン管理で プロデューサー・コンシューマー間の契約違反を本番前に検知。

最小限の導入: 1 個のカスタムスキーマ + Python コード生成 = マイクロサービス型不一致エラー 80% 削減 フル活用: 複数スキーマ + 自動検出 + チーム間共有 = イベント駆動システムの信頼性・保守性 大幅向上

マイクロサービスが 3 個以上の組織では ほぼ必須。スキーマレジストリがイベントシステムの「Single Source of Truth」となり、組織全体の開発速度・品質向上に大きく貢献。

最終更新:2026-04-26 バージョン:v2.0