目次

Amazon MSK Connect v2.0 完全ガイド(Streaming & Integration)

概要

Amazon MSK Connect は、Apache Kafka Connect をフルマネージドで実行するサービスです。Kafka(MSK)と外部システム(S3・RDS・DynamoDB・OpenSearch・Salesforce・Datadog 等)を Kafka Connector で自動連携し、ワーカーのインフラ管理(スケーリング・パッチ・監視)を完全に AWS が担当します。Debezium・Confluent・Aiven のコネクターをそのまま利用できるため、既存 Kafka Connect エコシステムの資産を活用したデータパイプライン構築が可能です。


課題解決

  • Kafka Connect インフラの運用負担排除 - EC2 ベースの自前 Kafka Connect クラスターは、ワーカー数管理・メモリチューニング・パッチ適用が煩雑。MSK Connect はこれらを自動化
  • 複数データソースへの手動パイプライン構築が困難 - DB の変更(CDC)を S3・DWH・キャッシュに同期させるパイプラインを Lambda で自前実装するより、Kafka Connect の標準コネクターが効率的
  • Kafka と外部システムの同期遅延 - リアルタイムストリーミング同期で、顧客データ・在庫・ログを複数システム間で常に最新状態に保つ必要
  • コネクター設定管理の複雑さ - 複数ワーカーでの設定同期、プラグインバージョン管理、リソース利用率の監視が煩雑

アーキテクチャ概要

ソースコネクター(外部 → Kafka):

DB / SaaS データソース:
    ├─ RDS (MySQL / PostgreSQL)
    │   └─ Debezium MySQL CDC Connector
    │       └─ FULL / INCREMENTAL / WAL(Write Ahead Log)
    │
    ├─ DynamoDB
    │   └─ DynamoDB Streams → Kafka
    │
    ├─ Salesforce
    │   └─ Salesforce Source Connector
    │
    └─ Datadog / Splunk
        └─ API ポーリング Connector

    ↓ MSK Connect ワーカー(マネージド)
    
Apache Kafka(MSK トピック):
    ├─ Topic: users.changes(ユーザー変更イベント)
    ├─ Topic: orders.events(注文イベント)
    └─ Topic: inventory.sync(在庫同期)

シンクコネクター(Kafka → 外部):

Apache Kafka(MSK)
    ↓ MSK Connect ワーカー

配信先:
    ├─ S3(Parquet / JSON)
    │   └─ Time-partitioned(year=YYYY/month=MM/day=dd)
    │
    ├─ OpenSearch Service
    │   └─ リアルタイムインデックス
    │
    ├─ Redshift / Snowflake
    │   └─ ウェアハウス同期
    │
    ├─ Elasticsearch / Splunk
    │   └─ ログ・分析
    │
    └─ DynamoDB / RDS
        └─ 別システム DB への逆同期

スケーリング戦略:
    ├─ Provisioned: ワーカー数固定(2~10 個)
    ├─ Autoscaled: CPU / MBps に応じて MCU で自動スケール
    └─ Topic 別パーティション: コネクター数 = パーティション数が効率的

コアコンポーネント

1. Custom Plugin(カスタムプラグイン)

Debezium・Confluent・Aiven などのオープンソースコネクターを S3 にアップロードし、カスタムプラグインとして登録。

# ステップ1: Debezium MySQL Connector JAR を S3 に配置
# https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.0.Final/
# debezium-connector-mysql-2.4.0.Final-plugin.tar.gz をダウンロード

aws s3 cp debezium-connector-mysql-2.4.0.Final-plugin.tar.gz \
  s3://my-msk-connect-plugins/debezium/mysql/

# ステップ2: カスタムプラグイン登録
aws kafkaconnect create-custom-plugin \
  --name debezium-mysql-source \
  --content-type ZIP \
  --location '{
    "s3Location": {
      "bucketArn": "arn:aws:s3:::my-msk-connect-plugins",
      "fileKey": "debezium/mysql/debezium-connector-mysql-2.4.0.Final-plugin.tar.gz",
      "objectVersion": "v1"
    }
  }' \
  --description "Debezium MySQL CDC Connector v2.4.0"

# ステップ3: プラグイン ARN を取得
aws kafkaconnect describe-custom-plugin \
  --custom-plugin-arn arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/debezium-mysql-source/...

2. Worker Configuration(ワーカー設定)

