目次

EventBridge Pipes 完全ガイド v2.0

ポイントツーポイント統合・エンリッチメント・フィルタリング基盤

Amazon EventBridge Pipes は、「イベントソース(SQS / Kinesis / DynamoDB Streams / MSK / RabbitMQ)からターゲット(Lambda / Step Functions / SQS / SNS / EventBridge / HTTP エンドポイント等)へのポイントツーポイント統合をコードなしで実現するサービス」 であり、ソース→フィルター→エンリッチメント→ターゲットの 4 段階パイプラインで、イベント変換・加工・フィルタリング・付与情報の統合を宣言的に構成できます。Lambda Event Source Mapping に比べて柔軟なターゲット選択とビルトインエンリッチメントが強み。本ドキュメントは EventBridge Pipes の概念・アーキテクチャ・スタージ詳解・2025-2026 最新動向を体系的に解説する包括的ガイドです。

ドキュメント目的

本ガイドは以下を対象としています。

  • 初心者向け: EventBridge Pipes とは何か、Lambda ESM との違いを学びたい方
  • 統合開発者向け: イベント駆動のポイントツーポイント統合を実装したい方
  • データエンジニア向け: Kafka / Kinesis のコンシューマー実装を簡素化したい方
  • SRE / インフラ向け: コードなしのイベント変換・フィルタリング
  • 意思決定者向け: Lambda ESM vs Pipes vs Step Functions の技術選定判断

2025-2026 年の EventBridge Pipes エコシステム

  • 新ソース対応:DynamoDB Streams / Apache Kafka(セルフマネージド)への一般化
  • エンリッチメント拡張:複数データソースからの並列エンリッチメント
  • 変換能力強化:EventBridge Pipes Transform(CloudFormation テンプレート変換)の統合
  • スケール最適化:キロイベント/秒レベルのスループット改善
  • ビジュアル編集:AWS Console でのドラッグ&ドロップ Pipe 構築UI
  • クロスアカウント配信:Organizations 経由での自動ルーティング
  • AI による最適化推奨:メトリクスから自動的なフィルタリング・エンリッチメント提案

目次

  1. 概要
  2. EventBridge Pipes が解決する課題
  3. 主な特徴
  4. アーキテクチャ
  5. パイプの 4 つのステージ
  6. ソース詳解
  7. フィルタリング詳解
  8. エンリッチメント詳解
  9. ターゲット詳解
  10. 入力トランスフォーマー
  11. エラーハンドリング・DLQ
  12. 主要ユースケース
  13. 設定・操作の具体例
  14. CLI / SDK / IaC
  15. Lambda ESM との比較
  16. 類似ツール比較
  17. ベストプラクティス
  18. トラブルシューティング
  19. 2025-2026 最新動向
  20. 学習リソース・参考文献
  21. 実装例・活用シーン
  22. スケーリング・パフォーマンス
  23. セキュリティ・IAM
  24. 監視・ロギング
  25. チェックリスト
  26. まとめ
  27. 参考文献

概要

初心者向けメモ: EventBridge Pipes は「イベントを A から B に送り、途中で加工するサービス」です。従来は SQS から Lambda に送信する際にフィルタリングや変換ロジックを Lambda に書いていましたが、Pipes ではこうした処理をノーコードで構成でき、Lambda 実行コストを削減できます。

EventBridge Pipes 公式定義:

「イベントソースからターゲットへのポイントツーポイント統合。フィルタリング、変換、エンリッチメントをコードなしで実現」

参照:Amazon EventBridge Pipes


EventBridge Pipes が解決する課題

課題 1:Lambda でフィルタリング・変換を実装すると定常コストが発生

従来の方法

# Lambda コード(実行時間が短くても課金発生)
def handler(event, context):
    for record in event['Records']:
        body = json.loads(record['body'])
        if body['type'] != 'payment':  # フィルタリング
            continue
        
        # 変換処理
        transformed = {
            'order_id': body['order_id'],
            'amount': body['amount'],
            'timestamp': datetime.now().isoformat()
        }
        # Step Functions へ送信
        sfn.start_execution(
            stateMachineArn=...,
            input=json.dumps(transformed)
        )

Pipes 活用

{
  "Source": "SQS",
  "Filter": {"body": {"type": ["payment"]}},
  "Enrichment": "arn:aws:states:::stepfunctions:startExecution",
  "Target": "EventBridge"
}

課題 2:複数ソースからのコンシューマー実装が複雑

DynamoDB Streams・Kafka・RabbitMQ からイベント受信時に、各ソースごとにポーリングロジック・エラーハンドリング・スケーリング対応を実装する必要があり、複雑です。EventBridge Pipes で統一的に対応。

