目次
Amazon MSK Connect v2.0 完全ガイド(Streaming & Integration)
概要
Amazon MSK Connect は、Apache Kafka Connect をフルマネージドで実行するサービスです。Kafka(MSK)と外部システム(S3・RDS・DynamoDB・OpenSearch・Salesforce・Datadog 等)を Kafka Connector で自動連携し、ワーカーのインフラ管理(スケーリング・パッチ・監視)を完全に AWS が担当します。Debezium・Confluent・Aiven のコネクターをそのまま利用できるため、既存 Kafka Connect エコシステムの資産を活用したデータパイプライン構築が可能です。
課題解決
- Kafka Connect インフラの運用負担排除 - EC2 ベースの自前 Kafka Connect クラスターは、ワーカー数管理・メモリチューニング・パッチ適用が煩雑。MSK Connect はこれらを自動化
- 複数データソースへの手動パイプライン構築が困難 - DB の変更(CDC)を S3・DWH・キャッシュに同期させるパイプラインを Lambda で自前実装するより、Kafka Connect の標準コネクターが効率的
- Kafka と外部システムの同期遅延 - リアルタイムストリーミング同期で、顧客データ・在庫・ログを複数システム間で常に最新状態に保つ必要
- コネクター設定管理の複雑さ - 複数ワーカーでの設定同期、プラグインバージョン管理、リソース利用率の監視が煩雑
アーキテクチャ概要
ソースコネクター(外部 → Kafka):
DB / SaaS データソース:
├─ RDS (MySQL / PostgreSQL)
│ └─ Debezium MySQL CDC Connector
│ └─ FULL / INCREMENTAL / WAL(Write Ahead Log)
│
├─ DynamoDB
│ └─ DynamoDB Streams → Kafka
│
├─ Salesforce
│ └─ Salesforce Source Connector
│
└─ Datadog / Splunk
└─ API ポーリング Connector
↓ MSK Connect ワーカー(マネージド)
Apache Kafka(MSK トピック):
├─ Topic: users.changes(ユーザー変更イベント)
├─ Topic: orders.events(注文イベント)
└─ Topic: inventory.sync(在庫同期)
シンクコネクター(Kafka → 外部):
Apache Kafka(MSK)
↓ MSK Connect ワーカー
配信先:
├─ S3(Parquet / JSON)
│ └─ Time-partitioned(year=YYYY/month=MM/day=dd)
│
├─ OpenSearch Service
│ └─ リアルタイムインデックス
│
├─ Redshift / Snowflake
│ └─ ウェアハウス同期
│
├─ Elasticsearch / Splunk
│ └─ ログ・分析
│
└─ DynamoDB / RDS
└─ 別システム DB への逆同期
スケーリング戦略:
├─ Provisioned: ワーカー数固定(2~10 個)
├─ Autoscaled: CPU / MBps に応じて MCU で自動スケール
└─ Topic 別パーティション: コネクター数 = パーティション数が効率的
コアコンポーネント
1. Custom Plugin(カスタムプラグイン)
Debezium・Confluent・Aiven などのオープンソースコネクターを S3 にアップロードし、カスタムプラグインとして登録。
# ステップ1: Debezium MySQL Connector JAR を S3 に配置
# https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.0.Final/
# debezium-connector-mysql-2.4.0.Final-plugin.tar.gz をダウンロード
aws s3 cp debezium-connector-mysql-2.4.0.Final-plugin.tar.gz \
s3://my-msk-connect-plugins/debezium/mysql/
# ステップ2: カスタムプラグイン登録
aws kafkaconnect create-custom-plugin \
--name debezium-mysql-source \
--content-type ZIP \
--location '{
"s3Location": {
"bucketArn": "arn:aws:s3:::my-msk-connect-plugins",
"fileKey": "debezium/mysql/debezium-connector-mysql-2.4.0.Final-plugin.tar.gz",
"objectVersion": "v1"
}
}' \
--description "Debezium MySQL CDC Connector v2.4.0"
# ステップ3: プラグイン ARN を取得
aws kafkaconnect describe-custom-plugin \
--custom-plugin-arn arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/debezium-mysql-source/...
2. Worker Configuration(ワーカー設定)
MSK Connect ワーカーのリソース・ログ・監視設定を定義。複数コネクターで共有可能。
# ワーカー設定作成
aws kafkaconnect create-worker-configuration \
--name msk-connect-worker-config \
--properties-file-content '
# ワーカー設定
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# JVM メモリ設定(重要)
CONNECT_HEAP_OPTS=-Xmx2G -Xms2G
# タスク設定
tasks.max=10
offset.storage.topic=msk-connect-offsets
offset.storage.replication.factor=3
config.storage.topic=msk-connect-config
config.storage.replication.factor=3
status.storage.topic=msk-connect-status
status.storage.replication.factor=3
# 実行プロパティ
connector.client.config.override.policy=All
# ログ設定(CloudWatch に転送)
log4j.rootLogger=INFO, stdout, appender
' \
--description "本番 MSK Connect ワーカー設定"
3. Source Connector(ソースコネクター)
3.1 Debezium MySQL CDC
# MySQL 変更データキャプチャ(Change Data Capture)
aws kafkaconnect create-connector \
--connector-name mysql-cdc-source \
--connector-configuration '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"name": "mysql-cdc-source",
"tasks.max": "3",
# DB 接続
"database.hostname": "mydb.c9akciq32.us-east-1.rds.amazonaws.com",
"database.port": "3306",
"database.user": "${secretManager:mysql-creds:username}",
"database.password": "${secretManager:mysql-creds:password}",
"database.server.id": "1",
"database.server.name": "mydb-prod",
# テーブル・DB フィルタ
"database.include.list": "orders,customers,inventory",
"table.include.list": "orders.orders,orders.order_items,customers.users,inventory.stock",
"column.include.list": "orders.orders.order_id,orders.orders.customer_id,orders.orders.amount",
# スキーマ変更トラッキング
"include.schema.changes": "true",
"database.history.kafka.bootstrap.servers": "b-1.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098",
"database.history.kafka.topic": "dbhistory.mydb",
# CDC モード(binlog)
"database.history.kafka.topic": "dbhistory.mydb",
"decimal.handling.mode": "double",
"bigint.unsigned.handling.mode": "long",
# snapshot 設定
"snapshot.mode": "when_needed",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": "10000",
# パフォーマンス
"batch.size": "2048",
"poll.interval.ms": "1000"
}' \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "b-1.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098,b-2.msk-prod.xxx.ap-northeast-1.kafka.amazonaws.com:9098",
"vpc": {
"subnets": ["subnet-a", "subnet-b", "subnet-c"],
"securityGroups": ["sg-msk-connect"]
}
}
}' \
--kafka-cluster-client-authentication '{
"authenticationType": "IAM"
}' \
--kafka-cluster-encryption-in-transit '{
"encryptionType": "TLS"
}' \
--plugins '[{
"customPlugin": {
"customPluginArn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/debezium-mysql-source/..."
}
}]' \
--capacity '{
"autoScaling": {
"minWorkerCount": 2,
"maxWorkerCount": 10,
"mcuCount": 1,
"scaleInPolicy": {
"cpuUtilizationPercentage": 20
},
"scaleOutPolicy": {
"cpuUtilizationPercentage": 80
}
}
}' \
--log-delivery '{
"workerLogDelivery": {
"cloudWatchLogs": {
"enabled": true,
"logGroup": "/aws/msk-connect/mysql-cdc"
},
"s3": {
"enabled": true,
"bucket": "msk-connect-logs",
"prefix": "mysql-cdc/"
}
}
}' \
--worker-configuration '{
"arn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:worker-configuration/msk-connect-worker-config/..."
}' \
--service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole
3.2 Salesforce Source Connector
# Salesforce データの Kafka へのストリーミング
aws kafkaconnect create-connector \
--connector-name salesforce-source \
--connector-configuration '{
"connector.class": "com.salesforce.kafka.connect.SalesforceSourceConnector",
"name": "salesforce-source",
"tasks.max": "1",
# Salesforce 認証
"salesforce.instance": "my-org.salesforce.com",
"salesforce.client.id": "${secretManager:salesforce-oauth:client_id}",
"salesforce.client.secret": "${secretManager:salesforce-oauth:client_secret}",
"salesforce.username": "${secretManager:salesforce-oauth:username}",
"salesforce.password": "${secretManager:salesforce-oauth:password}",
# Sobject 抽出
"salesforce.objects": "Account,Contact,Opportunity,Lead",
"salesforce.object.account.extraction.query": "SELECT Id,Name,BillingCity,BillingCountry FROM Account WHERE LastModifiedDate > :start_time",
# API 選択
"salesforce.api": "REST",
"salesforce.polling.interval": "3600000"
}' \
--kafka-cluster '...' \
--plugins '[...]' \
--capacity '{
"provisionedCapacity": {
"mcuCount": 1,
"workerCount": 1
}
}' \
--service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole
4. Sink Connector(シンクコネクター)
4.1 S3 Sink Connector(Parquet 形式)
# Kafka トピック → S3 時系列アーカイブ
aws kafkaconnect create-connector \
--connector-name kafka-to-s3-sink \
--connector-configuration '{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"name": "kafka-to-s3-sink",
"tasks.max": "4",
"topics": "orders,customers,inventory,events",
# S3 設定
"s3.region": "ap-northeast-1",
"s3.bucket.name": "kafka-data-lake",
"s3.part.size": "134217728",
"flush.size": "5000",
"rotate.interval.ms": "3600000",
# ストレージ・フォーマット設定
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"parquet.compression.codec": "snappy",
# 時系列パーティショニング
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "year=YYYY/month=MM/day=dd/hour=HH",
"locale": "ja_JP",
"timezone": "Asia/Tokyo",
"partition.duration.ms": "3600000",
# レコード変換
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
# エラー処理
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "kafka-to-s3-dlq",
"errors.log.enable": "true"
}' \
--kafka-cluster '...' \
--plugins '[{
"customPlugin": {
"customPluginArn": "arn:aws:kafkaconnect:ap-northeast-1:123456789012:custom-plugin/s3-sink/..."
}
}]' \
--capacity '{
"autoScaling": {
"minWorkerCount": 2,
"maxWorkerCount": 10,
"mcuCount": 1,
"scaleOutPolicy": {
"cpuUtilizationPercentage": 70,
"throughputUtilizationPercentage": 70
}
}
}' \
--service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole
4.2 OpenSearch Sink Connector
# Kafka イベント → OpenSearch リアルタイムインデックス
aws kafkaconnect create-connector \
--connector-name kafka-to-opensearch-sink \
--connector-configuration '{
"connector.class": "org.opensearch.connectors.kafka.OpenSearchSinkConnector",
"name": "kafka-to-opensearch-sink",
"tasks.max": "2",
"topics": "events,logs,transactions",
# OpenSearch 接続
"connection.url": "https://opensearch-domain.ap-northeast-1.es.amazonaws.com:9200",
"connection.username": "${secretManager:opensearch-creds:username}",
"connection.password": "${secretManager:opensearch-creds:password}",
# インデックス設定
"index_name": "kafka-events",
"type_name": "_doc",
"write_method": "upsert",
"batch.size": "1000",
"linger.ms": "5000",
# インデックス ローテーション
"behavior.on.null.values": "ignore",
"date_format": "yyyy.MM.dd",
"index_prefix": "kafka-",
"index_partition_size": "1000000"
}' \
--kafka-cluster '...' \
--plugins '[...]' \
--capacity '{
"provisionedCapacity": {
"mcuCount": 2,
"workerCount": 2
}
}' \
--service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole
ユースケース実装
UC1: リアルタイム CDC パイプライン(MySQL → S3 → Redshift)
import boto3
import json
from datetime import datetime
# MSK Connect 管理
kconnect = boto3.client('kafkaconnect', region_name='ap-northeast-1')
cloudwatch = boto3.client('cloudwatch')
def setup_cdc_pipeline():
"""
MySQL の変更イベントをリアルタイムに S3・Redshift に同期
1. Debezium MySQL CDC: MySQL 変更 → Kafka
2. S3 Sink: Kafka → S3 Parquet(バックアップ・監査ログ)
3. Redshift: Kafka → Redshift(ウェアハウス同期)
"""
# ステップ1: MySQL CDC コネクター作成(既上述)
mysql_connector = kconnect.create_connector(
connectorName='mysql-cdc',
connectorConfiguration={
'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname': 'mydb.rds.amazonaws.com',
'database.port': '3306',
'database.include.list': 'orders,customers',
'database.server.id': '1',
'database.server.name': 'mydb',
# ...
},
# ...
)
print(f"MySQL CDC Connector Created: {mysql_connector['connectorArn']}")
# ステップ2: S3 Sink コネクター作成
s3_sink = kconnect.create_connector(
connectorName='kafka-to-s3',
connectorConfiguration={
'connector.class': 'io.confluent.connect.s3.S3SinkConnector',
'topics': 'mydb.orders,mydb.customers',
's3.bucket.name': 'kafka-data-lake',
's3.part.size': '134217728',
'flush.size': '5000',
# ...
},
# ...
)
print(f"S3 Sink Created: {s3_sink['connectorArn']}")
# ステップ3: Redshift COPY ジョブ(S3 → Redshift)
# Lambda で CloudWatch イベント(S3 PutObject)をトリガー
# → Redshift COPY コマンドを実行
print("CDC Pipeline Setup Complete")
def monitor_connector_health():
"""コネクター健全性監視"""
# コネクター状態確認
response = kconnect.describe_connector(
connectorArn='arn:aws:kafkaconnect:ap-northeast-1:123456789012:connector/mysql-cdc/...'
)
status = response['connectorStatus']['currentStatus']
print(f"Connector Status: {status}")
if status != 'RUNNING':
# CloudWatch に警告
cloudwatch.put_metric_alarm(
AlarmName='MSKConnectConnectorDown',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=1,
MetricName='ConnectorFailureCount',
Namespace='AWS/Kafka',
Period=300,
Statistic='Sum',
Threshold=0,
ActionsEnabled=True,
AlarmActions=['arn:aws:sns:ap-northeast-1:123456789012:alerts']
)
setup_cdc_pipeline()
monitor_connector_health()
UC2: ログ集約パイプライン(Datadog → Kafka → S3)
# Datadog から Kafka へログをストリーミング
aws kafkaconnect create-connector \
--connector-name datadog-to-kafka \
--connector-configuration '{
"connector.class": "com.datadoghq.kafka.connect.logs.DatadogLogsSourceConnector",
"name": "datadog-logs-source",
"tasks.max": "1",
"datadog.api.url": "https://api.datadoghq.jp/api/v1",
"datadog.api.key": "${secretManager:datadog-api:key}",
"datadog.app.key": "${secretManager:datadog-api:app_key}",
"datadog.query": "status:error",
"datadog.batch.size": "100",
"kafka.topic": "datadog-logs"
}' \
--kafka-cluster '...' \
--plugins '[...]' \
--service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole
Auto Scaling 設定
MSK Connect のワーカーを CPU・MBps に応じて自動スケール。
def configure_autoscaling():
"""
CPU 利用率 80% 以上 → スケールアウト(ワーカー追加)
CPU 利用率 20% 以下 → スケールイン(ワーカー削減)
"""
kconnect = boto3.client('kafkaconnect')
# コネクター更新(Auto Scaling 有効化)
response = kconnect.update_connector(
connectorArn='arn:aws:kafkaconnect:ap-northeast-1:123456789012:connector/mysql-cdc/...',
capacity={
'autoScaling': {
'minWorkerCount': 2,
'maxWorkerCount': 10,
'mcuCount': 1,
'scaleOutPolicy': {
'cpuUtilizationPercentage': 80,
'throughputUtilizationPercentage': 70 # MBps 利用率
},
'scaleInPolicy': {
'cpuUtilizationPercentage': 20
}
}
}
)
print(f"Auto Scaling Configured: {response['connectorArn']}")
configure_autoscaling()
競合他社との比較
| 観点 | MSK Connect | Confluent Cloud | Striim | Debezium | Aiven Kafka Connect |
|---|---|---|---|---|---|
| ホスティング | AWS マネージド | SaaS | オンプレ / SaaS | OSS(自前構築) | クラウド |
| ワーカー管理 | 全自動 | 全自動 | 手動 | 手動 | 全自動 |
| コネクター数 | 100+ | 150+ | 50+ | 150+ | 100+ |
| CDC(Debezium) | ネイティブ対応 | 統合 | ネイティブ | 本家 | ネイティブ対応 |
| AWS 統合 | ネイティブ | API 連携 | API 連携 | API 連携 | API 連携 |
| IAM 認証 | ネイティブ対応 | API キー | 別途認証 | 別途認証 | サポート |
| Auto Scaling | CPU / MBps ベース | 自動 | 手動 | N/A | Auto Scaling |
| コスト | $0.11/MCU-時間 | $1/パーティション-時間 | 固定 | 無料(インフラ費用) | $0.19/MCU-時間 |
コスト詳細
課金モデル
1. MCU(MSK Connect Unit):
- $0.11/MCU-時間
- 1 MCU = メモリ 512MB + CPU 0.25 コア(概算)
2. 例: 2 MCU × 24h × 30 日
- $0.11 × 2 × 720 = $158.4/月
3. ストレージ(コネクター設定):
- $0.10/GB/月(通常 < 1GB)
4. AWS インフラ(S3・Kafka 別途):
- S3: $0.023/GB/月
- MSK: ブローカーコスト + ストレージ
コスト最適化
def optimize_msk_connect_cost():
"""
1. Provisioned Capacity: 固定 MCU(安定スループット向け)
2. Auto Scaling: 変動スループット向け(バースト対応)
3. ワーカー数最小化: パーティション数に最適化
"""
# 現在コスト: 3 MCU × 24 × 30 × $0.11 = $237.6/月
# 最適化後: Auto Scaling min=1 MCU, max=5 MCU(平均 2 MCU)
# → 推定: 2 MCU × 24 × 30 × $0.11 = $158.4/月($79.2/月削減)
estimated_monthly = 2 * 24 * 30 * 0.11
print(f"Optimized Monthly Cost: ${estimated_monthly:.2f}")
optimize_msk_connect_cost()
ベストプラクティス
BP1: コネクター設計
- タスク数 = パーティション数 - パーティション並列処理で最大スループット
- Debezium snapshot モード -
when_neededで初回スナップショットのみ実行 - バッチサイズ調整 - small(256)vs large(2048)でレイテンシ ↔ スループット トレード
BP2: 監視・アラート
# CloudWatch メトリクス監視
cloudwatch.put_metric_alarm(
AlarmName='MSKConnectLag',
MetricName='BatchProcessTime',
Namespace='AWS/Kafka',
Statistic='Average',
Period=300,
Threshold=5000, # ms
ComparisonOperator='GreaterThanThreshold'
)
BP3: セキュリティ
- IAM 認証: SASL_SSL + IAM(推奨)
- Secrets Manager: DB パスワード・API キーを保護
- VPC エンドポイント: Kafka Broker との通信を VPC 内に隔離
トラブルシューティング
| 症状 | 原因 | 対応 |
|---|---|---|
| コネクター FAILED | DB 接続失敗 / DB 권한不足 | セキュリティグループ確認、DB ユーザー権限確認 |
| レイテンシ増加 | ワーカー CPU 枯渇 / パーティション遍在 | MCU 増加、パーティション数最適化 |
| メッセージロス | Kafka ディスク満杯 / ワーカー クラッシュ | EBS Auto-scaling 有効化、ワーカー再起動 |
| Debezium snapshot 遅延 | 大型テーブルの初回同期 | batch.size 最適化、並列スナップショット数調整 |
| Provider Connector 非対応 | プラグイン不存在 | Confluent Hub で検索、カスタムプラグイン開発 |
採用判断チェックリスト
- [ ] Kafka トピックから外部システム(S3 / DWH / OpenSearch)へのリアルタイム同期が必要?
- [ ] Debezium CDC(MySQL / PostgreSQL)を使用予定?
- [ ] Kafka Connect インフラ管理(ワーカー・パッチ)を自動化したい?
- [ ] Confluent Kafka Connect プラグインをそのまま利用したい?
- [ ] Auto Scaling で変動トラフィックに対応したい?
まとめ
Amazon MSK Connect は、Kafka Connect のフルマネージドサービスです。Debezium(MySQL / PostgreSQL CDC)・Confluent・Aiven のコネクターをそのまま利用でき、Kafka から S3・RDS・DynamoDB・OpenSearch・Salesforce への自動連携パイプラインを構築・運用できます。ワーカーの Auto Scaling・パッチ管理を完全に AWS が担当し、IAM 認証・TLS・CloudWatch との統合で企業グレードのセキュリティ・可視性を実現します。
最終更新:2026-04-27 バージョン:v2.0