MSK Connect ワーカーのリソース・ログ・監視設定を定義。複数コネクターで共有可能。

# ワーカー設定作成
aws kafkaconnect create-worker-configuration \
  --name msk-connect-worker-config \
  --properties-file-content '
# ワーカー設定
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# JVM メモリ設定(重要)
CONNECT_HEAP_OPTS=-Xmx2G -Xms2G

# タスク設定
tasks.max=10
offset.storage.topic=msk-connect-offsets
offset.storage.replication.factor=3
config.storage.topic=msk-connect-config
config.storage.replication.factor=3
status.storage.topic=msk-connect-status
status.storage.replication.factor=3

# 実行プロパティ
connector.client.config.override.policy=All

# ログ設定(CloudWatch に転送)
log4j.rootLogger=INFO, stdout, appender
' \
  --description "本番 MSK Connect ワーカー設定"

3. Source Connector(ソースコネクター)

3.1 Debezium MySQL CDC

# MySQL 変更データキャプチャ(Change Data Capture)
aws kafkaconnect create-connector \
  --connector-name mysql-cdc-source \
  --connector-configuration '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "name": "mysql-cdc-source",
    "tasks.max": "3",
    
    # DB 接続
    "database.hostname": "mydb.c9akciq32.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "${secretManager:mysql-creds:username}",
    "database.password": "${secretManager:mysql-creds:password}",
    "database.server.id": "1",
    "database.server.name": "mydb-prod",
    
    # テーブル・DB フィルタ
    "database.include.list": "orders,customers,inventory",
    "table.include.list": "orders.orders,orders.order_items,customers.users,inventory.stock",
    "column.include.list": "orders.orders.order_id,orders.orders.customer_id,orders.orders.amount",
    
    # スキーマ変更トラッキング
    "include.schema.changes": "true",
    "database.history.kafka.bootstrap.servers": "b-1.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098",
    "database.history.kafka.topic": "dbhistory.mydb",
    
    # CDC モード(binlog)
    "database.history.kafka.topic": "dbhistory.mydb",
    "decimal.handling.mode": "double",
    "bigint.unsigned.handling.mode": "long",
    
    # snapshot 設定
    "snapshot.mode": "when_needed",
    "snapshot.locking.mode": "minimal",
    "snapshot.fetch.size": "10000",
    
    # パフォーマンス
    "batch.size": "2048",
    "poll.interval.ms": "1000"
  }' \
  --kafka-cluster '{
    "apacheKafkaCluster": {
      "bootstrapServers": "b-1.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098,b-2.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098",
      "vpc": {
        "subnets": ["subnet-a", "subnet-b", "subnet-c"],
        "securityGroups": ["sg-msk-connect"]
      }
    }
  }' \
  --kafka-cluster-client-authentication '{
    "authenticationType": "IAM"
  }' \
  --kafka-cluster-encryption-in-transit '{
    "encryptionType": "TLS"
  }' \
  --plugins '[{
    "customPlugin": {
      "customPluginArn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/debezium-mysql-source/..."
    }
  }]' \
  --capacity '{
    "autoScaling": {
      "minWorkerCount": 2,
      "maxWorkerCount": 10,
      "mcuCount": 1,
      "scaleInPolicy": {
        "cpuUtilizationPercentage": 20
      },
      "scaleOutPolicy": {
        "cpuUtilizationPercentage": 80
      }
    }
  }' \
  --log-delivery '{
    "workerLogDelivery": {
      "cloudWatchLogs": {
        "enabled": true,
        "logGroup": "/aws/msk-connect/mysql-cdc"
      },
      "s3": {
        "enabled": true,
        "bucket": "msk-connect-logs",
        "prefix": "mysql-cdc/"
      }
    }
  }' \
  --worker-configuration '{
    "arn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:worker-configuration/msk-connect-worker-config/..."
  }' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

3.2 Salesforce Source Connector

