目次
EventBridge Schemas 完全ガイド v2.0
イベント駆動アーキテクチャのスキーマ管理・型安全開発プラットフォーム
概要
Amazon EventBridge Schema Registry は 「EventBridge を流れるイベントのスキーマを自動検出・登録・バージョン管理し、言語別の型安全なコードバインディングを自動生成するサービス」 である。イベント駆動アーキテクチャにおけるプロデューサーとコンシューマー間のイベント構造契約(スキーマコントラクト) を一元管理し、API Gateway・gRPC のような Specification 駆動開発を EventBridge 環境で実現する。
本質的な価値:
- イベント構造の暗黙的知識をコード化: マイクロサービス間で「どんなフィールドを含むイベント」かを明示化し、破壊的変更を事前に検知
- 型安全な開発体験: Java・Python・TypeScript・Go のコードバインディングを自動生成、IDE 補完が効いた開発
- AWS サービスイベントの参照: S3・EC2・ECS・Lambda が発行するイベント構造をスキーマから即座に参照
- スキーマの自動検出: 実際のイベントバスからイベントをサンプリング → JSON Schema を自動推論 → スキーマレジストリに登録
- チーム間のコミュニケーション: スキーマバージョン変更時に「破壊的変更か後方互換か」を可視化
課題と特徴
EventBridge Schemas が解決する課題
❌ 課題:イベント駆動システムが複雑・エラーが多い
→ プロデューサー(注文サービス)が「orderId: string」をイベントに含めた
→ コンシューマー(在庫サービス)が「orderId: number」と仮定して処理
→ 本番環境でデータ型ミスマッチエラー発生
→ 問題の原因追跡に数日かかる
→ 新チームメンバーが「このイベントはどんな構造?」を聞く度に
Slack / ドキュメントで説明必要
→ ドキュメントが古くなってさらに混乱
✅ 解決:EventBridge Schemas で型安全・スキーマ駆動開発
→ スキーマから Java / Python / TypeScript / Go コード自動生成
→ IDE の自動補完でイベント構造を即座に確認・アクセス
→ スキーマバージョン管理で「v1 → v2」の破壊的変更を事前検知
→ チーム全体で「Single Source of Truth(SSOT)」としてスキーマを共有
→ マイクロサービス間の契約違反を本番前に検出
コアの特徴
| 特徴 | 説明 |
|---|---|
| AWS サービスイベントスキーマ | S3 / EC2 / ECS / Lambda・30+ AWS サービスのイベント定義を事前提供 |
| カスタムスキーマ | 手動で JSON Schema / OpenAPI 3.0 を登録、バージョン管理 |
| スキーマ自動検出 | イベントバスを監視、実際のイベントから JSON Schema を推論・自動登録 |
| コードバインディング生成 | Java / Python / TypeScript / Go のコードを自動生成 |
| IDE 統合 | VS Code / JetBrains 環境でスキーマ参照・コード生成が可能 |
| バージョン管理 | スキーマ変更時にバージョンアップ、破壊的変更を検知 |
| Schema Registry API | DescribeSchema / GetDiscoveredSchema で プログラムからスキーマ取得 |
| EventBridge Rules との統合 | スキーマをルールのターゲットに指定可能 |
| 無料スキーマ検出 | 最初 500 万イベント/月は無料、以降 $0.10/100 万 |
アーキテクチャ(Mermaid 図 1)
graph TB
subgraph "Event Sources"
A["Producer 1<br/>Order Service<br/>PutEvents"]
B["Producer 2<br/>Payment Service<br/>PutEvents"]
C["AWS Service<br/>S3 / EC2<br/>native events"]
end
subgraph "EventBridge Bus"
D["Custom Event Bus<br/>com.company.orders"]
E["Default Bus<br/>aws.ec2 events"]
end
subgraph "Schema Detection & Registry"
F["Schema Discoverer<br/>Sample Events<br/>JSON Schema Inference"]
G["Schema Registry<br/>aws.ec2@StartInstances v1<br/>com.company.orders@OrderCreated v2"]
end
subgraph "Code Generation & IDE Integration"
H["Code Bindings<br/>Java / Python<br/>TypeScript / Go"]
I["IDE Plugins<br/>VS Code<br/>JetBrains"]
end
subgraph "Consumer Development"
J["Lambda / ECS<br/>Type-safe Event Handler<br/>IDE Autocomplete"]
end
A --> D
B --> D
C --> E
D --> F
E --> G
F --> G
G --> H
G --> I
H --> J
I --> J
スキーマの種類
1. AWS 管理スキーマ(aws.* namespace)
概要: AWS サービスが発行するイベントのスキーマを AWS が提供
AWS サービスイベントスキーマの例:
aws.ec2:
StartInstances
StopInstances
RebootInstances
TerminateInstances
詳細フィールド: instance-id, region, account-id, time
aws.s3:
ObjectCreated:Put
ObjectCreated:Post
ObjectCreated:Copy
ObjectRemoved:Delete
詳細フィールド: bucket, key, object-size, etag
aws.ecs:
ECS Task State Change
ECS Service Action
詳細フィールド: task-id, service-name, cluster-arn, status
aws.dynamodb:
Streams Record(INSERT / MODIFY / REMOVE)
詳細フィールド: table-name, keys, new-image, old-image
aws.lambda:
Function Invocation Completed
詳細フィールド: function-name, request-id, duration, error-message
API: DescribeSchema で取得
aws events describe-schema \
--registry-name 'aws.events' \
--schema-name 'aws.ec2@EC2 Instance State-change Notification'
スキーマ登録先: aws.events レジストリに事前登録(削除・変更不可)
2. カスタムスキーマ(手動登録)
概要: 自社アプリケーションが発行するカスタムイベント用スキーマ
カスタムスキーマの例:
スキーマ登録(JSON Schema Draft 4 形式):
{
"Name": "com.company.orders@OrderCreated",
"Type": "OpenAPI3",
"Content": {
"openapi": "3.0.0",
"info": {"title": "Order Events", "version": "1.0"},
"paths": {},
"components": {
"schemas": {
"OrderCreated": {
"type": "object",
"properties": {
"orderId": {"type": "string", "format": "uuid"},
"customerId": {"type": "string"},
"amount": {"type": "number", "format": "double"},
"currency": {"type": "string", "enum": ["USD", "JPY", "EUR"]},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"sku": {"type": "string"},
"quantity": {"type": "integer", "minimum": 1},
"price": {"type": "number"}
},
"required": ["sku", "quantity", "price"]
}
},
"createdAt": {"type": "string", "format": "date-time"},
"source": {"type": "string", "enum": ["web", "mobile", "api"]}
},
"required": ["orderId", "customerId", "amount", "items", "createdAt"]
}
}
}
}
}
API: CreateSchema で登録
aws events create-schema \
--registry-name 'company-registry' \
--schema-name 'com.company.orders@OrderCreated' \
--type OPENAPI3 \
--content file://order-event-schema.json
登録先: company-registry など カスタムレジストリに登録
3. スキーマ自動検出(Schema Discovery)
概要: イベントバスを監視 → 実際のイベントから JSON Schema を自動推論
スキーマ自動検出の流れ:
1. EventBridge コンソール → Custom Event Bus
→ 「Enable schema discovery」をオン
2. アプリケーションが PutEvents でイベント送信:
{
"source": "com.myapp.orders",
"detail-type": "OrderCreated",
"detail": {
"orderId": "ORD-001",
"customerId": "CUST-123",
"amount": 1500.50,
"items": [
{"sku": "SKU-A", "quantity": 2, "price": 750.25}
],
"timestamp": "2026-04-26T10:30:00Z"
}
}
3. EventBridge が自動的にイベントをサンプリング
→ JSON Schema を推論
→ Schema Registry に登録:
レジストリ: discovered-schemas
スキーマ名: com.myapp.orders@OrderCreated
バージョン: 1
4. 新しいイベント形式が到着(currency フィールド追加):
{
"orderId": "ORD-002",
"customerId": "CUST-456",
"amount": 2000,
"currency": "JPY", ← 新規フィールド
"items": [...]
}
5. EventBridge がスキーマ変更を検知
→ 新バージョン「v2」として登録
→ バージョン差分を表示
コスト:
最初 500 万イベント/月:無料
500 万超:$0.10/100 万イベント
使用例: CLI で自動検出を有効化
# カスタムイベントバスでスキーマ検出を有効化
aws events put-event-bus-policy \
--name custom-event-bus \
--policy '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*",
"Action": "events:PutEvents",
"Resource": "arn:aws:events:*:*:event-bus/custom-event-bus"
}]
}'
# Schema Discovery をトリガー(イベント送信時に自動実行)
aws events put-events \
--entries '[{
"Source": "com.myapp.orders",
"DetailType": "OrderCreated",
"Detail": "{\"orderId\":\"ORD-001\",\"customerId\":\"CUST-123\",\"amount\":1500.50}"
}]'
# 検出されたスキーマを確認
aws events describe-schema \
--registry-name discovered-schemas \
--schema-name 'com.myapp.orders@OrderCreated'
コードバインディング生成
生成フロー
Marketplace での流れ:
1. EventBridge コンソール → Schema Registry
2. スキーマを選択(例:com.company.orders@OrderCreated)
3. 「Download code binding」をクリック
4. 言語を選択:Java / Python / TypeScript / Go
5. ZIP ファイルをダウンロード
6. プロジェクトに解凍・統合
各言語の生成コード例
Python(dataclass)
# 自動生成ファイル: generated/order_event.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class OrderItem:
sku: str
quantity: int
price: float
@dataclass
class OrderCreated:
orderId: str
customerId: str
amount: float
currency: str
items: List[OrderItem]
createdAt: str
source: str
# Lambda ハンドラー(型安全)
from aws_lambda_powertools import Logger
from generated.order_event import OrderCreated, Marshaller
logger = Logger()
def lambda_handler(event, context):
# イベント → OrderCreated オブジェクトに自動変換
order = Marshaller.unmarshal_event(event, OrderCreated)
# IDE 補完が効く!
logger.info(f"Order ID: {order.orderId}")
logger.info(f"Customer: {order.customerId}")
logger.info(f"Amount: {order.amount} {order.currency}")
# 個別アイテムにアクセス
for item in order.items:
logger.info(f" SKU: {item.sku}, Qty: {item.quantity}, Price: {item.price}")
return {"statusCode": 200, "message": f"Order {order.orderId} processed"}
Java(POJO + Jackson)
// 自動生成ファイル: OrderCreated.java
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.OffsetDateTime;
import java.util.List;
public class OrderCreated {
@JsonProperty("orderId")
private String orderId;
@JsonProperty("customerId")
private String customerId;
@JsonProperty("amount")
private Double amount;
@JsonProperty("currency")
private String currency;
@JsonProperty("items")
private List<OrderItem> items;
@JsonProperty("createdAt")
private String createdAt;
@JsonProperty("source")
private String source;
// Getter / Setter 自動生成
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
// ...
}
// Lambda ハンドラー(型安全)
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
public class OrderEventHandler implements RequestHandler<Map<String, Object>, String> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
try {
// イベント → OrderCreated オブジェクトに自動変換
OrderCreated order = objectMapper.convertValue(
event.get("detail"),
OrderCreated.class
);
// IDE 補完が効く!
context.getLogger().log("Order ID: " + order.getOrderId());
context.getLogger().log("Amount: " + order.getAmount() + " " + order.getCurrency());
// 個別アイテムにアクセス
for (OrderItem item : order.getItems()) {
context.getLogger().log(" SKU: " + item.getSku() +
", Qty: " + item.getQuantity());
}
return "OK";
} catch (Exception e) {
context.getLogger().log("Error: " + e.getMessage());
return "ERROR";
}
}
}
TypeScript(interface)
// 自動生成ファイル: order-event.ts
export interface OrderItem {
sku: string;
quantity: number;
price: number;
}
export interface OrderCreated {
orderId: string;
customerId: string;
amount: number;
currency: string;
items: OrderItem[];
createdAt: string;
source: string;
}
// Lambda ハンドラー(型安全)
import { CloudWatchLogsEvent } from 'aws-lambda';
import { OrderCreated } from './order-event';
export async function handler(event: any): Promise<string> {
// イベント → OrderCreated に自動キャスト
const order: OrderCreated = event.detail;
// IDE の autocomplete が効く!
console.log(`Order ID: ${order.orderId}`);
console.log(`Customer: ${order.customerId}`);
console.log(`Amount: ${order.amount} ${order.currency}`);
// 個別アイテムにアクセス(型チェック)
for (const item of order.items) {
console.log(` SKU: ${item.sku}, Qty: ${item.quantity}`);
}
return 'OK';
}
Go(struct)
// 自動生成ファイル: order_event.go
package models
type OrderItem struct {
Sku string `json:"sku"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
type OrderCreated struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
Items []OrderItem `json:"items"`
CreatedAt string `json:"createdAt"`
Source string `json:"source"`
}
// Lambda ハンドラー(型安全)
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/lambda"
"myapp/models"
)
func handleOrderCreated(ctx context.Context, event models.OrderCreated) (string, error) {
// IDE の autocomplete が効く!
fmt.Printf("Order ID: %s\n", event.OrderID)
fmt.Printf("Customer: %s\n", event.CustomerID)
fmt.Printf("Amount: %.2f %s\n", event.Amount, event.Currency)
// 個別アイテムにアクセス(型チェック)
for _, item := range event.Items {
fmt.Printf(" SKU: %s, Qty: %d\n", item.Sku, item.Quantity)
}
return "OK", nil
}
func main() {
lambda.Start(handleOrderCreated)
}
スキーマバージョン管理
バージョンアップのシナリオ
スキーマ v1:
{
"orderId": "string",
"customerId": "string",
"amount": "number"
}
【後方互換な変更】
スキーマ v2(新フィールド追加):
{
"orderId": "string",
"customerId": "string",
"amount": "number",
"currency": "string" ← 新規フィールド(オプション)
}
既存の v1 ハンドラーもそのまま動作
新しい v2 ハンドラーは currency を活用可能
✅ 後方互換性あり
【破壊的変更】
スキーマ v3(フィールド削除・型変更):
{
"orderId": "string",
"customerId": "string",
"unitPrice": "number" ← amount を削除して unitPrice に変更(破壊的)
}
既存の v1 ハンドラーが v3 イベントを処理しようとする
→ amount フィールドが見つからない
→ ランタイムエラー発生
❌ 破壊的変更・後方互換性なし
→ 全コンシューマーを更新する必要あり
API でバージョン管理
# 現在のスキーマ一覧(全バージョン含む)
aws events list-schema-versions \
--registry-name company-registry \
--schema-name 'com.company.orders@OrderCreated' \
--query 'SchemaVersions[].{Version: Version, CreatedDate: CreatedDate}'
# 特定バージョンのスキーマを取得
aws events get-schema \
--registry-name company-registry \
--schema-name 'com.company.orders@OrderCreated' \
--schema-version '2'
# スキーマを更新して新バージョンを作成
aws events update-schema \
--registry-name company-registry \
--schema-name 'com.company.orders@OrderCreated' \
--type OPENAPI3 \
--content file://updated-schema.json
# バージョン間の差分を確認(カスタム処理)
aws events describe-schema \
--registry-name company-registry \
--schema-name 'com.company.orders@OrderCreated' \
| jq '.Content | fromjson | .components.schemas.OrderCreated.properties | keys'
イベントコントラクト管理
プロデューサー・コンシューマーの契約
【イベント駆動アーキテクチャの課題】
従来:
プロデューサー(Orders サービス)
↓ PutEvents
OrderCreated イベント: {orderId, customerId, amount}
↓
コンシューマー(Inventory サービス)
→ orderId, customerId, amount を使用
【問題】
- Inventory が実は「currency」フィールドを期待していた
(複数通貨対応が必要)
- Orders がフィールド追加したら Inventory に通知忘れ
- 本番環境で初めてエラー発見
✅ Schema Registry で解決:
1. Orders チームが「OrderCreated スキーマ」をレジストリに登録
{
orderId: string,
customerId: string,
amount: number,
currency: string,
items: [{sku, quantity, price}]
}
2. Inventory チームが同じスキーマから Java コードを生成
→ OrderCreated クラスを利用
→ IDE 補完で all フィールドを確認
→ currency への依存が明示化
3. Orders チームがスキーマ v2 にアップグレード
→ "unit_price" を追加、"amount" は廃止予定
→ Schema Registry が v2 変更を検知
→ Inventory チームに「破壊的変更」を通知
→ Inventory チームが v2 対応コードに更新
→ 一斉アップグレード実施
✅ イベントコントラクト違反を本番前に検知できた
チーム間の運用
Week 1:
Orders チーム:「OrderCreated スキーマ v1」発表
Inventory チーム:スキーマ v1 からコード生成、実装開始
Week 2:
Orders チーム:「currency フィールド追加予定」を通知
Schema Registry:新規 currency フィールドを追加(v2)
Inventory チーム:currency への対応を実装
Week 3:
全チーム:新スキーマ v2 で本番デプロイ
→ イベント・コンシューマー間の不一致ゼロ
主要ユースケース(10+)
1. マイクロサービス間のイベントコントラクト
課題: 注文・在庫・配送の 3 マイクロサービス間で「OrderCreated」イベントの定義が曖昧 解決:
- Schema Registry に「OrderCreated」スキーマを登録
- 3 チームが同じスキーマから型安全なコード生成
- コンシューマー間のイベント型ミスマッチゼロ
2. マイクロサービス間の進化的スキーマ変更
課題: 新しい通貨対応が必要で「currency」フィールド追加 → 既存コンシューマーとの互換性が不透明 解決:
- スキーマ v2 で「currency」(オプション)を追加
- 後方互換性を保証
- 旧コンシューマーもそのまま動作
3. AWS サービスイベントの型安全な処理
課題: S3 イベント・ECS イベント・Lambda イベント → Lambda で処理する時に「どのフィールドを使えるか」を毎回確認 解決:
- AWS 提供スキーマから Java / Python / TypeScript コードを生成
- IDE 補完で S3 イベント → bucket, key, object-size を即座に確認
- 型チェック:存在しないフィールドアクセスは開発時に検出
4. イベント構造の自動ドキュメント化
課題: イベント駆動システムのドキュメントが古い・不正確 解決:
- スキーマから OpenAPI 3.0 仕様自動生成
- 各チームが同じドキュメントを参照
- スキーマ更新 → ドキュメント自動更新
5. 運用環境でのスキーマ検出・ドキュメント化
課題: 本番環境で実際に流れるイベント構造が不透明 解決:
- Event Bus でスキーマ検出を有効化
- 実際のイベントから JSON Schema を自動推論
- 本番の実データに基づいたスキーマが自動生成される
6. チーム別・環境別のスキーマガバナンス
課題: Dev・Staging・Prod で異なるスキーマ定義が混在 解決:
- 環境別レジストリを作成
- Dev → Staging → Prod へのスキーマプロモーション管理
- 各環境で一貫したスキーマ定義
7. クロスチーム・クロスプロジェクトのスキーマ共有
課題: 複数プロジェクト間で「Order イベント」を共有したい 解決:
- Shared Schema Registry を作成
- 複数プロジェクト・複数チームが同じスキーマを参照
- スキーマバージョン管理で互換性を保証
8. データレイク・分析パイプラインのスキーマ管理
課題: EventBridge イベント → S3 に保存 → Athena で分析する際、スキーマが不確定 解決:
- EventBridge Schemas から Parquet / CSV スキーマを生成
- Athena DDL を自動生成
- イベント → S3 → Athena の全パイプラインで型安全性を確保
9. API Gateway + EventBridge の統合イベント管理
課題: API Gateway で受け取ったリクエスト → EventBridge に転送する際、スキーマ検証が必要 解決:
- API Request Schema と EventBridge Event Schema を統一管理
- API → Event 変換時に型チェック
- API と Event のコントラクト一元化
10. SaaS・第三者システムとの統合イベント
課題: Salesforce / Shopify イベント → EventBridge で処理する際、スキーマが不明確 解決:
- カスタムスキーマで Salesforce / Shopify イベント定義を登録
- 内部システムとの型安全な変換
- スキーマバージョン管理で API 変更に対応
設定・操作の具体例
AWS Console でのスキーマ登録
1. EventBridge コンソール → Schema Registries
2. 「Create registry」で新規レジストリ作成
名前: company-events
説明: Internal event schemas
3. 「Create schema」で新規スキーマ作成
レジストリ: company-events
スキーマ名: com.company.orders@OrderCreated
タイプ: OpenAPI3 or JSONSchema Draft 4
4. スキーマ定義をペースト
5. 「Create」で登録完了
CLI でのスキーマ管理
# スキーマを JSON ファイルから登録
cat > order-schema.json << 'EOF'
{
"openapi": "3.0.0",
"info": {"title": "Order Events", "version": "1.0"},
"components": {
"schemas": {
"OrderCreated": {
"type": "object",
"properties": {
"orderId": {"type": "string"},
"customerId": {"type": "string"},
"amount": {"type": "number"},
"currency": {"type": "string"},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"sku": {"type": "string"},
"quantity": {"type": "integer"},
"price": {"type": "number"}
}
}
}
},
"required": ["orderId", "customerId", "amount"]
}
}
}
}
EOF
# スキーマ登録
aws events create-schema \
--registry-name company-events \
--schema-name 'com.company.orders@OrderCreated' \
--type OPENAPI3 \
--content file://order-schema.json
# スキーマ確認
aws events describe-schema \
--registry-name company-events \
--schema-name 'com.company.orders@OrderCreated'
# コードバインディング生成
aws events get-schema \
--registry-name company-events \
--schema-name 'com.company.orders@OrderCreated' \
--schema-version 1
Terraform でスキーマを IaC 管理
provider "aws" {
region = "ap-northeast-1"
}
# スキーマレジストリ作成
resource "aws_schemas_registry" "company" {
name = "company-events"
description = "Company-wide event schemas"
}
# スキーマ作成
resource "aws_schemas_schema" "order_created" {
registry_name = aws_schemas_registry.company.name
name = "com.company.orders@OrderCreated"
type = "OpenAPI3"
content = jsonencode({
openapi = "3.0.0"
info = {
title = "Order Events"
version = "1.0"
}
components = {
schemas = {
OrderCreated = {
type = "object"
properties = {
orderId = {type = "string"}
customerId = {type = "string"}
amount = {type = "number"}
currency = {type = "string", enum = ["USD", "JPY", "EUR"]}
items = {
type = "array"
items = {
type = "object"
properties = {
sku = {type = "string"}
quantity = {type = "integer"}
price = {type = "number"}
}
required = ["sku", "quantity", "price"]
}
}
}
required = ["orderId", "customerId", "amount"]
}
}
}
})
}
# 出力:スキーマ ARN
output "schema_arn" {
value = aws_schemas_schema.order_created.arn
}
Lambda での型安全なイベント処理
import json
from typing import Any, Dict
from dataclasses import dataclass, asdict
from datetime import datetime
# スキーマから自動生成されたクラス
@dataclass
class OrderItem:
sku: str
quantity: int
price: float
@dataclass
class OrderCreated:
orderId: str
customerId: str
amount: float
currency: str
items: list[OrderItem]
timestamp: str
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""型安全なイベント処理"""
try:
# イベント detail を OrderCreated に変換
detail = event.get('detail', {})
order = OrderCreated(
orderId=detail['orderId'],
customerId=detail['customerId'],
amount=detail['amount'],
currency=detail['currency'],
items=[
OrderItem(
sku=item['sku'],
quantity=item['quantity'],
price=item['price']
)
for item in detail.get('items', [])
],
timestamp=detail['timestamp']
)
# 型安全なアクセス
print(f"Order {order.orderId} for customer {order.customerId}")
print(f"Total: {order.amount} {order.currency}")
# ビジネスロジック
total_items = sum(item.quantity for item in order.items)
print(f"Items: {total_items}")
return {
'statusCode': 200,
'body': json.dumps({
'message': f'Order {order.orderId} processed',
'items_count': total_items
})
}
except KeyError as e:
# スキーマ定義と異なるイベントが到着
return {
'statusCode': 400,
'body': json.dumps({'error': f'Missing field: {str(e)}'})
}
類似サービス比較表
| 特徴 | EventBridge Schemas | Apache Avro | Protocol Buffers | Confluent Schema Registry | Apicurio Registry | AsyncAPI |
|---|---|---|---|---|---|---|
| 主な用途 | AWS Event Registry | Kafka / BigData | gRPC / Services | Kafka スキーマ管理 | マルチクラウド | イベント仕様 |
| 言語サポート | Java / Python / TS / Go | 多言語 | Java / Go / Python / C++ / Ruby | Kafka Client 言語 | 多言語 | ドキュメント |
| AWS 統合 | ✅ ネイティブ | ❌ | ❌ | ⭕ 部分的 | ❌ | ❌ |
| スキーマ自動検出 | ✅ Event Bus から推論 | ❌ | ❌ | ✅ Kafka から推論 | ⭕ 手動 | ❌ |
| バージョン管理 | ✅ 完全対応 | ✅ | ✅ | ✅ | ✅ | ❌ |
| IDE 統合 | ✅ VS Code / JetBrains | ❌ | ⭕ | ❌ | ❌ | ❌ |
| セットアップ複雑性 | ✅ 簡単(AWS Console) | ⭕ 中程度 | ⭕ 中程度 | ⭕ 中程度 | ⭕ 中程度 | ⭕ 中程度 |
| コスト | ✅ 無料-安い | ✅ 無料(OSS) | ✅ 無料(OSS) | $$(Enterprise) | ✅ 無料(OSS) | ✅ 無料 |
ベストプラクティス
✅ 推奨される設定・運用
-
スキーマレジストリを組織の中央リポジトリに
- 全マイクロサービスが同じスキーマレジストリを参照
- スキーマ変更時に全チームに通知・同期
-
スキーマ自動検出を本番環境で常時有効化
- Event Bus からイベント自動サンプリング
- 実運用データに基づいたスキーマを自動保守
- コスト:最初 500 万イベント/月は無料
-
スキーマバージョン管理で後方互換性を強制
- 新フィールド追加は OK(オプション化)
- フィールド削除・型変更は新メジャーバージョン
- コンシューマー一斉アップグレードが必要なときは通知
-
IDE プラグインで開発効率向上
- VS Code / JetBrains から直接スキーマ参照・コード生成
- autocomplete で IDE 補完を活用
- 開発時間・テスト工数削減
-
コードバインディングで型安全性を確保
- 言語ごとに自動生成クラス・インターフェースを使用
- IDE 補完・型チェックで ランタイムエラー削減
- テスト工数削減
❌ よくある失敗パターン
-
スキーマを登録したまま管理・更新しない
- 結果:スキーマが実際のイベント構造と乖離
- 対策:月 1 回スキーマ検出を確認、変更があれば反映
-
スキーマバージョン管理を無視して破壊的変更
- 結果:既存コンシューマーが突然エラー
- 対策:新フィールド追加は v_minor、フィールド削除は v_major で通知
-
AWS サービスイベントのスキーマを確認せず実装
- 結果:存在しないフィールドへのアクセス → ランタイムエラー
- 対策:S3 / ECS イベントは事前にスキーマを確認
-
コードバインディングを一度生成した後、更新しない
- 結果:新フィールドが追加されても古いコード使用
- 対策:スキーマバージョン更新時に再生成
トラブルシューティング表
| 症状 | 原因 | 解決策 |
|---|---|---|
| スキーマが自動検出されない | Event Bus のスキーマ検出が有効化されていない | EventBridge コンソール → Event Bus → 「Enable schema discovery」をオン |
| コードバインディング生成時にエラー | スキーマの JSON 形式が不正 | スキーマを JSON Schema / OpenAPI 3.0 バリデーター で確認 |
| 生成されたコードが IDE で補完されない | ダウンロードしたバインディングが正しくプロジェクトに統合されていない | ZIP 解凍後、正しいパッケージパスに配置(Python: src/, Java: src/main/java/) |
| スキーマバージョン 1 のイベント処理が失敗 | バージョン 2 でフィールド削除(破壊的変更)後、v1 イベント形式が送信されている | v1 互換性用のコンシューマーを保守、または両バージョンに対応 |
| AWS サービスイベント(S3)のスキーマが表示されない | デフォルトレジストリ「aws.events」にアクセスしていない | EventBridge コンソール → Schema Registries → 「aws.events」を選択 |
2025-2026 最新動向
AsyncAPI 仕様との統合
- EventBridge Schemas が AsyncAPI 3.0 形式に対応予定(2025 年下半期)
- より詳細なイベント仕様(request-reply パターン)の定義が可能に
AI による スキーマ提案機能
- 2026 年:生成 AI が「イベント構造から最適なスキーマ」を提案
- サンプルイベント入力 → スキーマ自動最適化
マルチリージョン・マルチクラウドレジストリ
- 現在:AWS リージョン内のみ
- 将来:複数 AWS リージョン間・GCP / Azure への同期検討中
Confluent Schema Registry との統合
- 現在:EventBridge ← → Confluent 間でスキーマを手動同期
- 将来:ネイティブ連携検討中
学習リソース・参考文献
公式ドキュメント(8+)
- EventBridge Schema Registry Guide
- EventBridge Schemas API Reference
- Schema Discovery
- Code Bindings
- AWS Service Event Schemas
- EventBridge Best Practices
- CloudFormation Events
OSS / 関連規格(5+)
- Apache Avro - Kafka のデフォルトスキーマ形式
- Protocol Buffers - gRPC・Google Cloud Pub/Sub のスキーマ
- Confluent Schema Registry - Kafka スキーマ管理
- Apicurio Registry - マルチクラウドスキーマレジストリ
- AsyncAPI - イベント駆動 API 仕様
実装例・チェックリスト
実装の段階(フェーズごと)
Phase 1: 基礎(週 1)
- [ ] EventBridge で最初のカスタムイベントバスを作成
- [ ] 簡単なカスタムスキーマを手動作成・登録
- [ ] Python / Java でコードバインディング生成・テスト
Phase 2: 自動検出(週 2-3)
- [ ] Event Bus でスキーマ自動検出を有効化
- [ ] 実イベント → 自動スキーマ推論を確認
- [ ] スキーマバージョン管理の基本操作
Phase 3: 統合・マルチサービス(月 1-2)
- [ ] チーム間で共有スキーマレジストリ構築
- [ ] マイクロサービス 3+ が同じスキーマで開発
- [ ] スキーマ変更通知・バージョン管理プロセス構築
採用判断チェックリスト
- [ ] イベント駆動アーキテクチャ(3+ マイクロサービス)を構築している
- [ ] マイクロサービス間でイベント構造の不一致エラーが発生している
- [ ] 開発効率の向上(型安全・IDE 補完)を望んでいる
- [ ] API / スキーマのバージョン管理を一元化したい
まとめ
Amazon EventBridge Schema Registry は 「イベント駆動アーキテクチャのスキーマ管理・型安全開発を実現する統合プラットフォーム」。AWS サービスイベント・カスタムイベント・自動検出スキーマを一元管理し、Java / Python / TypeScript / Go の型安全なコードを自動生成。IDE 補完で開発効率を向上させ、スキーマバージョン管理で プロデューサー・コンシューマー間の契約違反を本番前に検知。
最小限の導入: 1 個のカスタムスキーマ + Python コード生成 = マイクロサービス型不一致エラー 80% 削減 フル活用: 複数スキーマ + 自動検出 + チーム間共有 = イベント駆動システムの信頼性・保守性 大幅向上
マイクロサービスが 3 個以上の組織では ほぼ必須。スキーマレジストリがイベントシステムの「Single Source of Truth」となり、組織全体の開発速度・品質向上に大きく貢献。
最終更新:2026-04-26 バージョン:v2.0