目次

Amazon MSK(Managed Streaming for Apache Kafka)v2.0 完全ガイド 2026

概要

Amazon MSK(Managed Streaming for Apache Kafka)は、Apache Kafka クラスターをスケーラブルで安全に実行するための AWS 完全マネージドストリーミングサービスです。プロデューサー・コンシューマー・トピック管理をサポートし、Kafka プロトコルネイティブに動作するため、既存の Kafka アプリケーション・ツール・プラグインをコード変更なしに AWS に移行できます。MSK は ZooKeeper または KRaft(Kafka Raft)による統治、自動ブローカーフェイルオーバー、IAM 認証、TLS 暗号化、多様なブローカータイプ(Standard / Express / Serverless)を提供し、リアルタイムメッセージングの本番基盤として機能します。

課題解決

  • 既存 Kafka クラスターの運用負荷廃止 - ZooKeeper の複雑な構成管理・監視・スケーリングをすべて AWS がマネージド
  • スケーリング判断の自動化 - MSK Serverless でキャパシティプランニング不要、動的スケーリング対応
  • 高可用性・災害復旧 - マルチ AZ 自動レプリカ、ブローカーフェイルオーバー、Storage Auto-scaling
  • AWS エコシステム統合 - Lambda Event Source Mapping、Kinesis 連携、Glue スキーマレジストリ、MSK Connect(Kafka Connect マネージド)

アーキテクチャ図

graph TB
    subgraph Producer["プロデューサー層"]
        EC2["EC2 / Lambda"]
        App["カスタムアプリ"]
        IoT["IoT デバイス"]
    end
    
    subgraph MSKCluster["MSK クラスター<br/>VPC-Native"]
        BrokerA["Broker-A<br/>AZ-a"]
        BrokerB["Broker-B<br/>AZ-b"]
        BrokerC["Broker-C<br/>AZ-c"]
        Controllers["KRaft Controllers"]
    end
    
    subgraph Consumer["コンシューマー層"]
        Lambda["Lambda<br/>Event Source"]
        Flink["Managed Flink"]
        KCL["KCL App"]
        Connect["MSK Connect"]
    end
    
    subgraph Destinations["配信先"]
        S3["S3"]
        Redshift["Redshift"]
        OpenSearch["OpenSearch"]
        DB["RDS / DynamoDB"]
    end
    
    Producer -->|Kafka API<br/>Port 9092/9094| MSKCluster
    MSKCluster -->|Replication<br/>factor=3| MSKCluster
    MSKCluster -->|GetRecords| Consumer
    Connect --> Destinations
    Flink --> Destinations
    
    style MSKCluster fill:#FF9900,color:#fff
    style Controllers fill:#FFA500

コアコンポーネント

1. ブローカータイプ(MSK Provisioned)

ブローカータイプ vCPU メモリ ネットワーク EBS スループット 用途
kafka.m5.large 2 8GB 最大 10Gbps 165 MB/s 中規模・デフォルト
kafka.m5.xlarge 4 16GB 最大 10Gbps 330 MB/s 大規模本番
kafka.m5.2xlarge 8 32GB 最大 10Gbps 660 MB/s 超大規模
kafka.t3.small 2 2GB 最大 5Gbps 125 MB/s 開発・テスト
kafka.c5.large 2 4GB 最大 10Gbps 165 MB/s CPU 最適化
kafka-express.m6i.large 2 8GB 最大 10Gbps 1000+ MB/s Express(新)

2. クラスターモード

MSK Provisioned Standard

  • ブローカーを事前指定(3~30 個)
  • ZooKeeper または KRaft(Kafka 3.4+)で統治
  • 柔軟なコスト管理・スケーリング制御
  • 推奨: 100KB~1MB メッセージ・安定したスループット

MSK Provisioned Express

  • 2026 新機能: Standard ブローカーの 5~10 倍高速化
  • より低いレイテンシー(~10ms)
  • 高スループット・ホットパーティション耐性向上
  • コスト: Standard より 30~40% 高い代わりに性能重視
  • 推奨: 金融取引・リアルタイム分析・IoT ストリーム

MSK Serverless

  • ブローカー数・容量を自動管理(スケール 0~無制限)
  • 直近 30 日ピーク × 2 を自動上限設定
  • 従量課金: GB 単位($0.075/パーティション-時間+$0.10/$0.05 per GB)
  • ZooKeeper 不要(KRaft のみ)
  • 推奨: 不規則なトラフィック・スタートアップ・バースト対応

3. 認証・暗号化

IAM 認証(推奨)

# Broker エンドポイント
bootstrap.servers=b-1.prod-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098
bootstrap.servers=b-2.prod-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098

# プロトコル: SASL_SSL(TLS + IAM)
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauth.token.endpoint.url=https://iam.amazonaws.com/