# Salesforce データの Kafka へのストリーミング
aws kafkaconnect create-connector \
  --connector-name salesforce-source \
  --connector-configuration '{
    "connector.class": "com.salesforce.kafka.connect.SalesforceSourceConnector",
    "name": "salesforce-source",
    "tasks.max": "1",
    
    # Salesforce 認証
    "salesforce.instance": "my-org.salesforce.com",
    "salesforce.client.id": "${secretManager:salesforce-oauth:client_id}",
    "salesforce.client.secret": "${secretManager:salesforce-oauth:client_secret}",
    "salesforce.username": "${secretManager:salesforce-oauth:username}",
    "salesforce.password": "${secretManager:salesforce-oauth:password}",
    
    # Sobject 抽出
    "salesforce.objects": "Account,Contact,Opportunity,Lead",
    "salesforce.object.account.extraction.query": "SELECT Id,Name,BillingCity,BillingCountry FROM Account WHERE LastModifiedDate > :start_time",
    
    # API 選択
    "salesforce.api": "REST",
    "salesforce.polling.interval": "3600000"
  }' \
  --kafka-cluster '...' \
  --plugins '[...]' \
  --capacity '{
    "provisionedCapacity": {
      "mcuCount": 1,
      "workerCount": 1
    }
  }' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

4. Sink Connector(シンクコネクター)

4.1 S3 Sink Connector(Parquet 形式)

# Kafka トピック → S3 時系列アーカイブ
aws kafkaconnect create-connector \
  --connector-name kafka-to-s3-sink \
  --connector-configuration '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "name": "kafka-to-s3-sink",
    "tasks.max": "4",
    "topics": "orders,customers,inventory,events",
    
    # S3 設定
    "s3.region": "ap-northeast-1",
    "s3.bucket.name": "kafka-data-lake",
    "s3.part.size": "134217728",
    "flush.size": "5000",
    "rotate.interval.ms": "3600000",
    
    # ストレージ・フォーマット設定
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "parquet.compression.codec": "snappy",
    
    # 時系列パーティショニング
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "year=YYYY/month=MM/day=dd/hour=HH",
    "locale": "ja_JP",
    "timezone": "Asia/Tokyo",
    "partition.duration.ms": "3600000",
    
    # レコード変換
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    
    # エラー処理
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "kafka-to-s3-dlq",
    "errors.log.enable": "true"
  }' \
  --kafka-cluster '...' \
  --plugins '[{
    "customPlugin": {
      "customPluginArn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/s3-sink/..."
    }
  }]' \
  --capacity '{
    "autoScaling": {
      "minWorkerCount": 2,
      "maxWorkerCount": 10,
      "mcuCount": 1,
      "scaleOutPolicy": {
        "cpuUtilizationPercentage": 70,
        "throughputUtilizationPercentage": 70
      }
    }
  }' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

4.2 OpenSearch Sink Connector

# Kafka イベント → OpenSearch リアルタイムインデックス
aws kafkaconnect create-connector \
  --connector-name kafka-to-opensearch-sink \
  --connector-configuration '{
    "connector.class": "org.opensearch.connectors.kafka.OpenSearchSinkConnector",
    "name": "kafka-to-opensearch-sink",
    "tasks.max": "2",
    "topics": "events,logs,transactions",
    
    # OpenSearch 接続
    "connection.url": "https://opensearch-domain.ap-northeast-1.es.amazonaws.com:9200",
    "connection.username": "${secretManager:opensearch-creds:username}",
    "connection.password": "${secretManager:opensearch-creds:password}",
    
    # インデックス設定
    "index_name": "kafka-events",
    "type_name": "_doc",
    "write_method": "upsert",
    "batch.size": "1000",
    "linger.ms": "5000",
    
    # インデックス ローテーション
    "behavior.on.null.values": "ignore",
    "date_format": "yyyy.MM.dd",
    "index_prefix": "kafka-",
    "index_partition_size": "1000000"
  }' \
  --kafka-cluster '...' \
  --plugins '[...]' \
  --capacity '{
    "provisionedCapacity": {
      "mcuCount": 2,
      "workerCount": 2
    }
  }' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

ユースケース実装

UC1: リアルタイム CDC パイプライン(MySQL → S3 → Redshift)

import boto3
import json
from datetime import datetime

# MSK Connect 管理
kconnect = boto3.client('kafkaconnect', region_name='ap-northeast-1')
cloudwatch = boto3.client('cloudwatch')