課題 3:イベント変換・エンリッチメントをアプリケーションロジックに混在

支払い情報の取得、顧客データの補充等を Lambda で実装すると、イベント駆動ロジックとビジネスロジックが混在。Pipes でビジネスロジック実装前に前処理を完結。


主な特徴

特徴 詳細
複数ソース対応 SQS / Kinesis / DynamoDB Streams / MSK / RabbitMQ / 自己管理 Kafka
ポイントツーポイント 1 ソース → 1 ターゲット(fan-out なし、シンプル)
4 段階パイプライン Source → Filter → Enrichment → Target
フィルタリング EventBridge パターンマッチング(コード不要)
エンリッチメント Lambda / Step Functions / API Gateway / API Destinations
変換(トランスフォーマー) 入力の構造化・JSON 書き換え
DLQ サポート フィルター・エンリッチメント・ターゲット送信失敗時の Dead Letter Queue
複数ターゲット Lambda / Step Functions / SQS / SNS / Kinesis / EventBridge / HTTP / CloudWatch
コストメリット Lambda より低コスト(イベント処理課金)、ESM より柔軟
ノーコード CloudFormation / CDK で IaC 対応可能

アーキテクチャ

graph LR
    A["イベントソース"] --> B["EventBridge Pipes"]
    
    subgraph PipelineStages["パイプラインステージ"]
        B --> C["1. ソース<br/>バッチ取得"]
        C --> D["2. フィルター<br/>条件チェック"]
        D --> E["3. エンリッチメント<br/>追加情報取得"]
        E --> F["4. ターゲット<br/>送信"]
    end
    
    A1["SQS<br/>Queue"] --> A
    A2["Kinesis<br/>Stream"] --> A
    A3["DynamoDB<br/>Streams"] --> A
    A4["MSK / Kafka"] --> A
    A5["RabbitMQ"] --> A
    
    F --> T1["Lambda"]
    F --> T2["Step Functions"]
    F --> T3["SQS Queue"]
    F --> T4["SNS Topic"]
    F --> T5["Kinesis"]
    F --> T6["EventBridge<br/>Bus"]
    F --> T7["HTTP"]
    F --> T8["CloudWatch"]
    
    H["失敗"] --> DLQ["Dead Letter Queue<br/>SQS / SNS"]
    F -.-> DLQ
    
    style B fill:#e6ffe6
    style PipelineStages fill:#fff3cd

パイプの 4 つのステージ

ステージ 1:Source(必須)

イベント取得元の設定。バッチ取得、ポーリング間隔、エラー処理を定義。

SQS ソース

{
  "Source": "aws.sqs",
  "BatchSize": 10,
  "BatchingWindowInSeconds": 5,
  "DeadLetterConfig": {
    "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
  }
}

Kinesis ソース

{
  "Source": "aws.kinesis",
  "BatchSize": 100,
  "StartingPosition": "LATEST",
  "BisectBatchOnFunctionError": true
}

DynamoDB Streams ソース

{
  "Source": "aws.dynamodb",
  "BatchSize": 100,
  "StartingPosition": "LATEST",
  "MaximumRetryAttempts": 2
}

MSK(Kafka)ソース

{
  "Source": "aws.msk",
  "ConsumerGroupName": "pipes-consumer",
  "Credentials": {
    "BasicAuthArn": "arn:aws:secretsmanager:..."
  }
}

ステージ 2:Filter(オプション)

イベント内容に基づいて条件をチェック、条件を満たさないイベントは次ステージに進みません。

Filter パターンマッチング例

// DynamoDB Streams: INSERT イベントのみ通過
{
  "eventName": ["INSERT"]
}

// SQS: body の type フィールドが payment のみ
{
  "body": {
    "type": ["payment"]
  }
}

// SQS: 金額が 1000 以上のみ
{
  "body": {
    "amount": [{"numeric": [">", 1000]}]
  }
}

// 複合条件:type が payment で amount > 1000
{
  "body": {
    "type": ["payment"],
    "amount": [{"numeric": [">", 1000]}]
  }
}

Filter の数値比較演算子

">": 大きい
"<": 小さい
">=": 以上
"<=": 以下
"=": 等しい
"!=": 等しくない

ステージ 3:Enrichment(オプション)

フィルタリングを通過したイベントに対して外部データを取得・追加。

Enrichment パターン

パターン 1:Lambda で変換・外部 API 呼び出し