SASL/SCRAM(レガシー)

  • 平文 SCRAM は推奨されない(IAM auth より劣る)
  • 既存オンプレ移行時のみ採用検討

mTLS(相互 TLS)

  • クライアント証明書ベース認証
  • 高セキュリティ環境向け(金融・医療)

4. KRaft モード(ZooKeeper 廃止)

Kafka 3.4+ では KRaft がデフォルト:
- Controller node: クラスタメタデータ管理(内部)
- Broker node: トピック・パーティション管理(ユーザー向け)

利点:
✅ ZooKeeper 消失に伴う管理コスト削減
✅ Controller → Broker の直接通信(遅延削減)
✅ 最大 60 ブローカー(ZK は 30)
✅ 2-3 回のメタデータ同期で高速フェイルオーバー

主要ユースケース

  1. マイクロサービスイベントバス - Order → Payment → Shipping の非同期メッセージング
  2. リアルタイムログ集約 - アプリログ → MSK → S3/ELK スタック
  3. CDC パイプライン - Debezium → MSK → データウェアハウス
  4. ストリーム分析 - IoT センサー → MSK → Managed Flink → ダッシュボード
  5. オンプレミス Kafka 移行 - 既存 Kafka クラスタをコード変更なしで AWS へ
  6. リアルタイムレコメンデーション - ユーザー行動 → MSK → ML モデル → Web API
  7. 金融トランザクション処理 - 決済イベント → MSK → リスク検知・不正検知
  8. ゲーム実況・チャット - プレイヤーアクション → MSK → リアルタイム同期
  9. 複数リージョン複製 - MSK Replicator で災害対応・ジオロケーション最適化
  10. Schema Registry 統合 - メッセージスキーマの一元管理・互換性検査
  11. MSK Connect コネクター - S3/RDS/DynamoDB への自動シンク
  12. Kafka Streams・KSQL - MSK 上でのストリーム処理アプリ実行

設定・操作の具体例

CLI: MSK Provisioned クラスター作成

# 基本的な 6 ブローカークラスター(3AZ × 2)
aws kafka create-cluster \
  --cluster-name production-msk \
  --kafka-version 3.7.0 \
  --number-of-broker-nodes 6 \
  --broker-node-group-info '{
    "InstanceType": "kafka.m5.large",
    "ClientSubnets": [
      "subnet-12345678-az-a",
      "subnet-87654321-az-b",
      "subnet-abcdef00-az-c"
    ],
    "SecurityGroups": ["sg-0123456789abcdef0"],
    "StorageInfo": {
      "EbsStorageInfo": {
        "VolumeSize": 500,
        "ProvisionedThroughput": {
          "Enabled": true,
          "VolumeThroughput": 250
        }
      }
    }
  }' \
  --encryption-info '{
    "EncryptionInTransit": {
      "ClientBroker": "TLS",
      "InCluster": true
    },
    "EncryptionAtRest": {
      "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:123456789012:key/12345678-1234-1234-1234-123456789012"
    }
  }' \
  --client-authentication '{
    "Sasl": {
      "Iam": {"Enabled": true},
      "Scram": {"Enabled": false}
    },
    "Tls": {"Enabled": false}
  }' \
  --enhanced-monitoring PER_TOPIC_PER_BROKER \
  --logging-info '{
    "BrokerLogs": {
      "CloudWatchLogs": {
        "Enabled": true,
        "LogGroup": "/aws/msk/production"
      },
      "S3": {
        "Enabled": true,
        "Bucket": "my-msk-logs",
        "Prefix": "broker-logs/"
      },
      "Firehose": {
        "Enabled": false
      }
    }
  }' \
  --tags Key=Environment,Value=Production Key=Owner,Value=DataPlatform

SDK (Python): プロデューサー・コンシューマー

from kafka import KafkaProducer, KafkaConsumer
import json
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# IAM トークンプロバイダー初期化
token_provider = MSKAuthTokenProvider(region='ap-northeast-1')

# プロデューサー(IAM SASL_SSL)
producer = KafkaProducer(
    bootstrap_servers=[
        'b-1.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098',
        'b-2.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098'
    ],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=token_provider,
    acks='all',
    retries=3,
    compression_type='snappy'
)

# メッセージ送信(パーティションキーで順序保証)
order_data = {
    'order_id': 'ORD-20260427-001',
    'customer_id': 'CUST-12345',
    'amount': 15000,
    'timestamp': '2026-04-27T10:30:00Z'
}
producer.send(
    'orders',
    key=order_data['customer_id'].encode('utf-8'),
    value=order_data,
    partition=0  # 明示的なパーティション指定
)
producer.flush()
print("Order sent to MSK topic: orders")