def setup_cdc_pipeline():
    """
    MySQL の変更イベントをリアルタイムに S3・Redshift に同期
    1. Debezium MySQL CDC: MySQL 変更 → Kafka
    2. S3 Sink: Kafka → S3 Parquet(バックアップ・監査ログ)
    3. Redshift: Kafka → Redshift(ウェアハウス同期)
    """
    
    # ステップ1: MySQL CDC コネクター作成(既上述)
    mysql_connector = kconnect.create_connector(
        connectorName='mysql-cdc',
        connectorConfiguration={
            'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
            'database.hostname': 'mydb.rds.amazonaws.com',
            'database.port': '3306',
            'database.include.list': 'orders,customers',
            'database.server.id': '1',
            'database.server.name': 'mydb',
            # ...
        },
        # ...
    )
    print(f"MySQL CDC Connector Created: {mysql_connector['connectorArn']}")
    
    # ステップ2: S3 Sink コネクター作成
    s3_sink = kconnect.create_connector(
        connectorName='kafka-to-s3',
        connectorConfiguration={
            'connector.class': 'io.confluent.connect.s3.S3SinkConnector',
            'topics': 'mydb.orders,mydb.customers',
            's3.bucket.name': 'kafka-data-lake',
            's3.part.size': '134217728',
            'flush.size': '5000',
            # ...
        },
        # ...
    )
    print(f"S3 Sink Created: {s3_sink['connectorArn']}")
    
    # ステップ3: Redshift COPY ジョブ(S3 → Redshift)
    # Lambda で CloudWatch イベント(S3 PutObject)をトリガー
    # → Redshift COPY コマンドを実行
    
    print("CDC Pipeline Setup Complete")

def monitor_connector_health():
    """コネクター健全性監視"""
    
    # コネクター状態確認
    response = kconnect.describe_connector(
        connectorArn='arn:aws:kafkaconnect:ap-northeast-1:123456789012:connector/mysql-cdc/...'
    )
    
    status = response['connectorStatus']['currentStatus']
    print(f"Connector Status: {status}")
    
    if status != 'RUNNING':
        # CloudWatch に警告
        cloudwatch.put_metric_alarm(
            AlarmName='MSKConnectConnectorDown',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=1,
            MetricName='ConnectorFailureCount',
            Namespace='AWS/Kafka',
            Period=300,
            Statistic='Sum',
            Threshold=0,
            ActionsEnabled=True,
            AlarmActions=['arn:aws:sns:ap-northeast-1:123456789012:alerts']
        )

setup_cdc_pipeline()
monitor_connector_health()

UC2: ログ集約パイプライン(Datadog → Kafka → S3)

# Datadog から Kafka へログをストリーミング
aws kafkaconnect create-connector \
  --connector-name datadog-to-kafka \
  --connector-configuration '{
    "connector.class": "com.datadoghq.kafka.connect.logs.DatadogLogsSourceConnector",
    "name": "datadog-logs-source",
    "tasks.max": "1",
    "datadog.api.url": "https://api.datadoghq.jp/api/v1",
    "datadog.api.key": "${secretManager:datadog-api:key}",
    "datadog.app.key": "${secretManager:datadog-api:app_key}",
    "datadog.query": "status:error",
    "datadog.batch.size": "100",
    "kafka.topic": "datadog-logs"
  }' \
  --kafka-cluster '...' \
  --plugins '[...]' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

Auto Scaling 設定

MSK Connect のワーカーを CPU・MBps に応じて自動スケール。

def configure_autoscaling():
    """
    CPU 利用率 80% 以上 → スケールアウト(ワーカー追加)
    CPU 利用率 20% 以下 → スケールイン(ワーカー削減)
    """
    kconnect = boto3.client('kafkaconnect')
    
    # コネクター更新(Auto Scaling 有効化)
    response = kconnect.update_connector(
        connectorArn='arn:aws:kafkaconnect:ap-northeast-1:123456789012:connector/mysql-cdc/...',
        capacity={
            'autoScaling': {
                'minWorkerCount': 2,
                'maxWorkerCount': 10,
                'mcuCount': 1,
                'scaleOutPolicy': {
                    'cpuUtilizationPercentage': 80,
                    'throughputUtilizationPercentage': 70  # MBps 利用率
                },
                'scaleInPolicy': {
                    'cpuUtilizationPercentage': 20
                }
            }
        }
    )
    
    print(f"Auto Scaling Configured: {response['connectorArn']}")

configure_autoscaling()

競合他社との比較