{
  "Enrichment": "arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-payment",
  "EnrichmentParameters": {
    "HttpParameters": {
      "HeaderParameters": {
        "Authorization": "Bearer secret"
      }
    }
  }
}

パターン 2:API Gateway / API Destinations で HTTP エンドポイント呼び出し

{
  "Enrichment": "arn:aws:apigateway:ap-northeast-1::/restapis/.../resources/...",
  "EnrichmentParameters": {
    "HttpParameters": {
      "PathParameterValues": ["order_id_from_event"]
    }
  }
}

パターン 3:Step Functions Express Workflow

{
  "Enrichment": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:enrich-workflow",
  "RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
}

Enrichment の入出力ベストプラクティス

# Lambda エンリッチメント例
def handler(event, context):
    # event はパイプから受け取ったイベント
    order_id = event['body']['order_id']
    
    # 外部データベース・API から情報取得
    customer_data = fetch_customer(order_id)
    inventory = check_inventory(order_id)
    
    # 拡張データを返す
    return {
        **event,
        'customer': customer_data,
        'inventory': inventory
    }

ステージ 4:Target(必須)

エンリッチメント後(またはフィルター後)のイベントを送信。

Lambda をターゲット

{
  "Target": "arn:aws:lambda:ap-northeast-1:123456789012:function:process-payment",
  "RoleArn": "arn:aws:iam::123456789012:role/PipesRole",
  "HttpParameters": {
    "PathParameterValues": ["order_id"]
  }
}

Step Functions をターゲット

{
  "Target": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:order-workflow",
  "RoleArn": "arn:aws:iam::123456789012:role/PipesRole",
  "RolePath": "/pipes-targets/"
}

SQS / SNS をターゲット

{
  "Target": "arn:aws:sqs:ap-northeast-1:123456789012:process-queue"
}

EventBridge Bus をターゲット

{
  "Target": "arn:aws:events:ap-northeast-1:123456789012:event-bus/custom-bus"
}

ソース詳解

SQS ソース設定

aws pipes create-pipe \
  --name "sqs-to-lambda-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:source-queue" \
  --source-parameters '{
    "SqsQueueParameters": {
      "BatchSize": 10,
      "MaximumBatchingWindowInSeconds": 5,
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
      }
    }
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

Kinesis ソース設定

aws pipes create-pipe \
  --name "kinesis-to-lambda-pipe" \
  --source "arn:aws:kinesis:ap-northeast-1:123456789012:stream/event-stream" \
  --source-parameters '{
    "KinesisStreamParameters": {
      "BatchSize": 100,
      "StartingPosition": "LATEST",
      "DeadLetterConfig": {
        "Arn": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/dlq"
      }
    }
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

DynamoDB Streams ソース設定

aws pipes create-pipe \
  --name "dynamodb-to-sfn-pipe" \
  --source "arn:aws:dynamodb:ap-northeast-1:123456789012:table/orders/stream/2024-04-26T..." \
  --source-parameters '{
    "DynamoDBStreamParameters": {
      "BatchSize": 100,
      "StartingPosition": "LATEST",
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
      }
    }
  }' \
  --target "arn:aws:states:ap-northeast-1:123456789012:stateMachine:process-order" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

MSK(Kafka)ソース設定

aws pipes create-pipe \
  --name "kafka-to-lambda-pipe" \
  --source "arn:aws:kafka:ap-northeast-1:123456789012:cluster/msk-cluster/..." \
  --source-parameters '{
    "MskParameters": {
      "TopicName": "payment-events",
      "ConsumerGroupName": "pipes-consumer",
      "Credentials": {
        "BasicAuthArn": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:kafka-credentials"
      },
      "BatchSize": 100
    }
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process-payment" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

フィルタリング詳解

Filter 設定方法

aws pipes create-pipe \
  --name "filtered-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
  --filter-pattern '{
    "body": {
      "type": ["payment"],
      "amount": [{"numeric": [">", 100]}],
      "status": ["pending"]
    }
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

Filter パターンの詳細例

// 複合ロジック:type=payment OR type=refund
{
  "body": {
    "type": ["payment", "refund"]
  }
}

// 数値範囲:amount が 100 以上 10000 未満
{
  "body": {
    "amount": [
      {"numeric": [">=", 100]},
      {"numeric": ["<", 10000]}
    ]
  }
}

// 文字列プリフィックス:order_id が "ORD-" で始まる
{
  "body": {
    "order_id": [{"prefix": "ORD-"}]
  }
}

// 存在チェック:customer_id フィールドが存在する
{
  "body": {
    "customer_id": [{"exists": true}]
  }
}

エンリッチメント詳解