# コンシューマー(グループベース)
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=[
        'b-1.production-msk.xxx.c3.kafka.ap-northeast-1.amazonaws.com:9098'
    ],
    group_id='order-processor-group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=token_provider,
    max_poll_records=100,
    session_timeout_ms=30000
)

for message in consumer:
    order = message.value
    print(f"Processing order: {order['order_id']} for ${order['amount']}")
    # リアルタイム処理(決済・配送)

IaC (Terraform): MSK Serverless

# MSK Serverless クラスター
resource "aws_msk_serverless_cluster" "example" {
  cluster_name = "serverless-kafka-cluster"
  
  vpc_config {
    subnet_ids         = ["subnet-12345678", "subnet-87654321"]
    security_group_ids = ["sg-0123456789abcdef0"]
  }
  
  client_authentication {
    sasl {
      iam = true
    }
  }
  
  tags = {
    Environment = "Production"
    Tier        = "Streaming"
  }
}

# CloudWatch アラーム
resource "aws_cloudwatch_metric_alarm" "serverless_throttle" {
  alarm_name          = "MSK-Serverless-Throttle"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "ThrottledProduceRequestCount"
  namespace           = "AWS/Kafka"
  period              = "300"
  statistic           = "Sum"
  threshold           = "10"
  alarm_actions       = ["arn:aws:sns:ap-northeast-1:123456789012:alerts"]
}

CloudFormation: MSK Provisioned + KRaft

Resources:
  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        InstanceType: kafka.m5.large
        ClientSubnets:
          - subnet-12345678
          - subnet-87654321
          - subnet-abcdef00
        SecurityGroups:
          - sg-0123456789abcdef0
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 500
            ProvisionedThroughput:
              Enabled: true
              VolumeThroughput: 250
      ClusterName: prod-msk-kraft
      KafkaVersion: "3.7.0"
      NumberOfBrokerNodes: 6
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
        EncryptionAtRest:
          DataVolumeKMSKeyId: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/12345678-1234-1234-1234-123456789012"
      ClientAuthentication:
        Sasl:
          Iam: true
          Scram: false
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      LoggingInfo:
        BrokerLogs:
          CloudWatchLogs:
            Enabled: true
            LogGroup: /aws/msk/production
          S3:
            Enabled: true
            Bucket: my-msk-logs
            Prefix: broker-logs/

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/msk/production
      RetentionInDays: 30

Outputs:
  ClusterArn:
    Value: !GetAtt MSKCluster.Arn
  BootstrapBrokers:
    Value: !GetAtt MSKCluster.BootstrapBrokerString

類似サービス比較表

観点 Amazon MSK Apache Kafka
Self-managed
Confluent Cloud Redpanda Azure Event Hubs
ホスティング AWS マネージド オンプレ/EC2 SaaS Self-managed Azure マネージド
Kafka API ネイティブ ネイティブ ネイティブ Kafka 互換 AMQP / Kafka API
スケーリング 自動/手動 手動 自動 手動 自動
監視・運用 CloudWatch 自前 内蔵(Confluent Control Center) 内蔵 内蔵
Schema Registry AWS Glue 別途構築 内蔵 別途 別途
KRaft サポート GA(3.4+) GA(3.3+) GA ネイティブ N/A
コスト透明度 高(AWS 従量課金) 固定 ドル/パーティション-時間 固定 スループット単位
マルチリージョン MSK Replicator Kafka Mirror Maker 内蔵 別途構築 内蔵

ベストプラクティス

✅ 推奨

  • IAM 認証を使用 - SASL/SCRAM より安全・管理容易
  • レプリケーション 3 設定 - min.insync.replicas=2 + acks=all で無損失
  • マルチ AZ デプロイ - 各 AZ に 1 ブローカー以上配置(3AZ 推奨)
  • Storage Auto-scaling - EBS スケーリングを有効化(ディスク満杯回避)
  • パーティション設計 - パーティション数 = 消費並列度 × 3 を目安
  • CloudWatch & ログ監視 - ブローカーログを CloudWatch / S3 に転送
  • 定期バージョンアップ - Kafka セキュリティパッチを 3ヶ月ごと適用
  • KRaft モード採用 - Kafka 3.4+ では KRaft がデフォルト(ZK 廃止予定)
  • MSK Connect 活用 - S3/RDS へのシンク・ソース連携を自動化
  • Metrics 監視 - BytesInPerSec, BytesOutPerSec, FetchConsumerTotalTimeMs を監視

❌ アンチパターン

  • パーティションキー偏り - 特定キーが大量メッセージ生成(ホットシャード)
  • Default パーティション数 12 のまま - 消費者数が 20 個でスループット枯渇
  • ZooKeeper クラスターの不安定化 - ZK のディスク満杯・GC 暴走
  • 圧縮なしの大量メッセージ - ネットワーク帯域浪費
  • ブローカーログ無効化 - トラブル時のデバッグ困難
  • IAM ロール権限を全許可 - 最小権限ルール無視
  • Serverless で低メッセージレート - バースト対応向けなので常時高スループット不要