観点 MSK Connect Confluent Cloud Striim Debezium Aiven Kafka Connect
ホスティング AWS マネージド SaaS オンプレ / SaaS OSS(自前構築) クラウド
ワーカー管理 全自動 全自動 手動 手動 全自動
コネクター数 100+ 150+ 50+ 150+ 100+
CDC(Debezium) ネイティブ対応 統合 ネイティブ 本家 ネイティブ対応
AWS 統合 ネイティブ API 連携 API 連携 API 連携 API 連携
IAM 認証 ネイティブ対応 API キー 別途認証 別途認証 サポート
Auto Scaling CPU / MBps ベース 自動 手動 N/A Auto Scaling
コスト $0.11/MCU-時間 $1/パーティション-時間 固定 無料(インフラ費用) $0.19/MCU-時間

コスト詳細

課金モデル

1. MCU(MSK Connect Unit):
   - $0.11/MCU-時間
   - 1 MCU = メモリ 512MB + CPU 0.25 コア(概算)
   
2. 例: 2 MCU × 24h × 30 日
   - $0.11 × 2 × 720 = $158.4/月

3. ストレージ(コネクター設定):
   - $0.10/GB/月(通常 < 1GB)

4. AWS インフラ(S3・Kafka 別途):
   - S3: $0.023/GB/月
   - MSK: ブローカーコスト + ストレージ

コスト最適化

def optimize_msk_connect_cost():
    """
    1. Provisioned Capacity: 固定 MCU(安定スループット向け)
    2. Auto Scaling: 変動スループット向け(バースト対応)
    3. ワーカー数最小化: パーティション数に最適化
    """
    
    # 現在コスト: 3 MCU × 24 × 30 × $0.11 = $237.6/月
    # 最適化後: Auto Scaling min=1 MCU, max=5 MCU(平均 2 MCU)
    # → 推定: 2 MCU × 24 × 30 × $0.11 = $158.4/月($79.2/月削減)
    
    estimated_monthly = 2 * 24 * 30 * 0.11
    print(f"Optimized Monthly Cost: ${estimated_monthly:.2f}")

optimize_msk_connect_cost()

ベストプラクティス

BP1: コネクター設計

  • タスク数 = パーティション数 - パーティション並列処理で最大スループット
  • Debezium snapshot モード - when_needed で初回スナップショットのみ実行
  • バッチサイズ調整 - small(256)vs large(2048)でレイテンシ ↔ スループット トレード

BP2: 監視・アラート

# CloudWatch メトリクス監視
cloudwatch.put_metric_alarm(
    AlarmName='MSKConnectLag',
    MetricName='BatchProcessTime',
    Namespace='AWS/Kafka',
    Statistic='Average',
    Period=300,
    Threshold=5000,  # ms
    ComparisonOperator='GreaterThanThreshold'
)

BP3: セキュリティ

  • IAM 認証: SASL_SSL + IAM(推奨)
  • Secrets Manager: DB パスワード・API キーを保護
  • VPC エンドポイント: Kafka Broker との通信を VPC 内に隔離

トラブルシューティング

症状 原因 対応
コネクター FAILED DB 接続失敗 / DB 권한不足 セキュリティグループ確認、DB ユーザー権限確認
レイテンシ増加 ワーカー CPU 枯渇 / パーティション遍在 MCU 増加、パーティション数最適化
メッセージロス Kafka ディスク満杯 / ワーカー クラッシュ EBS Auto-scaling 有効化、ワーカー再起動
Debezium snapshot 遅延 大型テーブルの初回同期 batch.size 最適化、並列スナップショット数調整
Provider Connector 非対応 プラグイン不存在 Confluent Hub で検索、カスタムプラグイン開発

採用判断チェックリスト

  • [ ] Kafka トピックから外部システム(S3 / DWH / OpenSearch)へのリアルタイム同期が必要?
  • [ ] Debezium CDC(MySQL / PostgreSQL)を使用予定?
  • [ ] Kafka Connect インフラ管理(ワーカー・パッチ)を自動化したい?
  • [ ] Confluent Kafka Connect プラグインをそのまま利用したい?
  • [ ] Auto Scaling で変動トラフィックに対応したい?

まとめ

Amazon MSK Connect は、Kafka Connect のフルマネージドサービスです。Debezium(MySQL / PostgreSQL CDC)・Confluent・Aiven のコネクターをそのまま利用でき、Kafka から S3・RDS・DynamoDB・OpenSearch・Salesforce への自動連携パイプラインを構築・運用できます。ワーカーの Auto Scaling・パッチ管理を完全に AWS が担当し、IAM 認証・TLS・CloudWatch との統合で企業グレードのセキュリティ・可視性を実現します。


最終更新:2026-04-27 バージョン:v2.0