Enrichment パターン 1:Lambda で変換

# Lambda 関数(enrich-payment)
import boto3
import json

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('customers')

def handler(event, context):
    # イベントから顧客 ID を取得
    order_id = event['body']['order_id']
    customer_id = event['body']['customer_id']
    
    # DynamoDB から顧客情報を取得
    response = table.get_item(Key={'customer_id': customer_id})
    customer = response.get('Item', {})
    
    # イベントに情報を追加
    enriched = {
        **event,
        'customer_name': customer.get('name'),
        'customer_email': customer.get('email'),
        'loyalty_points': customer.get('loyalty_points', 0),
        'enriched_at': datetime.now().isoformat()
    }
    
    return enriched

Enrichment パターン 2:HTTP エンドポイント(API Gateway)

aws pipes create-pipe \
  --name "http-enrichment-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
  --enrichment "arn:aws:apigateway:ap-northeast-1::/restapis/abc123/resources/..." \
  --enrichment-parameters '{
    "HttpParameters": {
      "HeaderParameters": {
        "Authorization": "Bearer token"
      },
      "QueryStringParameters": {
        "customer_id": "$.body.customer_id"
      }
    },
    "RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

Enrichment パターン 3:Step Functions Express Workflow

{
  "Comment": "支払い情報をエンリッチするワークフロー",
  "StartAt": "FetchCustomer",
  "States": {
    "FetchCustomer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "customers",
        "Key": {
          "customer_id": {
            "S.{{CONTENT}}quot;: "$.body.customer_id"
          }
        }
      },
      "Next": "CheckInventory"
    },
    "CheckInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:check-inventory",
      "Next": "Success"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

ターゲット詳解

Lambda ターゲット

aws pipes create-pipe \
  --name "sqs-to-lambda-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --target-parameters '{
    "LambdaFunctionParameters": {
      "InvocationType": "RequestResponse"
    },
    "RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
  }'

Step Functions ターゲット

aws pipes create-pipe \
  --name "event-to-sfn-pipe" \
  --source "arn:aws:events:ap-northeast-1:123456789012:event-bus/default" \
  --target "arn:aws:states:ap-northeast-1:123456789012:stateMachine:OrderProcessing" \
  --target-parameters '{
    "StepFunctionStateMachineParameters": {
      "InvocationType": "FIRE_AND_FORGET"
    },
    "RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
  }'

SQS ターゲット

aws pipes create-pipe \
  --name "dlq-to-queue-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:dlq" \
  --target "arn:aws:sqs:ap-northeast-1:123456789012:retry-queue" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

EventBridge Bus ターゲット

aws pipes create-pipe \
  --name "stream-to-eventbus-pipe" \
  --source "arn:aws:kinesis:ap-northeast-1:123456789012:stream/events" \
  --target "arn:aws:events:ap-northeast-1:123456789012:event-bus/custom-bus" \
  --target-parameters '{
    "EventBridgeParameters": {
      "DetailType": "Payment Event",
      "Source": "custom.pipes"
    },
    "RoleArn": "arn:aws:iam::123456789012:role/PipesRole"
  }'

入力トランスフォーマー

Pipes には別のトランスフォーマー機能があり、イベント構造を再構築可能です。

aws pipes create-pipe \
  --name "transform-pipe" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:orders" \
  --target-parameters '{
    "HttpParameters": {
      "PathParameterValues": [],
      "HeaderParameters": {
        "X-Custom-Header": "value"
      },
      "QueryStringParameters": {
        "order_id": "$.body.order_id",
        "customer_id": "$.body.customer_id"
      }
    }
  }'

エラーハンドリング・DLQ

DLQ 設定

# SQS ソースの DLQ
aws pipes create-pipe \
  --name "pipe-with-dlq" \
  --source "arn:aws:sqs:ap-northeast-1:123456789012:source" \
  --source-parameters '{
    "SqsQueueParameters": {
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:pipe-dlq"
      }
    }
  }' \
  --target "arn:aws:lambda:ap-northeast-1:123456789012:function:process" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

Retry ポリシー

{
  "DeadLetterConfig": {
    "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:dlq"
  },
  "RetryPolicy": {
    "MaximumEventAge": 3600,
    "MaximumRetryAttempts": 2
  }
}

主要ユースケース

ユースケース 1:リアルタイム注文処理

  • DynamoDB Streams(注文作成イベント)
  • → Filter: status=pending
  • → Enrichment: 顧客・在庫情報取得(Lambda)
  • → Target: Step Functions ワークフロー(支払い処理開始)