トラブルシューティング表

症状 原因 対応
Connection timeout: xxx:9098 ブローカーエンドポイント誤り / セキュリティグループ遮断 aws kafka get-bootstrap-brokers で確認、SG 確認
SASL authentication failed IAM ロール権限不足 / トークン期限切れ IAM ロールに kafka-cluster:* 権限追加、トークン再生成
Topic replica out-of-sync ブローカーディスク満杯 / メモリ不足 EBS Volume 拡張、ブローカー再起動、ホットシャード分割
Consumer lag 増加 コンシューマー遅延 / GC 暴走 コンシューマーの parallelization 向上、ヒープ増加
Broker down / Unhealthy ハードウェア障害 / ZK メタデータ破損 AWS がフェイルオーバー対応、KRaft モード移行検討
Throughput 上限に達した シャード容量枯渇 ブローカー数増加(SplitShard)、Express ブローカー検討
Message loss acks=0 設定 / min.insync.replicas=1 acks=‘all’、min.insync.replicas=2 に設定

2025-2026 最新動向

  • MSK Express GA(2026-04) - Standard の 5~10 倍スループット、金融・リアルタイム分析向け
  • KRaft デフォルト - Kafka 3.7 では KRaft が推奨、ZooKeeper 廃止予定
  • Enhanced Monitoring - より詳細なメトリクス(Topic/Broker 単位)
  • MSK Connect 拡張 - 新しいコネクタ増加(DynamoDB、Neptune など)
  • IAM 認証成熟化 - レガシー SASL/SCRAM の非推奨化傾向
  • On-Demand スケーリング - Serverless の動的キャパシティ調整改善
  • Lambda Event Source Mapping - ポーリング改善、より低レイテンシー

学習リソース・参考文献

公式ドキュメント

  1. AWS MSK Developer Guide
  2. MSK Supported Kafka Versions
  3. MSK FAQs
  4. MSK Pricing
  5. MSK Features
  6. How to Choose the Right MSK Cluster Type
  7. What is MSK Serverless
  8. MSK Security Best Practices

オープンソース・ベンダー資料

  1. Apache Kafka Official Documentation
  2. Confluent Platform Documentation
  3. Redpanda Documentation
  4. Aiven Kafka Documentation
  5. AWS MSK Terraform Provider

実装例・チェックリスト

実装例: Event Sourcing パターン

# MSK を Event Store として活用
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime

class EventStore:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='snappy'
        )
        self.event_topic = 'order-events'
    
    def publish_event(self, aggregate_id, event_type, payload):
        event = {
            'event_id': str(uuid.uuid4()),
            'aggregate_id': aggregate_id,
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload,
            'version': 1
        }
        self.producer.send(
            self.event_topic,
            key=aggregate_id.encode('utf-8'),
            value=event
        )
        self.producer.flush()
        return event['event_id']

# 使用例
store = EventStore(['b-1.xxx:9098', 'b-2.xxx:9098'])
store.publish_event(
    'ORDER-12345',
    'OrderCreated',
    {'amount': 50000, 'customer': 'CUST-99'}
)

チェックリスト

  • [ ] ブローカーサイズ・数は消費スループット想定値の 3 倍?
  • [ ] 各 AZ に最低 1 ブローカー配置?
  • [ ] IAM 認証有効化?
  • [ ] TLS 暗号化(in-transit & at-rest)有効化?
  • [ ] レプリケーション 3、min.insync.replicas=2?
  • [ ] ブローカーログ(CloudWatch / S3)有効化?
  • [ ] CloudWatch アラーム設定(レプリカ遅延、ディスク、CPU)?
  • [ ] 定期バージョンアップ計画立案?
  • [ ] MSK Replicator で DR 構成検討?
  • [ ] Lambda Event Source Mapping テスト実施?

まとめ

Amazon MSK は Kafka クラスター運用を完全に AWS に委託し、プロデューサー・コンシューマー・トピック管理に専念できるマネージドサービスです。MSK Provisioned(Standard/Express)でキャパシティ制御、MSK Serverless で自動スケーリングを実現し、IAM 認証・TLS・KRaft・自動フェイルオーバー・Storage Auto-scaling により本番グレードのセキュリティ・可用性を備えています。既存 Kafka クラスターからのコード変更なし移行、マイクロサービス・ログ集約・CDC・リアルタイム分析の中核基盤として、Lambda / Flink / Glue との統合で AWS ストリーミングエコシステムの中心的役割を果たします。


最終更新:2026-04-27
バージョン:v2.0