目次
EventBridge Pipes 完全ガイド v2.0
ポイントツーポイント統合・エンリッチメント・フィルタリング基盤
Amazon EventBridge Pipes は、「イベントソース(SQS / Kinesis / DynamoDB Streams / MSK / RabbitMQ)からターゲット(Lambda / Step Functions / SQS / SNS / EventBridge / HTTP エンドポイント等)へのポイントツーポイント統合をコードなしで実現するサービス」 であり、ソース→フィルター→エンリッチメント→ターゲットの 4 段階パイプラインで、イベント変換・加工・フィルタリング・付与情報の統合を宣言的に構成できます。Lambda Event Source Mapping に比べて柔軟なターゲット選択とビルトインエンリッチメントが強み。本ドキュメントは EventBridge Pipes の概念・アーキテクチャ・スタージ詳解・2025-2026 最新動向を体系的に解説する包括的ガイドです。
ドキュメント目的
本ガイドは以下を対象としています。
- 初心者向け: EventBridge Pipes とは何か、Lambda ESM との違いを学びたい方
- 統合開発者向け: イベント駆動のポイントツーポイント統合を実装したい方
- データエンジニア向け: Kafka / Kinesis のコンシューマー実装を簡素化したい方
- SRE / インフラ向け: コードなしのイベント変換・フィルタリング
- 意思決定者向け: Lambda ESM vs Pipes vs Step Functions の技術選定判断
2025-2026 年の EventBridge Pipes エコシステム
- 新ソース対応:DynamoDB Streams / Apache Kafka(セルフマネージド)への一般化
- エンリッチメント拡張:複数データソースからの並列エンリッチメント
- 変換能力強化:EventBridge Pipes Transform(CloudFormation テンプレート変換)の統合
- スケール最適化:キロイベント/秒レベルのスループット改善
- ビジュアル編集:AWS Console でのドラッグ&ドロップ Pipe 構築UI
- クロスアカウント配信:Organizations 経由での自動ルーティング
- AI による最適化推奨:メトリクスから自動的なフィルタリング・エンリッチメント提案
目次
- 概要
- EventBridge Pipes が解決する課題
- 主な特徴
- アーキテクチャ
- パイプの 4 つのステージ
- ソース詳解
- フィルタリング詳解
- エンリッチメント詳解
- ターゲット詳解
- 入力トランスフォーマー
- エラーハンドリング・DLQ
- 主要ユースケース
- 設定・操作の具体例
- CLI / SDK / IaC
- Lambda ESM との比較
- 類似ツール比較
- ベストプラクティス
- トラブルシューティング
- 2025-2026 最新動向
- 学習リソース・参考文献
- 実装例・活用シーン
- スケーリング・パフォーマンス
- セキュリティ・IAM
- 監視・ロギング
- チェックリスト
- まとめ
- 参考文献
概要
初心者向けメモ: EventBridge Pipes は「イベントを A から B に送り、途中で加工するサービス」です。従来は SQS から Lambda に送信する際にフィルタリングや変換ロジックを Lambda に書いていましたが、Pipes ではこうした処理をノーコードで構成でき、Lambda 実行コストを削減できます。
EventBridge Pipes 公式定義:
「イベントソースからターゲットへのポイントツーポイント統合。フィルタリング、変換、エンリッチメントをコードなしで実現」
EventBridge Pipes が解決する課題
課題 1:Lambda でフィルタリング・変換を実装すると定常コストが発生
❌ 従来の方法
# Lambda コード(実行時間が短くても課金発生)
def handler(event, context):
for record in event['Records']:
body = json.loads(record['body'])
if body['type'] != 'payment': # フィルタリング
continue
# 変換処理
transformed = {
'order_id': body['order_id'],
'amount': body['amount'],
'timestamp': datetime.now().isoformat()
}
# Step Functions へ送信
sfn.start_execution(
stateMachineArn=...,
input=json.dumps(transformed)
)
✅ Pipes 活用
{
"Source": "SQS",
"Filter": {"body": {"type": ["payment"]}},
"Enrichment": "arn:aws:states:::stepfunctions:startExecution",
"Target": "EventBridge"
}
課題 2:複数ソースからのコンシューマー実装が複雑
DynamoDB Streams・Kafka・RabbitMQ からイベント受信時に、各ソースごとにポーリングロジック・エラーハンドリング・スケーリング対応を実装する必要があり、複雑です。EventBridge Pipes で統一的に対応。
課題 3:イベント変換・エンリッチメントをアプリケーションロジックに混在
支払い情報の取得、顧客データの補充等を Lambda で実装すると、イベント駆動ロジックとビジネスロジックが混在。Pipes でビジネスロジック実装前に前処理を完結。
主な特徴
| 特徴 | 詳細 |
|---|---|
| 複数ソース対応 | SQS / Kinesis / DynamoDB Streams / MSK / RabbitMQ / 自己管理 Kafka |
| ポイントツーポイント | 1 ソース → 1 ターゲット(fan-out なし、シンプル) |
| 4 段階パイプライン | Source → Filter → Enrichment → Target |
| フィルタリング | EventBridge パターンマッチング(コード不要) |
| エンリッチメント | Lambda / Step Functions / API Gateway / API Destinations |
| 変換(トランスフォーマー) | 入力の構造化・JSON 書き換え |
| DLQ サポート | フィルター・エンリッチメント・ターゲット送信失敗時の Dead Letter Queue |
| 複数ターゲット | Lambda / Step Functions / SQS / SNS / Kinesis / EventBridge / HTTP / CloudWatch |
| コストメリット | Lambda より低コスト(イベント処理課金)、ESM より柔軟 |
| ノーコード | CloudFormation / CDK で IaC 対応可能 |
アーキテクチャ
graph LR
A["イベントソース"] --> B["EventBridge Pipes"]
subgraph PipelineStages["パイプラインステージ"]
B --> C["1. ソース<br/>バッチ取得"]
C --> D["2. フィルター<br/>条件チェック"]
D --> E["3. エンリッチメント<br/>追加情報取得"]
E --> F["4. ターゲット<br/>送信"]
end
A1["SQS<br/>Queue"] --> A
A2["Kinesis<br/>Stream"] --> A
A3["DynamoDB<br/>Streams"] --> A
A4["MSK / Kafka"] --> A
A5["RabbitMQ"] --> A
F --> T1["Lambda"]
F --> T2["Step Functions"]
F --> T3["SQS Queue"]
F --> T4["SNS Topic"]
F --> T5["Kinesis"]
F --> T6["EventBridge<br/>Bus"]
F --> T7["HTTP"]
F --> T8["CloudWatch"]
H["失敗"] --> DLQ["Dead Letter Queue<br/>SQS / SNS"]
F -.-> DLQ
style B fill:#e6ffe6
style PipelineStages fill:#fff3cd
パイプの 4 つのステージ
ステージ 1:Source(必須)
イベント取得元の設定。バッチ取得、ポーリング間隔、エラー処理を定義。
SQS ソース
{
"Source": "aws.sqs",
"BatchSize": 10,
"BatchingWindowInSeconds": 5,
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
}
}
Kinesis ソース
{
"Source": "aws.kinesis",
"BatchSize": 100,
"StartingPosition": "LATEST",
"BisectBatchOnFunctionError": true
}
DynamoDB Streams ソース
{
"Source": "aws.dynamodb",
"BatchSize": 100,
"StartingPosition": "LATEST",
"MaximumRetryAttempts": 2
}
MSK(Kafka)ソース
{
"Source": "aws.msk",
"ConsumerGroupName": "pipes-consumer",
"Credentials": {
"BasicAuthArn": "arn:aws:secretsmanager:..."
}
}
ステージ 2:Filter(オプション)
イベント内容に基づいて条件をチェック、条件を満たさないイベントは次ステージに進みません。
Filter パターンマッチング例
// DynamoDB Streams: INSERT イベントのみ通過
{
"eventName": ["INSERT"]
}
// SQS: body の type フィールドが payment のみ
{
"body": {
"type": ["payment"]
}
}
// SQS: 金額が 1000 以上のみ
{
"body": {
"amount": [{"numeric": [">", 1000]}]
}
}
// 複合条件:type が payment で amount > 1000
{
"body": {
"type": ["payment"],
"amount": [{"numeric": [">", 1000]}]
}
}
Filter の数値比較演算子
">": 大きい
"<": 小さい
">=": 以上
"<=": 以下
"=": 等しい
"!=": 等しくない
ステージ 3:Enrichment(オプション)
フィルタリングを通過したイベントに対して外部データを取得・追加。
Enrichment パターン
パターン 1:Lambda で変換・外部 API 呼び出し
{
"Enrichment": "arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-payment",
"EnrichmentParameters": {
"HttpParameters": {
"HeaderParameters": {
"Authorization": "Bearer secret"
}
}
}
}
パターン 2:API Gateway / API Destinations で HTTP エンドポイント呼び出し
{
"Enrichment": "arn:aws:apigateway:ap-northeast-1::/restapis/.../resources/...",
"EnrichmentParameters": {
"HttpParameters": {
"PathParameterValues": ["order_id_from_event"]
}
}
}
パターン 3:Step Functions Express Workflow
{
"Enrichment": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:enrich-workflow",
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}
Enrichment の入出力ベストプラクティス
# Lambda エンリッチメント例
def handler(event, context):
# event はパイプから受け取ったイベント
order_id = event['body']['order_id']
# 外部データベース・API から情報取得
customer_data = fetch_customer(order_id)
inventory = check_inventory(order_id)
# 拡張データを返す
return {
**event,
'customer': customer_data,
'inventory': inventory
}
ステージ 4:Target(必須)
エンリッチメント後(またはフィルター後)のイベントを送信。
Lambda をターゲット
{
"Target": "arn:aws:lambda:ap-northeast-1:123456789012:function:process-payment",
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole",
"HttpParameters": {
"PathParameterValues": ["order_id"]
}
}
Step Functions をターゲット
{
"Target": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:order-workflow",
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole",
"RolePath": "/pipes-targets/"
}
SQS / SNS をターゲット
{
"Target": "arn:aws:sqs:ap-northeast-1:123456789012:process-queue"
}
EventBridge Bus をターゲット
{
"Target": "arn:aws:events:ap-northeast-1:123456789012:event-bus/custom-bus"
}
ソース詳解
SQS ソース設定
aws pipes create-pipe \
--name "sqs-to-lambda-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:source-queue" \
--source-parameters '{
"SqsQueueParameters": {
"BatchSize": 10,
"MaximumBatchingWindowInSeconds": 5,
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
}
}
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
Kinesis ソース設定
aws pipes create-pipe \
--name "kinesis-to-lambda-pipe" \
--source "arn:aws:kinesis:ap-northeast-1:123456789012:stream/event-stream" \
--source-parameters '{
"KinesisStreamParameters": {
"BatchSize": 100,
"StartingPosition": "LATEST",
"DeadLetterConfig": {
"Arn": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/dlq"
}
}
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
DynamoDB Streams ソース設定
aws pipes create-pipe \
--name "dynamodb-to-sfn-pipe" \
--source "arn:aws:dynamodb:ap-northeast-1:123456789012:table/orders/stream/2024-04-26T..." \
--source-parameters '{
"DynamoDBStreamParameters": {
"BatchSize": 100,
"StartingPosition": "LATEST",
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
}
}
}' \
--target "arn:aws:states:ap-northeast-1:123456789012:stateMachine:process-order" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
MSK(Kafka)ソース設定
aws pipes create-pipe \
--name "kafka-to-lambda-pipe" \
--source "arn:aws:kafka:ap-northeast-1:123456789012:cluster/msk-cluster/..." \
--source-parameters '{
"MskParameters": {
"TopicName": "payment-events",
"ConsumerGroupName": "pipes-consumer",
"Credentials": {
"BasicAuthArn": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:kafka-credentials"
},
"BatchSize": 100
}
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process-payment" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
フィルタリング詳解
Filter 設定方法
aws pipes create-pipe \
--name "filtered-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
--filter-pattern '{
"body": {
"type": ["payment"],
"amount": [{"numeric": [">", 100]}],
"status": ["pending"]
}
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
Filter パターンの詳細例
// 複合ロジック:type=payment OR type=refund
{
"body": {
"type": ["payment", "refund"]
}
}
// 数値範囲:amount が 100 以上 10000 未満
{
"body": {
"amount": [
{"numeric": [">=", 100]},
{"numeric": ["<", 10000]}
]
}
}
// 文字列プリフィックス:order_id が "ORD-" で始まる
{
"body": {
"order_id": [{"prefix": "ORD-"}]
}
}
// 存在チェック:customer_id フィールドが存在する
{
"body": {
"customer_id": [{"exists": true}]
}
}
エンリッチメント詳解
Enrichment パターン 1:Lambda で変換
# Lambda 関数(enrich-payment)
import boto3
import json
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('customers')
def handler(event, context):
# イベントから顧客 ID を取得
order_id = event['body']['order_id']
customer_id = event['body']['customer_id']
# DynamoDB から顧客情報を取得
response = table.get_item(Key={'customer_id': customer_id})
customer = response.get('Item', {})
# イベントに情報を追加
enriched = {
**event,
'customer_name': customer.get('name'),
'customer_email': customer.get('email'),
'loyalty_points': customer.get('loyalty_points', 0),
'enriched_at': datetime.now().isoformat()
}
return enriched
Enrichment パターン 2:HTTP エンドポイント(API Gateway)
aws pipes create-pipe \
--name "http-enrichment-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
--enrichment "arn:aws:apigateway:ap-northeast-1::/restapis/abc123/resources/..." \
--enrichment-parameters '{
"HttpParameters": {
"HeaderParameters": {
"Authorization": "Bearer token"
},
"QueryStringParameters": {
"customer_id": "$.body.customer_id"
}
},
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
Enrichment パターン 3:Step Functions Express Workflow
{
"Comment": "支払い情報をエンリッチするワークフロー",
"StartAt": "FetchCustomer",
"States": {
"FetchCustomer": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "customers",
"Key": {
"customer_id": {
"S.{{CONTENT}}quot;: "$.body.customer_id"
}
}
},
"Next": "CheckInventory"
},
"CheckInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:check-inventory",
"Next": "Success"
},
"Success": {
"Type": "Succeed"
}
}
}
ターゲット詳解
Lambda ターゲット
aws pipes create-pipe \
--name "sqs-to-lambda-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--target-parameters '{
"LambdaFunctionParameters": {
"InvocationType": "RequestResponse"
},
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}'
Step Functions ターゲット
aws pipes create-pipe \
--name "event-to-sfn-pipe" \
--source "arn:aws:events:ap-northeast-1:123456789012:event-bus/default" \
--target "arn:aws:states:ap-northeast-1:123456789012:stateMachine:OrderProcessing" \
--target-parameters '{
"StepFunctionStateMachineParameters": {
"InvocationType": "FIRE_AND_FORGET"
},
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}'
SQS ターゲット
aws pipes create-pipe \
--name "dlq-to-queue-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:dlq" \
--target "arn:aws:sqs:ap-northeast-1:123456789012:retry-queue" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
EventBridge Bus ターゲット
aws pipes create-pipe \
--name "stream-to-eventbus-pipe" \
--source "arn:aws:kinesis:ap-northeast-1:123456789012:stream/events" \
--target "arn:aws:events:ap-northeast-1:123456789012:event-bus/custom-bus" \
--target-parameters '{
"EventBridgeParameters": {
"DetailType": "Payment Event",
"Source": "custom.pipes"
},
"RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}'
入力トランスフォーマー
Pipes には別のトランスフォーマー機能があり、イベント構造を再構築可能です。
aws pipes create-pipe \
--name "transform-pipe" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
--target-parameters '{
"HttpParameters": {
"PathParameterValues": [],
"HeaderParameters": {
"X-Custom-Header": "value"
},
"QueryStringParameters": {
"order_id": "$.body.order_id",
"customer_id": "$.body.customer_id"
}
}
}'
エラーハンドリング・DLQ
DLQ 設定
# SQS ソースの DLQ
aws pipes create-pipe \
--name "pipe-with-dlq" \
--source "arn:aws:sqs:ap-northeast-1:123456789012:source" \
--source-parameters '{
"SqsQueueParameters": {
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:pipe-dlq"
}
}
}' \
--target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
Retry ポリシー
{
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
},
"RetryPolicy": {
"MaximumEventAge": 3600,
"MaximumRetryAttempts": 2
}
}
主要ユースケース
ユースケース 1:リアルタイム注文処理
- DynamoDB Streams(注文作成イベント)
- → Filter: status=pending
- → Enrichment: 顧客・在庫情報取得(Lambda)
- → Target: Step Functions ワークフロー(支払い処理開始)
ユースケース 2:ログアグリゲーション&フィルタリング
- CloudWatch Logs(Application ログ)
- → Filter: ERROR / CRITICAL のみ
- → Enrichment: コンテキスト追加(API Gateway)
- → Target: SNS(アラート)+ DataDog
ユースケース 3:Kafka をソースに、複数ターゲットへ
Kafka(MSK)トピック
→ Filter: event_type=payment
→ Enrichment: なし
→ Target-1: Lambda(リアルタイム処理)
→ Target-2: Kinesis(分析パイプライン)
# Pipes は 1 パイプ = 1 ターゲットなので複数パイプ必要
ユースケース 4:DynamoDB Streams → SaaS 連携
- DynamoDB Streams
- → Filter: table=contacts AND operation=INSERT
- → Enrichment: Salesforce API で顧客レコード作成(HTTP)
- → Target: EventBridge(内部イベント通知)
設定・操作の具体例
パイプ作成(完全例)
#!/bin/bash
PIPE_NAME="payment-processing-pipe"
SOURCE_ARN="arn:aws:sqs:ap-northeast-1:123456789012:payment-queue"
ENRICHMENT_ARN="arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-payment"
TARGET_ARN="arn:aws:states:ap-northeast-1:123456789012:stateMachine:ProcessPayment"
ROLE_ARN="arn:aws:iam::123456789012:role/PipesExecutionRole"
aws pipes create-pipe \
--name "$PIPE_NAME" \
--source "$SOURCE_ARN" \
--source-parameters '{
"SqsQueueParameters": {
"BatchSize": 10,
"MaximumBatchingWindowInSeconds": 5,
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:ap-northeast-1:123456789012:payment-dlq"
}
}
}' \
--filter-pattern '{
"body": {
"type": ["payment"],
"amount": [{"numeric": [">", 100]}]
}
}' \
--enrichment "$ENRICHMENT_ARN" \
--enrichment-parameters '{
"LambdaFunctionParameters": {
"InvocationType": "RequestResponse"
},
"RoleArn": "'$ROLE_ARN'"
}' \
--target "$TARGET_ARN" \
--target-parameters '{
"StepFunctionStateMachineParameters": {
"InvocationType": "FIRE_AND_FORGET"
},
"RoleArn": "'$ROLE_ARN'"
}'
echo "Pipe created: $PIPE_NAME"
パイプ確認
# パイプ一覧表示
aws pipes list-pipes
# パイプ詳細確認
aws pipes describe-pipe --name payment-processing-pipe
# パイプの実行状態確認
aws pipes describe-pipe-source-parameters --name payment-processing-pipe
パイプ更新
aws pipes update-pipe \
--name payment-processing-pipe \
--source-parameters '{
"SqsQueueParameters": {
"BatchSize": 20
}
}'
パイプ削除
aws pipes delete-pipe --name payment-processing-pipe
CLI / SDK / IaC
AWS CDK での定義
from aws_cdk import (
core,
pipes,
sqs,
lambda_,
stepfunctions
)
class PaymentPipesStack(core.Stack):
def __init__(self, scope: core.Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# ソース(SQS)
source_queue = sqs.Queue(self, "PaymentQueue")
dlq = sqs.Queue(self, "PaymentDLQ")
# エンリッチメント(Lambda)
enrich_function = lambda_.Function(
self, "EnrichPayment",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="index.handler",
code=lambda_.Code.from_asset("lambda")
)
# ターゲット(Step Functions)
sfn_role = iam.Role(...)
state_machine = stepfunctions.StateMachine(...)
# Pipe 作成
pipe = pipes.Pipe(
self, "PaymentPipe",
source=pipes.PipeSource.from_sqs_queue(source_queue),
source_parameters=pipes.SqsSourceParameters(
batch_size=10,
dead_letter_config=pipes.DeadLetterConfig(
arn=dlq.queue_arn
)
),
filter_pattern=pipes.FilterPattern.from_json({
"body": {
"type": ["payment"],
"amount": [{"numeric": [">", 100]}]
}
}),
enrichment=pipes.Enrichment.from_lambda_function(enrich_function),
target=pipes.PipeTarget.from_step_functions_state_machine(state_machine),
role=iam.Role(...)
)
Terraform での定義
resource "aws_pipes_pipe" "payment_pipe" {
name = "payment-processing-pipe"
role_arn = aws_iam_role.pipes_role.arn
source = "arn:aws:sqs:ap-northeast-1:123456789012:payment-queue"
source_parameters = jsonencode({
SqsQueueParameters = {
BatchSize = 10
MaximumBatchingWindowInSeconds = 5
DeadLetterConfig = {
Arn = aws_sqs_queue.dlq.arn
}
}
})
filter = jsonencode({
body = {
type = ["payment"]
amount = [{ numeric = [">", 100] }]
}
})
enrichment = aws_lambda_function.enrich.arn
enrichment_parameters = jsonencode({
LambdaFunctionParameters = {
InvocationType = "RequestResponse"
}
RoleArn = aws_iam_role.pipes_role.arn
})
target = aws_sfn_state_machine.process.arn
target_parameters = jsonencode({
StepFunctionStateMachineParameters = {
InvocationType = "FIRE_AND_FORGET"
}
RoleArn = aws_iam_role.pipes_role.arn
})
}
CloudFormation での定義
Resources:
PaymentProcessingPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: payment-processing-pipe
RoleArn: !GetAtt PipesRole.Arn
Source: !GetAtt PaymentQueue.Arn
SourceParameters:
SqsQueueParameters:
BatchSize: 10
MaximumBatchingWindowInSeconds: 5
DeadLetterConfig:
Arn: !GetAtt PaymentDLQ.Arn
Filter: |
{
"body": {
"type": ["payment"],
"amount": [{"numeric": [">", 100]}]
}
}
Enrichment: !GetAtt EnrichPaymentFunction.Arn
EnrichmentParameters:
LambdaFunctionParameters:
InvocationType: RequestResponse
RoleArn: !GetAtt PipesRole.Arn
Target: !GetAtt ProcessPaymentStateMachine.Arn
TargetParameters:
StepFunctionStateMachineParameters:
InvocationType: FIRE_AND_FORGET
RoleArn: !GetAtt PipesRole.Arn
Lambda ESM との比較
| 観点 | Lambda ESM | EventBridge Pipes |
|---|---|---|
| 主な用途 | Lambda 関数へのイベント配信 | ポイントツーポイント統合 |
| ターゲット | Lambda のみ | Lambda / Step Functions / SQS / SNS / Kinesis / HTTP 等 |
| フィルタリング | Lambda コード内 | ビジュアル / JSON パターン |
| エンリッチメント | Lambda で実装 | ビルトイン(Lambda / API / Step Functions) |
| 変換 | Lambda で実装 | Input Transformer で可能 |
| 複数ターゲット | ❌不可 | ❌1 パイプ = 1 ターゲット(複数パイプ必要) |
| コスト | Lambda 実行料金 | $0.40/100 万イベント + ターゲット料金 |
| コード記述 | ✅必須(フィルタ・変換ロジック) | ❌不要(ノーコード) |
| スケーリング | 自動(Lambda) | 自動(Pipes) |
| DLQ | ✅対応 | ✅対応 |
| 採用シーン | SQS/Kinesis → Lambda | SQS/Kinesis/DynamoDB Streams → 複数ターゲット |
類似ツール比較
| ツール | 対象 | 用途 | 特徴 |
|---|---|---|---|
| Lambda ESM | SQS/Kinesis → Lambda | イベント駆動のサーバーレス処理 | AWS ネイティブ、信頼性高 |
| EventBridge Pipes | 複数ソース → 複数ターゲット | ポイントツーポイント統合 | ノーコード、柔軟ターゲット |
| Apache Camel | OSS 統合フレームワーク | EIP(Enterprise Integration Patterns)実装 | 複雑な変換・ルーティング |
| Kafka Connect | Kafka ↔ 外部システム | Kafka → RDB / S3 等 | Kafka エコシステム専用 |
| Step Functions | AWS サービス統合 | 複雑なワークフロー | 状態管理・エラーハンドリング |
| AWS Glue | バッチ ETL | データ変換・クレンジング | スケジュールベース処理 |
ベストプラクティス
✅ やるべきこと
-
Filter を使用して不要イベントを早期除外
// キューからの全イベントを Lambda に送信するのではなく // フィルターで対象イベントのみ通過させ、課金を削減 { "body": { "type": ["payment"], "status": ["pending"] } } -
エンリッチメント は同期的に実行(Lambda)
# エンリッチメント Lambda は RequestResponse(同期)で # ターゲットに確実にデータを届ける -
DLQ を必ず設定
# ソース・ターゲットの失敗イベントを DLQ に記録 # 後続の再処理・デバッグに活用 -
CloudWatch Metrics で監視
# InvocationCount(実行数) # DeadLetterDeliveries(失敗数) # Duration(実行時間) -
複数ターゲットが必要なら複数パイプを構築
# 1 パイプ = 1 ターゲットなので # fan-out が必要なら Pipe → EventBridge Bus → 複数ルール
❌ やってはいけないこと
-
エンリッチメント なしで複雑なロジック実装
# ターゲット Lambda で全変換ロジック → Pipes の利点喪失 # Enrichment で前処理、Target で処理に役割分担 -
Filter を使わずに全イベント処理
# キューの全イベント → Target で課金発生 # 不要なイベントはフィルターで除外して コスト削減 -
エンリッチメント Lambda を FireAndForget(非同期)実行
# 同期実行(RequestResponse)で確実に データ受け取り # 非同期だとエンリッチメント結果を得られない -
DLQ なしでのデプロイ
# エラー追跡ができず、デバッグが困難に # 本番投入前に DLQ 必須 -
複数エンリッチメント を串刺し
# Enrichment → Target → Lambda(別エンリッチメント) # → この場合は Step Functions Enrichment で複数ステップ実装
トラブルシューティング
Issue 1:Pipe がイベントを処理していない
# 原因確認
1. Pipe が RUNNING 状態か
aws pipes describe-pipe --name my-pipe --query 'State'
2. ソースにイベントが存在するか
aws sqs get-queue-attributes \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/source \
--attribute-names ApproximateNumberOfMessages
3. Filter パターンが正しいか
# Filter が厳しすぎてイベントが全て除外されていないか
4. IAM ロール権限が不足していないか
# arn:aws:iam::123456789012:role/PipesRole に必要な権限があるか
Issue 2:エンリッチメント が失敗している
# 原因確認
1. Enrichment Lambda が存在・実行可能か
aws lambda get-function --function-name enrich-payment
2. Lambda IAM 実行ロールが正しいか
aws iam get-role --role-name LambdaExecutionRole
3. Lambda 関数がエラーを返していないか
aws logs tail /aws/lambda/enrich-payment --follow
4. Enrichment の InvocationType が RequestResponse か
# FireAndForget だと結果が取得できない
Issue 3:ターゲット送信が失敗している
# 原因確認
1. ターゲットリソースが存在・アクセス可能か
aws lambda get-function --function-name target-function
2. Pipes ロールがターゲットに invoke 権限を持っているか
# ターゲット (Lambda / Step Functions) への IAM PermissionPolicy 確認
3. CloudWatch ダッシュボードで失敗数を確認
aws cloudwatch get-metric-statistics \
--namespace AWS/Pipes \
--metric-name DeadLetterDeliveries \
--dimensions Name=PipeName,Value=my-pipe \
--start-time 2024-01-01T00:00:00Z \
--end-time 2024-01-02T00:00:00Z \
--period 3600 \
--statistics Sum
Issue 4:DLQ にイベントが溜まっている
# DLQ メッセージの確認
aws sqs receive-message \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/pipe-dlq \
--max-number-of-messages 10
# メッセージ本体を確認
# DLQ メッセージには元々のイベント + エラー情報が含まれている
2025-2026 最新動向
1. Transform Stage の追加(2025)
フィルター→エンリッチメント→ターゲット に加え、Transform ステージで入力/出力の構造を再構築可能に。
aws pipes create-pipe \
--name "transform-pipe" \
--source ... \
--enrichment ... \
--transform-parameters '{
"InputPath": "$.body",
"OutputPath": "$.enriched"
}' \
--target ...
2. マルチターゲット Pipe(計画中)
1 Pipe で複数ターゲットをサポート(現在は 1 Pipe = 1 ターゲット制限を緩和予定)
3. ビジュアル Pipe Editor
AWS Console でドラッグ&ドロップでパイプ構築可能に(CloudFormation 自動生成)
4. Serverless Apache Kafka 統合
サーバーレス Kafka(AWS Lambda と統合) を Pipes ソースに対応
5. AI-Driven エンリッチメント推奨
CloudWatch メトリクスから自動的にエンリッチメント必要性を判定・提案
学習リソース・参考文献
AWS 公式資料
- Amazon EventBridge Pipes User Guide
- EventBridge Pipes - Enrichment Documentation
- Pipes API Reference
AWS ブログ・動画
- Enriching and customizing notifications with Amazon EventBridge Pipes
- Point-to-Point Integration with EventBridge Pipes
サードパーティ・OSS
実装例・活用シーン
シーン 1:E-Commerce 支払い処理
- SQS Queue(支払いリクエスト)
- → Filter: type=payment & amount>100
- → Enrichment: Lambda で顧客・住所情報取得
- → Target: Step Functions(支払い処理ワークフロー)
シーン 2:DynamoDB Streams → 複数システム連携
DynamoDB Streams(顧客マスター更新)
→ Filter: operation=INSERT or MODIFY
→ Enrichment: なし
→ Target: Pipe-1: Lambda(Elasticsearch 更新)
→ Target: Pipe-2: HTTP(SaaS CRM 同期)
→ Target: Pipe-3: SNS(社内通知)
シーン 3:Kafka トピック → リアルタイムダッシュボード
- MSK Topic(イベントストリーム)
- → Filter: event_type=user_action
- → Enrichment: API Gateway(追加メタデータ取得)
- → Target: Kinesis(リアルタイム分析)
スケーリング・パフォーマンス
スループット特性
| メトリクス | 値 | 備考 |
|---|---|---|
| 最大スループット | 100,000+ イベント/秒 | 自動スケーリング |
| バッチサイズ | 1~10,000 | ソース、ターゲットで設定可能 |
| レイテンシ | < 1 秒 | 典型的な構成での P99 |
| エンリッチメント時間 | + 100~500ms | Lambda / API Gateway の実行時間に依存 |
パフォーマンス最適化のコツ
-
バッチサイズを適切に設定
# SQS:10~100 が推奨 # Kinesis:100~1000 が推奨 # 大きいほどレイテンシ増加、スループット向上 -
エンリッチメント を最小化
# 複数エンリッチメントが必要なら # → Step Functions Express で複数ステップ実装 -
フィルター で不要イベント早期除外
# ターゲット到達前に除外 → レイテンシ削減
セキュリティ・IAM
IAM ロール必須権限
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:source"
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:*"
},
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:*"
},
{
"Effect": "Allow",
"Action": [
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:target"
}
]
}
監視・ロギング
CloudWatch メトリクス
# Pipe の実行数
aws cloudwatch get-metric-statistics \
--namespace AWS/Pipes \
--metric-name InvocationCount \
--dimensions Name=PipeName,Value=my-pipe \
--period 300 \
--statistics Sum
# DLQ 配信数(失敗)
aws cloudwatch get-metric-statistics \
--namespace AWS/Pipes \
--metric-name DeadLetterDeliveries \
--dimensions Name=PipeName,Value=my-pipe \
--period 300 \
--statistics Sum
# パイプ実行時間
aws cloudwatch get-metric-statistics \
--namespace AWS/Pipes \
--metric-name Duration \
--dimensions Name=PipeName,Value=my-pipe \
--period 300 \
--statistics Average
CloudWatch Logs
# Pipe ログを CloudWatch Logs に記録
# LogLevel: ERROR / INFO / DEBUG 設定可能
チェックリスト
- [ ] ソース(SQS / Kinesis / DynamoDB Streams 等)が正しく指定されているか
- [ ] バッチサイズが適切か(SQS: 10~100、Kinesis: 100~1000)
- [ ] Filter パターンが正確か(テスト済み)
- [ ] Enrichment が必要か、必要なら実装済みか
- [ ] ターゲットが存在・アクセス可能か
- [ ] IAM ロール権限が揃っているか
- [ ] DLQ が設定されているか
- [ ] CloudWatch ダッシュボード・アラームが設定されているか
- [ ] 非本番環境でテスト済みか
- [ ] パイプの実行状態が RUNNING か
- [ ] エラー率が許容範囲か(CloudWatch メトリクス確認)
まとめ
EventBridge Pipes は 「イベントソースとターゲット間のポイントツーポイント統合を、フィルタリング・エンリッチメント・変換をビルトインで実現するノーコードサービス」。SQS / Kinesis / DynamoDB Streams / MSK などの複数ソースから Lambda / Step Functions / SQS / SNS / EventBridge / HTTP エンドポイント等への統合を、コード記述不要で構成可能です。
採用メリット:
- コードなしのフィルタリング・エンリッチメント
- 複数ターゲット対応(複数パイプで fan-out)
- Lambda ESM より低コスト(イベント課金)かつ柔軟
- CloudFormation / CDK で IaC 対応可能
- DLQ・自動リトライ で信頼性向上
設計のコツ:
- フィルター で不要イベント早期除外(コスト削減)
- エンリッチメント は前処理専用(ビジネスロジックはターゲットで)
- DLQ を必ず設定(エラー追跡)
- CloudWatch メトリクス で継続監視
- 複数ターゲット必要なら → Pipe → EventBridge Bus → 複数ルール
EventBridge Pipes は、イベント駆動アーキテクチャにおける統合レイヤーのベストプラクティス実装。コスト削減・開発速度向上・保守性改善を同時に実現します。
最終更新:2026-04-26
バージョン:v2.0