ユースケース 2:ログアグリゲーション&フィルタリング

  • CloudWatch Logs(Application ログ)
  • → Filter: ERROR / CRITICAL のみ
  • → Enrichment: コンテキスト追加(API Gateway)
  • → Target: SNS(アラート)+ DataDog

ユースケース 3:Kafka をソースに、複数ターゲットへ

Kafka(MSK)トピック
  → Filter: event_type=payment
  → Enrichment: なし
  → Target-1: Lambda(リアルタイム処理)
  → Target-2: Kinesis(分析パイプライン)
  
# Pipes は 1 パイプ = 1 ターゲットなので複数パイプ必要

ユースケース 4:DynamoDB Streams → SaaS 連携

  • DynamoDB Streams
  • → Filter: table=contacts AND operation=INSERT
  • → Enrichment: Salesforce API で顧客レコード作成(HTTP)
  • → Target: EventBridge(内部イベント通知)

設定・操作の具体例

パイプ作成(完全例)

#!/bin/bash

PIPE_NAME="payment-processing-pipe"
SOURCE_ARN="arn:aws:sqs:ap-northeast-1:123456789012:payment-queue"
ENRICHMENT_ARN="arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-payment"
TARGET_ARN="arn:aws:states:ap-northeast-1:123456789012:stateMachine:ProcessPayment"
ROLE_ARN="arn:aws:iam::123456789012:role/PipesExecutionRole"

aws pipes create-pipe \
  --name "$PIPE_NAME" \
  --source "$SOURCE_ARN" \
  --source-parameters '{
    "SqsQueueParameters": {
      "BatchSize": 10,
      "MaximumBatchingWindowInSeconds": 5,
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:payment-dlq"
      }
    }
  }' \
  --filter-pattern '{
    "body": {
      "type": ["payment"],
      "amount": [{"numeric": [">", 100]}]
    }
  }' \
  --enrichment "$ENRICHMENT_ARN" \
  --enrichment-parameters '{
    "LambdaFunctionParameters": {
      "InvocationType": "RequestResponse"
    },
    "RoleArn": "'$ROLE_ARN'"
  }' \
  --target "$TARGET_ARN" \
  --target-parameters '{
    "StepFunctionStateMachineParameters": {
      "InvocationType": "FIRE_AND_FORGET"
    },
    "RoleArn": "'$ROLE_ARN'"
  }'

echo "Pipe created: $PIPE_NAME"

パイプ確認

# パイプ一覧表示
aws pipes list-pipes

# パイプ詳細確認
aws pipes describe-pipe --name payment-processing-pipe

# パイプの実行状態確認
aws pipes describe-pipe-source-parameters --name payment-processing-pipe

パイプ更新

aws pipes update-pipe \
  --name payment-processing-pipe \
  --source-parameters '{
    "SqsQueueParameters": {
      "BatchSize": 20
    }
  }'

パイプ削除

aws pipes delete-pipe --name payment-processing-pipe

CLI / SDK / IaC

AWS CDK での定義

from aws_cdk import (
    core,
    pipes,
    sqs,
    lambda_,
    stepfunctions
)

class PaymentPipesStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)
        
        # ソース(SQS)
        source_queue = sqs.Queue(self, "PaymentQueue")
        dlq = sqs.Queue(self, "PaymentDLQ")
        
        # エンリッチメント(Lambda)
        enrich_function = lambda_.Function(
            self, "EnrichPayment",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_asset("lambda")
        )
        
        # ターゲット(Step Functions)
        sfn_role = iam.Role(...)
        state_machine = stepfunctions.StateMachine(...)
        
        # Pipe 作成
        pipe = pipes.Pipe(
            self, "PaymentPipe",
            source=pipes.PipeSource.from_sqs_queue(source_queue),
            source_parameters=pipes.SqsSourceParameters(
                batch_size=10,
                dead_letter_config=pipes.DeadLetterConfig(
                    arn=dlq.queue_arn
                )
            ),
            filter_pattern=pipes.FilterPattern.from_json({
                "body": {
                    "type": ["payment"],
                    "amount": [{"numeric": [">", 100]}]
                }
            }),
            enrichment=pipes.Enrichment.from_lambda_function(enrich_function),
            target=pipes.PipeTarget.from_step_functions_state_machine(state_machine),
            role=iam.Role(...)
        )

Terraform での定義

