目次
Amazon MSK(Managed Streaming for Apache Kafka)v2.0 完全ガイド 2026
概要
Amazon MSK(Managed Streaming for Apache Kafka)は、Apache Kafka クラスターをスケーラブルで安全に実行するための AWS 完全マネージドストリーミングサービスです。プロデューサー・コンシューマー・トピック管理をサポートし、Kafka プロトコルネイティブに動作するため、既存の Kafka アプリケーション・ツール・プラグインをコード変更なしに AWS に移行できます。MSK は ZooKeeper または KRaft(Kafka Raft)による統治、自動ブローカーフェイルオーバー、IAM 認証、TLS 暗号化、多様なブローカータイプ(Standard / Express / Serverless)を提供し、リアルタイムメッセージングの本番基盤として機能します。
課題解決
- 既存 Kafka クラスターの運用負荷廃止 - ZooKeeper の複雑な構成管理・監視・スケーリングをすべて AWS がマネージド
- スケーリング判断の自動化 - MSK Serverless でキャパシティプランニング不要、動的スケーリング対応
- 高可用性・災害復旧 - マルチ AZ 自動レプリカ、ブローカーフェイルオーバー、Storage Auto-scaling
- AWS エコシステム統合 - Lambda Event Source Mapping、Kinesis 連携、Glue スキーマレジストリ、MSK Connect(Kafka Connect マネージド)
アーキテクチャ図
graph TB
subgraph Producer["プロデューサー層"]
EC2["EC2 / Lambda"]
App["カスタムアプリ"]
IoT["IoT デバイス"]
end
subgraph MSKCluster["MSK クラスター<br/>VPC-Native"]
BrokerA["Broker-A<br/>AZ-a"]
BrokerB["Broker-B<br/>AZ-b"]
BrokerC["Broker-C<br/>AZ-c"]
Controllers["KRaft Controllers"]
end
subgraph Consumer["コンシューマー層"]
Lambda["Lambda<br/>Event Source"]
Flink["Managed Flink"]
KCL["KCL App"]
Connect["MSK Connect"]
end
subgraph Destinations["配信先"]
S3["S3"]
Redshift["Redshift"]
OpenSearch["OpenSearch"]
DB["RDS / DynamoDB"]
end
Producer -->|Kafka API<br/>Port 9092/9094| MSKCluster
MSKCluster -->|Replication<br/>factor=3| MSKCluster
MSKCluster -->|GetRecords| Consumer
Connect --> Destinations
Flink --> Destinations
style MSKCluster fill:#FF9900,color:#fff
style Controllers fill:#FFA500
コアコンポーネント
1. ブローカータイプ(MSK Provisioned)
| ブローカータイプ | vCPU | メモリ | ネットワーク | EBS スループット | 用途 |
|---|---|---|---|---|---|
| kafka.m5.large | 2 | 8GB | 最大 10Gbps | 165 MB/s | 中規模・デフォルト |
| kafka.m5.xlarge | 4 | 16GB | 最大 10Gbps | 330 MB/s | 大規模本番 |
| kafka.m5.2xlarge | 8 | 32GB | 最大 10Gbps | 660 MB/s | 超大規模 |
| kafka.t3.small | 2 | 2GB | 最大 5Gbps | 125 MB/s | 開発・テスト |
| kafka.c5.large | 2 | 4GB | 最大 10Gbps | 165 MB/s | CPU 最適化 |
| kafka-express.m6i.large | 2 | 8GB | 最大 10Gbps | 1000+ MB/s | Express(新) |
2. クラスターモード
MSK Provisioned Standard
- ブローカーを事前指定(3~30 個)
- ZooKeeper または KRaft(Kafka 3.4+)で統治
- 柔軟なコスト管理・スケーリング制御
- 推奨: 100KB~1MB メッセージ・安定したスループット
MSK Provisioned Express
- 2026 新機能: Standard ブローカーの 5~10 倍高速化
- より低いレイテンシー(~10ms)
- 高スループット・ホットパーティション耐性向上
- コスト: Standard より 30~40% 高い代わりに性能重視
- 推奨: 金融取引・リアルタイム分析・IoT ストリーム
MSK Serverless
- ブローカー数・容量を自動管理(スケール 0~無制限)
- 直近 30 日ピーク × 2 を自動上限設定
- 従量課金: GB 単位($0.075/パーティション-時間+$0.10/$0.05 per GB)
- ZooKeeper 不要(KRaft のみ)
- 推奨: 不規則なトラフィック・スタートアップ・バースト対応
3. 認証・暗号化
IAM 認証(推奨)
# Broker エンドポイント
bootstrap.servers=b-1.prod-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098
bootstrap.servers=b-2.prod-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098
# プロトコル: SASL_SSL(TLS + IAM)
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauth.token.endpoint.url=https://iam.amazonaws.com/
SASL/SCRAM(レガシー)
- 平文 SCRAM は推奨されない(IAM auth より劣る)
- 既存オンプレ移行時のみ採用検討
mTLS(相互 TLS)
- クライアント証明書ベース認証
- 高セキュリティ環境向け(金融・医療)
4. KRaft モード(ZooKeeper 廃止)
Kafka 3.4+ では KRaft がデフォルト:
- Controller node: クラスタメタデータ管理(内部)
- Broker node: トピック・パーティション管理(ユーザー向け)
利点:
✅ ZooKeeper 消失に伴う管理コスト削減
✅ Controller → Broker の直接通信(遅延削減)
✅ 最大 60 ブローカー(ZK は 30)
✅ 2-3 回のメタデータ同期で高速フェイルオーバー
主要ユースケース
- マイクロサービスイベントバス - Order → Payment → Shipping の非同期メッセージング
- リアルタイムログ集約 - アプリログ → MSK → S3/ELK スタック
- CDC パイプライン - Debezium → MSK → データウェアハウス
- ストリーム分析 - IoT センサー → MSK → Managed Flink → ダッシュボード
- オンプレミス Kafka 移行 - 既存 Kafka クラスタをコード変更なしで AWS へ
- リアルタイムレコメンデーション - ユーザー行動 → MSK → ML モデル → Web API
- 金融トランザクション処理 - 決済イベント → MSK → リスク検知・不正検知
- ゲーム実況・チャット - プレイヤーアクション → MSK → リアルタイム同期
- 複数リージョン複製 - MSK Replicator で災害対応・ジオロケーション最適化
- Schema Registry 統合 - メッセージスキーマの一元管理・互換性検査
- MSK Connect コネクター - S3/RDS/DynamoDB への自動シンク
- Kafka Streams・KSQL - MSK 上でのストリーム処理アプリ実行
設定・操作の具体例
CLI: MSK Provisioned クラスター作成
# 基本的な 6 ブローカークラスター(3AZ × 2)
aws kafka create-cluster \
--cluster-name production-msk \
--kafka-version 3.7.0 \
--number-of-broker-nodes 6 \
--broker-node-group-info '{
"InstanceType": "kafka.m5.large",
"ClientSubnets": [
"subnet-12345678-az-a",
"subnet-87654321-az-b",
"subnet-abcdef00-az-c"
],
"SecurityGroups": ["sg-0123456789abcdef0"],
"StorageInfo": {
"EbsStorageInfo": {
"VolumeSize": 500,
"ProvisionedThroughput": {
"Enabled": true,
"VolumeThroughput": 250
}
}
}
}' \
--encryption-info '{
"EncryptionInTransit": {
"ClientBroker": "TLS",
"InCluster": true
},
"EncryptionAtRest": {
"DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:123456789012:key/12345678-1234-1234-1234-123456789012"
}
}' \
--client-authentication '{
"Sasl": {
"Iam": {"Enabled": true},
"Scram": {"Enabled": false}
},
"Tls": {"Enabled": false}
}' \
--enhanced-monitoring PER_TOPIC_PER_BROKER \
--logging-info '{
"BrokerLogs": {
"CloudWatchLogs": {
"Enabled": true,
"LogGroup": "/aws/msk/production"
},
"S3": {
"Enabled": true,
"Bucket": "my-msk-logs",
"Prefix": "broker-logs/"
},
"Firehose": {
"Enabled": false
}
}
}' \
--tags Key=Environment,Value=Production Key=Owner,Value=DataPlatform
SDK (Python): プロデューサー・コンシューマー
from kafka import KafkaProducer, KafkaConsumer
import json
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
# IAM トークンプロバイダー初期化
token_provider = MSKAuthTokenProvider(region='ap-northeast-1')
# プロデューサー(IAM SASL_SSL)
producer = KafkaProducer(
bootstrap_servers=[
'b-1.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098',
'b-2.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098'
],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=token_provider,
acks='all',
retries=3,
compression_type='snappy'
)
# メッセージ送信(パーティションキーで順序保証)
order_data = {
'order_id': 'ORD-20260427-001',
'customer_id': 'CUST-12345',
'amount': 15000,
'timestamp': '2026-04-27T10:30:00Z'
}
producer.send(
'orders',
key=order_data['customer_id'].encode('utf-8'),
value=order_data,
partition=0 # 明示的なパーティション指定
)
producer.flush()
print("Order sent to MSK topic: orders")
# コンシューマー(グループベース)
consumer = KafkaConsumer(
'orders',
bootstrap_servers=[
'b-1.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098'
],
group_id='order-processor-group',
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=token_provider,
max_poll_records=100,
session_timeout_ms=30000
)
for message in consumer:
order = message.value
print(f"Processing order: {order['order_id']} for ${order['amount']}")
# リアルタイム処理(決済・配送)
IaC (Terraform): MSK Serverless
# MSK Serverless クラスター
resource "aws_msk_serverless_cluster" "example" {
cluster_name = "serverless-kafka-cluster"
vpc_config {
subnet_ids = ["subnet-12345678", "subnet-87654321"]
security_group_ids = ["sg-0123456789abcdef0"]
}
client_authentication {
sasl {
iam = true
}
}
tags = {
Environment = "Production"
Tier = "Streaming"
}
}
# CloudWatch アラーム
resource "aws_cloudwatch_metric_alarm" "serverless_throttle" {
alarm_name = "MSK-Serverless-Throttle"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "ThrottledProduceRequestCount"
namespace = "AWS/Kafka"
period = "300"
statistic = "Sum"
threshold = "10"
alarm_actions = ["arn:aws:sns:ap-northeast-1:123456789012:alerts"]
}
CloudFormation: MSK Provisioned + KRaft
Resources:
MSKCluster:
Type: AWS::MSK::Cluster
Properties:
BrokerNodeGroupInfo:
InstanceType: kafka.m5.large
ClientSubnets:
- subnet-12345678
- subnet-87654321
- subnet-abcdef00
SecurityGroups:
- sg-0123456789abcdef0
StorageInfo:
EBSStorageInfo:
VolumeSize: 500
ProvisionedThroughput:
Enabled: true
VolumeThroughput: 250
ClusterName: prod-msk-kraft
KafkaVersion: "3.7.0"
NumberOfBrokerNodes: 6
EncryptionInfo:
EncryptionInTransit:
ClientBroker: TLS
InCluster: true
EncryptionAtRest:
DataVolumeKMSKeyId: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/12345678-1234-1234-1234-123456789012"
ClientAuthentication:
Sasl:
Iam: true
Scram: false
EnhancedMonitoring: PER_TOPIC_PER_BROKER
LoggingInfo:
BrokerLogs:
CloudWatchLogs:
Enabled: true
LogGroup: /aws/msk/production
S3:
Enabled: true
Bucket: my-msk-logs
Prefix: broker-logs/
LogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/msk/production
RetentionInDays: 30
Outputs:
ClusterArn:
Value: !GetAtt MSKCluster.Arn
BootstrapBrokers:
Value: !GetAtt MSKCluster.BootstrapBrokerString
類似サービス比較表
| 観点 | Amazon MSK | Apache Kafka Self-managed |
Confluent Cloud | Redpanda | Azure Event Hubs |
|---|---|---|---|---|---|
| ホスティング | AWS マネージド | オンプレ/EC2 | SaaS | Self-managed | Azure マネージド |
| Kafka API | ネイティブ | ネイティブ | ネイティブ | Kafka 互換 | AMQP / Kafka API |
| スケーリング | 自動/手動 | 手動 | 自動 | 手動 | 自動 |
| 監視・運用 | CloudWatch | 自前 | 内蔵(Confluent Control Center) | 内蔵 | 内蔵 |
| Schema Registry | AWS Glue | 別途構築 | 内蔵 | 別途 | 別途 |
| KRaft サポート | GA(3.4+) | GA(3.3+) | GA | ネイティブ | N/A |
| コスト透明度 | 高(AWS 従量課金) | 固定 | ドル/パーティション-時間 | 固定 | スループット単位 |
| マルチリージョン | MSK Replicator | Kafka Mirror Maker | 内蔵 | 別途構築 | 内蔵 |
ベストプラクティス
✅ 推奨
- IAM 認証を使用 - SASL/SCRAM より安全・管理容易
- レプリケーション 3 設定 - min.insync.replicas=2 + acks=all で無損失
- マルチ AZ デプロイ - 各 AZ に 1 ブローカー以上配置(3AZ 推奨)
- Storage Auto-scaling - EBS スケーリングを有効化(ディスク満杯回避)
- パーティション設計 - パーティション数 = 消費並列度 × 3 を目安
- CloudWatch & ログ監視 - ブローカーログを CloudWatch / S3 に転送
- 定期バージョンアップ - Kafka セキュリティパッチを 3ヶ月ごと適用
- KRaft モード採用 - Kafka 3.4+ では KRaft がデフォルト(ZK 廃止予定)
- MSK Connect 活用 - S3/RDS へのシンク・ソース連携を自動化
- Metrics 監視 - BytesInPerSec, BytesOutPerSec, FetchConsumerTotalTimeMs を監視
❌ アンチパターン
- パーティションキー偏り - 特定キーが大量メッセージ生成(ホットシャード)
- Default パーティション数 12 のまま - 消費者数が 20 個でスループット枯渇
- ZooKeeper クラスターの不安定化 - ZK のディスク満杯・GC 暴走
- 圧縮なしの大量メッセージ - ネットワーク帯域浪費
- ブローカーログ無効化 - トラブル時のデバッグ困難
- IAM ロール権限を全許可 - 最小権限ルール無視
- Serverless で低メッセージレート - バースト対応向けなので常時高スループット不要
トラブルシューティング表
| 症状 | 原因 | 対応 |
|---|---|---|
| Connection timeout: xxx:9098 | ブローカーエンドポイント誤り / セキュリティグループ遮断 | aws kafka get-bootstrap-brokers で確認、SG 確認 |
SASL authentication failed |
IAM ロール権限不足 / トークン期限切れ | IAM ロールに kafka-cluster:* 権限追加、トークン再生成 |
Topic replica out-of-sync |
ブローカーディスク満杯 / メモリ不足 | EBS Volume 拡張、ブローカー再起動、ホットシャード分割 |
| Consumer lag 増加 | コンシューマー遅延 / GC 暴走 | コンシューマーの parallelization 向上、ヒープ増加 |
Broker down / Unhealthy |
ハードウェア障害 / ZK メタデータ破損 | AWS がフェイルオーバー対応、KRaft モード移行検討 |
| Throughput 上限に達した | シャード容量枯渇 | ブローカー数増加(SplitShard)、Express ブローカー検討 |
Message loss |
acks=0 設定 / min.insync.replicas=1 | acks=‘all’、min.insync.replicas=2 に設定 |
2025-2026 最新動向
- MSK Express GA(2026-04) - Standard の 5~10 倍スループット、金融・リアルタイム分析向け
- KRaft デフォルト - Kafka 3.7 では KRaft が推奨、ZooKeeper 廃止予定
- Enhanced Monitoring - より詳細なメトリクス(Topic/Broker 単位)
- MSK Connect 拡張 - 新しいコネクタ増加(DynamoDB、Neptune など)
- IAM 認証成熟化 - レガシー SASL/SCRAM の非推奨化傾向
- On-Demand スケーリング - Serverless の動的キャパシティ調整改善
- Lambda Event Source Mapping - ポーリング改善、より低レイテンシー
学習リソース・参考文献
公式ドキュメント
- AWS MSK Developer Guide
- MSK Supported Kafka Versions
- MSK FAQs
- MSK Pricing
- MSK Features
- How to Choose the Right MSK Cluster Type
- What is MSK Serverless
- MSK Security Best Practices
オープンソース・ベンダー資料
- Apache Kafka Official Documentation
- Confluent Platform Documentation
- Redpanda Documentation
- Aiven Kafka Documentation
- AWS MSK Terraform Provider
実装例・チェックリスト
実装例: Event Sourcing パターン
# MSK を Event Store として活用
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
class EventStore:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
compression_type='snappy'
)
self.event_topic = 'order-events'
def publish_event(self, aggregate_id, event_type, payload):
event = {
'event_id': str(uuid.uuid4()),
'aggregate_id': aggregate_id,
'event_type': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload,
'version': 1
}
self.producer.send(
self.event_topic,
key=aggregate_id.encode('utf-8'),
value=event
)
self.producer.flush()
return event['event_id']
# 使用例
store = EventStore(['b-1.xxx:9098', 'b-2.xxx:9098'])
store.publish_event(
'ORDER-12345',
'OrderCreated',
{'amount': 50000, 'customer': 'CUST-99'}
)
チェックリスト
- [ ] ブローカーサイズ・数は消費スループット想定値の 3 倍?
- [ ] 各 AZ に最低 1 ブローカー配置?
- [ ] IAM 認証有効化?
- [ ] TLS 暗号化(in-transit & at-rest)有効化?
- [ ] レプリケーション 3、min.insync.replicas=2?
- [ ] ブローカーログ(CloudWatch / S3)有効化?
- [ ] CloudWatch アラーム設定(レプリカ遅延、ディスク、CPU)?
- [ ] 定期バージョンアップ計画立案?
- [ ] MSK Replicator で DR 構成検討?
- [ ] Lambda Event Source Mapping テスト実施?
まとめ
Amazon MSK は Kafka クラスター運用を完全に AWS に委託し、プロデューサー・コンシューマー・トピック管理に専念できるマネージドサービスです。MSK Provisioned(Standard/Express)でキャパシティ制御、MSK Serverless で自動スケーリングを実現し、IAM 認証・TLS・KRaft・自動フェイルオーバー・Storage Auto-scaling により本番グレードのセキュリティ・可用性を備えています。既存 Kafka クラスターからのコード変更なし移行、マイクロサービス・ログ集約・CDC・リアルタイム分析の中核基盤として、Lambda / Flink / Glue との統合で AWS ストリーミングエコシステムの中心的役割を果たします。
最終更新:2026-04-27
バージョン:v2.0