resource "aws_pipes_pipe" "payment_pipe" {
  name             = "payment-processing-pipe"
  role_arn         = aws_iam_role.pipes_role.arn
  source           = "arn:aws:sqs:ap-northeast-1:123456789012:payment-queue"
  source_parameters = jsonencode({
    SqsQueueParameters = {
      BatchSize                   = 10
      MaximumBatchingWindowInSeconds = 5
      DeadLetterConfig = {
        Arn = aws_sqs_queue.dlq.arn
      }
    }
  })

  filter = jsonencode({
    body = {
      type   = ["payment"]
      amount = [{ numeric = [">", 100] }]
    }
  })

  enrichment = aws_lambda_function.enrich.arn
  enrichment_parameters = jsonencode({
    LambdaFunctionParameters = {
      InvocationType = "RequestResponse"
    }
    RoleArn = aws_iam_role.pipes_role.arn
  })

  target = aws_sfn_state_machine.process.arn
  target_parameters = jsonencode({
    StepFunctionStateMachineParameters = {
      InvocationType = "FIRE_AND_FORGET"
    }
    RoleArn = aws_iam_role.pipes_role.arn
  })
}

CloudFormation での定義

Resources:
  PaymentProcessingPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: payment-processing-pipe
      RoleArn: !GetAtt PipesRole.Arn
      Source: !GetAtt PaymentQueue.Arn
      SourceParameters:
        SqsQueueParameters:
          BatchSize: 10
          MaximumBatchingWindowInSeconds: 5
          DeadLetterConfig:
            Arn: !GetAtt PaymentDLQ.Arn
      
      Filter: |
        {
          "body": {
            "type": ["payment"],
            "amount": [{"numeric": [">", 100]}]
          }
        }
      
      Enrichment: !GetAtt EnrichPaymentFunction.Arn
      EnrichmentParameters:
        LambdaFunctionParameters:
          InvocationType: RequestResponse
        RoleArn: !GetAtt PipesRole.Arn
      
      Target: !GetAtt ProcessPaymentStateMachine.Arn
      TargetParameters:
        StepFunctionStateMachineParameters:
          InvocationType: FIRE_AND_FORGET
        RoleArn: !GetAtt PipesRole.Arn

Lambda ESM との比較

観点 Lambda ESM EventBridge Pipes
主な用途 Lambda 関数へのイベント配信 ポイントツーポイント統合
ターゲット Lambda のみ Lambda / Step Functions / SQS / SNS / Kinesis / HTTP 等
フィルタリング Lambda コード内 ビジュアル / JSON パターン
エンリッチメント Lambda で実装 ビルトイン(Lambda / API / Step Functions)
変換 Lambda で実装 Input Transformer で可能
複数ターゲット ❌不可 ❌1 パイプ = 1 ターゲット(複数パイプ必要)
コスト Lambda 実行料金 $0.40/100 万イベント + ターゲット料金
コード記述 ✅必須(フィルタ・変換ロジック) ❌不要(ノーコード)
スケーリング 自動(Lambda) 自動(Pipes)
DLQ ✅対応 ✅対応
採用シーン SQS/Kinesis → Lambda SQS/Kinesis/DynamoDB Streams → 複数ターゲット

類似ツール比較

ツール 対象 用途 特徴
Lambda ESM SQS/Kinesis → Lambda イベント駆動のサーバーレス処理 AWS ネイティブ、信頼性高
EventBridge Pipes 複数ソース → 複数ターゲット ポイントツーポイント統合 ノーコード、柔軟ターゲット
Apache Camel OSS 統合フレームワーク EIP(Enterprise Integration Patterns)実装 複雑な変換・ルーティング
Kafka Connect Kafka ↔ 外部システム Kafka → RDB / S3 等 Kafka エコシステム専用
Step Functions AWS サービス統合 複雑なワークフロー 状態管理・エラーハンドリング
AWS Glue バッチ ETL データ変換・クレンジング スケジュールベース処理

ベストプラクティス

✅ やるべきこと

  1. Filter を使用して不要イベントを早期除外

    // キューからの全イベントを Lambda に送信するのではなく
    // フィルターで対象イベントのみ通過させ、課金を削減
    {
      "body": {
        "type": ["payment"],
        "status": ["pending"]
      }
    }
    
  2. エンリッチメント は同期的に実行(Lambda)

    # エンリッチメント Lambda は RequestResponse(同期)で
    # ターゲットに確実にデータを届ける
    
  3. DLQ を必ず設定

    # ソース・ターゲットの失敗イベントを DLQ に記録
    # 後続の再処理・デバッグに活用
    
  4. CloudWatch Metrics で監視

    # InvocationCount(実行数)
    # DeadLetterDeliveries(失敗数)
    # Duration(実行時間)
    
  5. 複数ターゲットが必要なら複数パイプを構築

    # 1 パイプ = 1 ターゲットなので
    # fan-out が必要なら Pipe → EventBridge Bus → 複数ルール
    

❌ やってはいけないこと

  1. エンリッチメント なしで複雑なロジック実装

    # ターゲット Lambda で全変換ロジック → Pipes の利点喪失
    # Enrichment で前処理、Target で処理に役割分担
    
  2. Filter を使わずに全イベント処理

    # キューの全イベント → Target で課金発生
    # 不要なイベントはフィルターで除外して コスト削減
    
  3. エンリッチメント Lambda を FireAndForget(非同期)実行

    # 同期実行(RequestResponse)で確実に データ受け取り
    # 非同期だとエンリッチメント結果を得られない
    
  4. DLQ なしでのデプロイ

    # エラー追跡ができず、デバッグが困難に
    # 本番投入前に DLQ 必須
    
  5. 複数エンリッチメント を串刺し

    # Enrichment → Target → Lambda(別エンリッチメント)
    # → この場合は Step Functions Enrichment で複数ステップ実装
    

トラブルシューティング

Issue 1:Pipe がイベントを処理していない

# 原因確認
1. Pipe が RUNNING 状態か
aws pipes describe-pipe --name my-pipe --query 'State'

2. ソースにイベントが存在するか
aws sqs get-queue-attributes \
  --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/source \
  --attribute-names ApproximateNumberOfMessages

3. Filter パターンが正しいか
# Filter が厳しすぎてイベントが全て除外されていないか

4. IAM ロール権限が不足していないか
# arn:aws:iam::123456789012:role/PipesRole に必要な権限があるか

Issue 2:エンリッチメント が失敗している

# 原因確認
1. Enrichment Lambda が存在・実行可能か
aws lambda get-function --function-name enrich-payment

2. Lambda IAM 実行ロールが正しいか
aws iam get-role --role-name LambdaExecutionRole

3. Lambda 関数がエラーを返していないか
aws logs tail /aws/lambda/enrich-payment --follow

4. Enrichment の InvocationType が RequestResponse か
# FireAndForget だと結果が取得できない

Issue 3:ターゲット送信が失敗している

# 原因確認
1. ターゲットリソースが存在・アクセス可能か
aws lambda get-function --function-name target-function

2. Pipes ロールがターゲットに invoke 権限を持っているか
# ターゲット (Lambda / Step Functions) への IAM PermissionPolicy 確認

3. CloudWatch ダッシュボードで失敗数を確認
aws cloudwatch get-metric-statistics \
  --namespace AWS/Pipes \
  --metric-name DeadLetterDeliveries \
  --dimensions Name=PipeName,Value=my-pipe \
  --start-time 2024-01-01T00:00:00Z \
  --end-time 2024-01-02T00:00:00Z \
  --period 3600 \
  --statistics Sum

Issue 4:DLQ にイベントが溜まっている

# DLQ メッセージの確認
aws sqs receive-message \
  --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/pipe-dlq \
  --max-number-of-messages 10

# メッセージ本体を確認
# DLQ メッセージには元々のイベント + エラー情報が含まれている

2025-2026 最新動向

1. Transform Stage の追加(2025)

フィルター→エンリッチメント→ターゲット に加え、Transform ステージで入力/出力の構造を再構築可能に。

aws pipes create-pipe \
  --name "transform-pipe" \
  --source ... \
  --enrichment ... \
  --transform-parameters '{
    "InputPath": "$.body",
    "OutputPath": "$.enriched"
  }' \
  --target ...

2. マルチターゲット Pipe(計画中)

1 Pipe で複数ターゲットをサポート(現在は 1 Pipe = 1 ターゲット制限を緩和予定)

3. ビジュアル Pipe Editor

AWS Console でドラッグ&ドロップでパイプ構築可能に(CloudFormation 自動生成)

4. Serverless Apache Kafka 統合

サーバーレス Kafka(AWS Lambda と統合) を Pipes ソースに対応

5. AI-Driven エンリッチメント推奨

CloudWatch メトリクスから自動的にエンリッチメント必要性を判定・提案


学習リソース・参考文献

AWS 公式資料

AWS ブログ・動画

サードパーティ・OSS


実装例・活用シーン

シーン 1:E-Commerce 支払い処理

  • SQS Queue(支払いリクエスト)
  • → Filter: type=payment & amount>100
  • → Enrichment: Lambda で顧客・住所情報取得
  • → Target: Step Functions(支払い処理ワークフロー)

シーン 2:DynamoDB Streams → 複数システム連携

DynamoDB Streams(顧客マスター更新)
  → Filter: operation=INSERT or MODIFY
  → Enrichment: なし
  → Target: Pipe-1: Lambda(Elasticsearch 更新)
  → Target: Pipe-2: HTTP(SaaS CRM 同期)
  → Target: Pipe-3: SNS(社内通知)

シーン 3:Kafka トピック → リアルタイムダッシュボード

  • MSK Topic(イベントストリーム)
  • → Filter: event_type=user_action
  • → Enrichment: API Gateway(追加メタデータ取得)
  • → Target: Kinesis(リアルタイム分析)

スケーリング・パフォーマンス

スループット特性

メトリクス 備考
最大スループット 100,000+ イベント/秒 自動スケーリング
バッチサイズ 1~10,000 ソース、ターゲットで設定可能
レイテンシ < 1 秒 典型的な構成での P99
エンリッチメント時間 + 100~500ms Lambda / API Gateway の実行時間に依存

パフォーマンス最適化のコツ

  1. バッチサイズを適切に設定

    # SQS:10~100 が推奨
    # Kinesis:100~1000 が推奨
    # 大きいほどレイテンシ増加、スループット向上
    
  2. エンリッチメント を最小化

    # 複数エンリッチメントが必要なら
    # → Step Functions Express で複数ステップ実装
    
  3. フィルター で不要イベント早期除外

    # ターゲット到達前に除外 → レイテンシ削減
    

セキュリティ・IAM

IAM ロール必須権限

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:source"
    },
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution"
      ],
      "Resource": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage"
      ],
      "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:target"
    }
  ]
}

監視・ロギング

CloudWatch メトリクス

# Pipe の実行数
aws cloudwatch get-metric-statistics \
  --namespace AWS/Pipes \
  --metric-name InvocationCount \
  --dimensions Name=PipeName,Value=my-pipe \
  --period 300 \
  --statistics Sum

# DLQ 配信数(失敗)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Pipes \
  --metric-name DeadLetterDeliveries \
  --dimensions Name=PipeName,Value=my-pipe \
  --period 300 \
  --statistics Sum

# パイプ実行時間
aws cloudwatch get-metric-statistics \
  --namespace AWS/Pipes \
  --metric-name Duration \
  --dimensions Name=PipeName,Value=my-pipe \
  --period 300 \
  --statistics Average

CloudWatch Logs

# Pipe ログを CloudWatch Logs に記録
# LogLevel: ERROR / INFO / DEBUG 設定可能

チェックリスト

  • [ ] ソース(SQS / Kinesis / DynamoDB Streams 等)が正しく指定されているか
  • [ ] バッチサイズが適切か(SQS: 10~100、Kinesis: 100~1000)
  • [ ] Filter パターンが正確か(テスト済み)
  • [ ] Enrichment が必要か、必要なら実装済みか
  • [ ] ターゲットが存在・アクセス可能か
  • [ ] IAM ロール権限が揃っているか
  • [ ] DLQ が設定されているか
  • [ ] CloudWatch ダッシュボード・アラームが設定されているか
  • [ ] 非本番環境でテスト済みか
  • [ ] パイプの実行状態が RUNNING か
  • [ ] エラー率が許容範囲か(CloudWatch メトリクス確認)

まとめ

EventBridge Pipes は 「イベントソースとターゲット間のポイントツーポイント統合を、フィルタリング・エンリッチメント・変換をビルトインで実現するノーコードサービス」。SQS / Kinesis / DynamoDB Streams / MSK などの複数ソースから Lambda / Step Functions / SQS / SNS / EventBridge / HTTP エンドポイント等への統合を、コード記述不要で構成可能です。

採用メリット:

  • コードなしのフィルタリング・エンリッチメント
  • 複数ターゲット対応(複数パイプで fan-out)
  • Lambda ESM より低コスト(イベント課金)かつ柔軟
  • CloudFormation / CDK で IaC 対応可能
  • DLQ・自動リトライ で信頼性向上

設計のコツ:

  1. フィルター で不要イベント早期除外(コスト削減)
  2. エンリッチメント は前処理専用(ビジネスロジックはターゲットで)
  3. DLQ を必ず設定(エラー追跡)
  4. CloudWatch メトリクス で継続監視
  5. 複数ターゲット必要なら → Pipe → EventBridge Bus → 複数ルール

EventBridge Pipes は、イベント駆動アーキテクチャにおける統合レイヤーのベストプラクティス実装。コスト削減・開発速度向上・保守性改善を同時に実現します。


最終更新:2026-04-26
バージョン:v2.0