目次
Amazon Kinesis Data Streams v2.0 完全ガイド 2026
概要
Amazon Kinesis Data Streams(以下 KDS)は、IoT・クリックストリーム・ログ・トランザクションなどの大量・高速なストリーミングデータをリアルタイムに取り込み、複数の独立したコンシューマーが同時並列に処理するための AWS 完全マネージドサービスです。シャード単位でスループットを制御し、最大 365 日(拡張保持オプション)のデータ保持期間によってリプレイを可能にします。Kinesis Client Library(KCL)・Lambda Event Source Mapping・Managed Service for Apache Flink など多様なコンシューマーが同じデータストリームを独立して消費できる点が、キュー型の SQS や配信専用の Firehose と異なる大きな特徴です。
課題解決
- 複数コンシューマーによる並列処理 - 同一データを リアルタイム分析(Flink)・S3 保存(Firehose)・Lambda アラート が同時に消費
- リプレイによる障害復旧 - バグ修正後に過去 30~365 日のデータを再処理
- ホットシャード対策 - パーティションキーのランダムサフィックスで均等分散
- On-Demand Advantage - 2026 年新:従量課金で 60% 低コスト化
- Enhanced Fan-Out - 50 コンシューマーまで各専用スループット確保(2026 拡張)
アーキテクチャ図
graph TB
subgraph Producers["プロデューサー層"]
IoT["IoT デバイス"]
WebClick["Web クリック"]
Logs["アプリログ"]
TxnDB["トランザクション DB"]
end
subgraph KDS["Kinesis Data Streams<br/>シャード構成"]
Shard1["Shard 0<br/>パーティション: 0-xxx"]
Shard2["Shard 1<br/>パーティション: xxx-yyy"]
ShardN["Shard N<br/>パーティション: zzz"]
end
subgraph ConsumerTypes["コンシューマー層"]
Lambda["Lambda<br/>Event Source"]
Firehose["Firehose<br/>→ S3/Redshift"]
Flink["Managed Flink<br/>複雑処理"]
KCL["KCL App<br/>カスタム処理"]
end
subgraph OutputTargets["出力先"]
DDB["DynamoDB<br/>集計結果"]
Cache["ElastiCache<br/>リアルタイム"]
S3["S3<br/>ファイル"]
DB["RDS<br/>永続化"]
end
Producers -->|PutRecord<br/>Partition Key| KDS
KDS -->|GetRecords<br/>Enhanced Fan-Out| ConsumerTypes
ConsumerTypes --> OutputTargets
style KDS fill:#FF9900,color:#fff
style Lambda fill:#FFA500
style Flink fill:#FFA500
コアコンポーネント
1. シャード(スループットの基本単位)
シャード仕様(1 シャード当たり):
入力: 1 MB/秒 × 1,000 レコード/秒
出力:
- GetRecords(共有): 2 MB/秒(全コンシューマー合計)
- Enhanced Fan-Out: 2 MB/秒/コンシューマー(最大 50)
例: 5 MB/秒の入力スループット
→ 5 シャード必要
パーティションキー vs シャード:
・JSON {"customer_id": "CUST-123", ...}
・PartitionKey = customer_id
・MD5(customer_id) の値に基づいてシャード割り当て
・同一パーティションキーは常に同じシャードへ → 順序保証
ホットシャード回避:
❌ PartitionKey = country(米国に偏る)
✅ PartitionKey = customer_id + "-" + random(0-99)
2. スケーリング戦略
Provisioned モード(従来型)
ブローカー数を事前指定:
- 初期: 4 シャード
- 需要増時: SplitShard で 8 シャードに拡張
- 維持: 月単位のコスト予測が可能
推奨: 安定したスループット・予測可能なコスト
On-Demand モード(2021 GA)
ブローカー数を自動調整:
- 直近 30 日間のピーク × 2 を自動上限設定
- 60% ディスカウント対象(Advantage)
- バースト対応・キャパシティ判断不要
推奨: スタートアップ・トラフィック不安定・開発環境
価格(2026 On-Demand Advantage):
- Data Ingest: $0.032/GB
- Data Retrieval (Shared): $0.016/GB
- Data Retrieval (Enhanced Fan-Out): $0.016/GB
Provisioned with On-Demand Advantage(新 2026)
- 柔軟なハイブリッド:
- 基本シャード数は固定
- 超過分を On-Demand で自動スケール
- コスト予測 + 柔軟性
3. データ保持とリプレイ
デフォルト: 24 時間
拡張保持: 7 日間(+$0.020/シャード-時間)
長期保持: 365 日間(+$0.023/GB-月)
リプレイシナリオ:
1. バグ修正: コンシューマーロジック修正後、TRIM_HORIZON で過去データ再処理
2. 新機能追加: 新 Lambda 関数を追加コンシューマーとして登録
3. 障害復旧: 障害で処理停止したコンシューマーを AT_TIMESTAMP で特定時点から再開
例:
# 2026-04-27 10:00:00 から再処理
esm = boto3.client('lambda')
esm.update_event_source_mapping(
UUID='event-source-mapping-uuid',
FunctionName='processor-function',
StartingPosition='AT_TIMESTAMP',
StartingPositionTimestamp='2026-04-27T10:00:00'
)
4. コンシューマータイプ
GetRecords(共有スループット)
全コンシューマーで 2 MB/秒/シャード を共有:
・ポーリング型(プル)
・レイテンシー: 200ms 程度
- 追加料金なし
例: Lambda Event Source Mapping
BatchSize: 100 レコード
MaximumBatchingWindowInSeconds: 5
ParallelizationFactor: 10(シャード×10 の並列 Lambda 実行)
シナリオ:
10 シャード × 10 並列 = 100 並列 Lambda 起動
各ウィンドウで最大 1,000 レコード処理
Enhanced Fan-Out(拡張ファンアウト)
各コンシューマーで 2 MB/秒/シャード を専有:
・プッシュ型(AWS が主動的にデータ転送)
・レイテンシー: 70ms(GetRecords の 1/3)
・追加料金: $0.015/コンシューマー-シャード-時間(2026 Advantage で削減)
・最大 50 コンシューマー
例: Managed Flink アプリ
RegisterStreamConsumer で 3 つの Flink ジョブを登録
各ジョブが独立した 2 MB/秒 を確保
合計スループット: 6 MB/秒(GetRecords の 3 倍)
5. Lambda Event Source Mapping
設定項目:
- StartingPosition: TRIM_HORIZON / LATEST / AT_TIMESTAMP
- BatchSize: 1~10,000 レコード
- BisectBatchOnFunctionError: true(エラー時バッチ分割)
- ParallelizationFactor: 1~10(シャードあたり並列度)
- DestinationConfig: 失敗レコード → SQS/SNS
- MaximumRetryAttempts: 2(デフォルト)
障害時処理:
Lambda がエラー → 自動リトライ 2 回
→ 失敗レコードを SQS に送信(設定時)
→ DLQ 処理で後続処理
主要ユースケース
- リアルタイムクリックストリーム分析 - EC サイト訪問 → KDS → Flink(クリックバンドル) → OpenSearch(ユーザー行動)
- IoT センサーデータ集約 - 100 万センサー → KDS → Lambda(異常検知)・Firehose(S3 保存)
- ログリアルタイム監視 - アプリログ → KDS → Lambda(エラー検知)・ELK(可視化)
- 金融トランザクション処理 - 決済 → KDS → リスク検知(Lambda)・監査ログ(S3)・UI リアルタイム更新(WebSocket)
- ゲームスコアボード - プレイヤースコア更新 → KDS → DynamoDB(リーダーボード) → WebSocket → リアルタイムランキング
- 複数 Lambda で並列処理 - データ → KDS → Lambda A(請求)・Lambda B(マーケティング)・Lambda C(在庫)
- リプレイによる新機能追加 - 既存コンシューマーと新コンシューマーが過去 30 日を並列処理
- ストリーム分析ダッシュボード - KDS → Managed Flink(集計) → CloudWatch Dashboards
- モバイルアプリテレメトリ - クラッシュレポート → KDS → Lambda(集約) → Datadog(分析)
- リアルタイム価格設定 - 競合分析 → KDS → Lambda(価格更新) → DynamoDB → 公開 API
- マルチリージョンレプリケーション - リージョン A の KDS → リージョン B の KDS(Firehose で複製)
- 複合イベント処理 - KDS → Flink SQL(複数ストリーム JOIN) → アラート送信
- リアルタイム請求アグリゲーション - トランザクション → KDS → Flink(5 分集計) → CloudWatch Metrics
- セキュリティログ統合 - VPC Flow Logs・CloudTrail → KDS → SIEM(Splunk)
設定・操作の具体例
CLI: On-Demand Advantage でストリーム作成
# On-Demand Advantage モードでストリーム作成(自動スケール)
aws kinesis create-stream \
--stream-name ecommerce-clickstream \
--stream-mode-details StreamMode=ON_DEMAND \
--region ap-northeast-1
# ストリーム詳細確認
aws kinesis describe-stream \
--stream-name ecommerce-clickstream \
--region ap-northeast-1
# 出力:
# {
# "StreamDescription": {
# "StreamName": "ecommerce-clickstream",
# "StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/ecommerce-clickstream",
# "StreamModeDetails": {
# "StreamMode": "ON_DEMAND"
# },
# "StreamStatus": "ACTIVE",
# "Shards": [...],
# "HasMoreShards": false,
# "RetentionPeriodHours": 24,
# "StreamCreationTimestamp": "2026-04-27T10:00:00Z"
# }
# }
# レコード送信
aws kinesis put-record \
--stream-name ecommerce-clickstream \
--data '{"user_id":"USER-123","page":"product","product_id":"PROD-456","timestamp":"2026-04-27T10:30:00Z"}' \
--partition-key USER-123 \
--region ap-northeast-1
# バッチ送信(高スループット推奨)
aws kinesis put-records \
--stream-name ecommerce-clickstream \
--records file://click-records.json \
--region ap-northeast-1
SDK (Python): プロデューサー・コンシューマー
import boto3
import json
import uuid
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
stream_name = 'ecommerce-clickstream'
# 1. プロデューサー: PutRecord
def send_single_click(user_id, page, product_id):
record = {
'user_id': user_id,
'click_id': str(uuid.uuid4()),
'page': page,
'product_id': product_id,
'timestamp': datetime.utcnow().isoformat()
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(record),
PartitionKey=user_id # 同一ユーザーは同一シャードへ
)
print(f"Shard: {response['ShardId']}, Seq: {response['SequenceNumber']}")
return response['ShardId']
# 2. プロデューサー: PutRecords(バッチ・高スループット)
def send_batch_clicks(click_records):
"""
click_records: [
{'user_id': 'USER-1', 'page': 'home', ...},
{'user_id': 'USER-2', 'page': 'product', ...},
...
]
"""
records = [
{
'Data': json.dumps(record),
'PartitionKey': record['user_id']
}
for record in click_records
]
response = kinesis.put_records(
StreamName=stream_name,
Records=records
)
print(f"Sent {response['RecordCount']} records")
if response['FailedRecordCount'] > 0:
print(f"Failed: {response['FailedRecordCount']}")
return response
# 3. コンシューマー: GetRecords(従来型)
def consume_stream_shared_throughput():
response = kinesis.describe_stream(StreamName=stream_name)
shards = response['StreamDescription']['Shards']
for shard in shards:
shard_iterator_response = kinesis.get_shard_iterator(
StreamName=stream_name,
ShardId=shard['ShardId'],
ShardIteratorType='LATEST' # または TRIM_HORIZON
)
shard_iterator = shard_iterator_response['ShardIterator']
while shard_iterator:
records_response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in records_response['Records']:
data = json.loads(record['Data'])
print(f"User {data['user_id']} clicked {data['page']}")
shard_iterator = records_response.get('NextShardIterator')
# 4. コンシューマー: Enhanced Fan-Out
def consume_stream_enhanced_fanout():
# StreamConsumer を登録
consumer_response = kinesis.register_stream_consumer(
StreamARN='arn:aws:kinesis:ap-northeast-1:123456789012:stream/ecommerce-clickstream',
ConsumerName='my-fanout-consumer'
)
consumer_arn = consumer_response['Consumer']['ConsumerARN']
# SubscribeToShard で購読開始
response = kinesis.describe_stream(StreamName=stream_name)
shards = response['StreamDescription']['Shards']
for shard in shards:
subscribe_response = kinesis.subscribe_to_shard(
ConsumerARN=consumer_arn,
ShardId=shard['ShardId'],
StartingPosition={'Type': 'LATEST'}
)
# EventStream をポーリング
event_stream = subscribe_response['EventStream']
for event in event_stream:
if 'Records' in event:
for record in event['Records']:
data = json.loads(record['Data'])
print(f"Enhanced: {data['user_id']}")
# 5. Lambda Event Source Mapping(推奨)
def setup_lambda_event_source():
lambda_client = boto3.client('lambda', region_name='ap-northeast-1')
response = lambda_client.create_event_source_mapping(
EventSourceArn='arn:aws:kinesis:ap-northeast-1:123456789012:stream/ecommerce-clickstream',
FunctionName='clickstream-processor',
Enabled=True,
StartingPosition='LATEST',
BatchSize=100,
MaximumBatchingWindowInSeconds=5,
ParallelizationFactor=10, # シャード×10 の並列 Lambda
BisectBatchOnFunctionError=True,
MaximumRetryAttempts=2,
DestinationConfig={
'OnFailure': {
'Type': 'SQS',
'Destination': 'arn:aws:sqs:ap-northeast-1:123456789012:dlq-queue'
}
}
)
return response['UUID']
# 使用例
if __name__ == '__main__':
# クリック送信
for i in range(10):
send_single_click(f'USER-{i % 100}', 'product', f'PROD-{i}')
# Lambda ESM 設定
esm_uuid = setup_lambda_event_source()
print(f"Event Source Mapping: {esm_uuid}")
IaC (Terraform): On-Demand + Enhanced Fan-Out
# On-Demand ストリーム
resource "aws_kinesis_stream" "clickstream" {
name = "ecommerce-clickstream"
stream_mode_details {
stream_mode = "ON_DEMAND"
}
retention_period = 24 # 24 時間(デフォルト)
tags = {
Environment = "Production"
Tier = "Streaming"
}
}
# Lambda Event Source Mapping
resource "aws_lambda_event_source_mapping" "clickstream_processor" {
event_source_arn = aws_kinesis_stream.clickstream.arn
function_name = aws_lambda_function.processor.arn
starting_position = "LATEST"
batch_size = 100
maximum_batching_window_in_seconds = 5
parallelization_factor = 10
function_response_types = ["ReportBatchItemFailures"]
bisect_batch_on_function_error = true
maximum_retry_attempts = 2
destination_config {
on_failure {
type = "SQS"
destination_arn = aws_sqs_queue.dlq.arn
}
}
}
# CloudWatch アラーム: GetRecords レイテンシー
resource "aws_cloudwatch_metric_alarm" "get_records_latency" {
alarm_name = "KDS-GetRecords-High-Latency"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "2"
metric_name = "GetRecords.IteratorAgeMilliseconds"
namespace = "AWS/Kinesis"
period = "60"
statistic = "Maximum"
threshold = "10000" # 10 秒以上は異常
dimensions = {
StreamName = aws_kinesis_stream.clickstream.name
}
alarm_actions = ["arn:aws:sns:ap-northeast-1:123456789012:alerts"]
}
output "stream_arn" {
value = aws_kinesis_stream.clickstream.arn
}
CloudFormation: データ保持拡張
Resources:
ClickstreamStream:
Type: AWS::Kinesis::Stream
Properties:
Name: ecommerce-clickstream
StreamModeDetails:
StreamMode: ON_DEMAND
RetentionPeriodHours: 24
Tags:
- Key: Environment
Value: Production
ExtendedRetention:
Type: AWS::Kinesis::StreamConsumer
Properties:
StreamARN: !GetAtt ClickstreamStream.Arn
ConsumerName: analytics-consumer
# Lambda Event Source Mapping
ESM:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn: !GetAtt ClickstreamStream.Arn
FunctionName: ClickstreamProcessor
StartingPosition: LATEST
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
ParallelizationFactor: 10
BisectBatchOnFunctionError: true
MaximumRetryAttempts: 2
FunctionResponseTypes:
- ReportBatchItemFailures
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt DLQQueue.Arn
Outputs:
StreamArn:
Value: !GetAtt ClickstreamStream.Arn
類似サービス比較表
| 観点 | Kinesis Data Streams | SQS | MSK (Kafka) | Apache Pulsar | Azure Event Hubs |
|---|---|---|---|---|---|
| スループット | シャード数で制御 | ほぼ無制限 | ほぼ無制限 | ほぼ無制限 | スループット単位 |
| 保持期間 | 最大 365 日 | 最大 14 日 | 設定可能 | 設定可能 | 最大 7 日 |
| 複数コンシューマー | 並列に独立消費 | 競争(1 メッセージ → 1 Worker) | コンシューマーグループ | 並列に独立消費 | コンシューマーグループ |
| 順序保証 | シャード内 | FIFO キューのみ | パーティション内 | パーティション内 | パーティション内 |
| リプレイ | ✅(保持期間内) | ❌ | ✅ | ✅ | ❌ |
| スケーリング | 手動/On-Demand | 自動 | 手動 | 手動 | 自動 |
| レイテンシー | 200ms(GetRecords) / 70ms(Enhanced) | ~100ms | ~100ms | ~100ms | ~100ms |
| コスト形態 | シャード-時間 + GB | リクエスト数 | ブローカー-時間 | 固定 | スループット-時間 |
| 使用例 | リアルタイム分析・リプレイ | キューイング・非同期処理 | Kafka エコシステム | 高スループット・Geo | Azure 統合 |
ベストプラクティス
✅ 推奨
- パーティションキーの均等分散 -
customer_id + "-" + random(0-99)でホットシャード回避 - On-Demand Advantage 活用 - 2026 年推奨、自動スケール + 60% 低コスト
- Enhanced Fan-Out 複数コンシューマー - 各専用 2MB/秒確保、低レイテンシー
- Lambda Event Source Mapping - GetRecords より管理簡単・並列度高い
- データ保持は用途に応じて - リプレイ不要なら 24h、必要なら 30~365 日
- CloudWatch メトリクス監視 - GetRecords.IteratorAgeMilliseconds、数時間に 1 回確認
- エラーハンドリング DLQ - 失敗レコードを SQS に送信、後処理で再試行
- バッチサイズ 100~1,000 - Lambda 実行コスト最適化
- ParallelizationFactor 10 - シャード×10 の並列 Lambda 起動
❌ アンチパターン
- パーティションキー偏り -
countryだけでは米国に集中 - GetRecords で 50+ コンシューマー - 共有 2MB/秒で枯渇
- 24 時間保持で過去 7 日リプレイ必要 - 保持期間設定ミス
- SQS を使うべき場面で KDS - メッセージが少量・バースト少ない場合は SQS
- CloudWatch メトリクス無視 - レイテンシー増加気づかず
- DestinationConfig(DLQ)なし - 失敗レコード追跡不可
- Provisioned で固定シャード - トラフィック変動に対応困難
トラブルシューティング表
| 症状 | 原因 | 対応 |
|---|---|---|
ProvisionedThroughputExceededException |
シャード容量枯渇 | SplitShard で シャード数増加 / On-Demand Advantage に切り替え |
Lambda Throttle |
Event Source Mapping の ParallelizationFactor 不足 | ParallelizationFactor を 10 に増加 |
GetRecords.IteratorAgeMilliseconds > 10s |
コンシューマー遅延 | Lambda 処理時間短縮 / ParallelizationFactor 増加 |
SequenceNumber Gap |
レコード損失ではなく、リトライ後の重複 | 冪等性を実装(DynamoDB で ExpressionAttributeNames) |
ShardIterator Expired |
15 分以上レコード取得なし | GetRecords ポーリング間隔短縮 |
Records Count = 0 |
データが流れていない | プロデューサー確認・PutRecord 成功確認 |
| Enhanced Fan-Out 登録エラー | 既存ストリームコンシューマー限界(50) | 不要なコンシューマー削除・StreamConsumer ARN 確認 |
2025-2026 最新動向
- On-Demand Advantage GA(2026 Q1) - 60% 低コスト、デフォルト推奨オプション
- Enhanced Fan-Out 50 コンシューマー対応(2026-04) - より多くの並列アプリケーション対応
- Lambda ESM パフォーマンス向上 - ポーリング改善、レイテンシー削減
- CloudWatch Application Signals - Kinesis メトリクスの自動トレース
- Multi-Region フェイルオーバー強化 - クロスリージョンレプリケーション改善
- IAM 認証統合 - より細粒度の権限制御
学習リソース・参考文献
公式ドキュメント
- AWS Kinesis Data Streams Developer Guide
- Kinesis Data Streams Pricing
- Kinesis Data Streams FAQs
- On-Demand Advantage
- Enhanced Fan-Out 50 Consumers
- Kinesis Client Library
- Lambda Event Source Mapping
- Kinesis Best Practices
関連サービス・ベンダー
- Apache Kafka Documentation
- Apache Pulsar
- Managed Service for Apache Flink
- Kinesis Data Firehose
- AWS Lambda
実装例・チェックリスト
実装例: リアルタイムレコメンデーション
# KDS プロデューサー: クリックイベント送信
from kinesis_producer import KinesisProducer
producer = KinesisProducer(stream_name='clickstream')
def track_click(user_id, product_id):
event = {
'user_id': user_id,
'product_id': product_id,
'timestamp': datetime.utcnow().isoformat()
}
producer.put_record(
data=json.dumps(event),
partition_key=user_id
)
# Lambda コンシューマー: リアルタイム推奨ロジック
def lambda_handler(event, context):
for record in event['Records']:
payload = json.loads(record['kinesis']['data'])
user_id = payload['user_id']
product_id = payload['product_id']
# DynamoDB に最新クリック履歴を更新
dynamodb.Table('user_preferences').update_item(
Key={'user_id': user_id},
UpdateExpression='SET last_clicked = :p',
ExpressionAttributeValues={':p': product_id}
)
# キャッシュを更新(ElastiCache)
redis.lpush(f'user:{user_id}:clicks', product_id)
# リアルタイムランキング更新
dynamodb.Table('product_ranking').update_item(
Key={'product_id': product_id},
UpdateExpression='ADD click_count :inc',
ExpressionAttributeValues={':inc': 1}
)
return {'statusCode': 200}
チェックリスト
- [ ] On-Demand Advantage モード有効化?
- [ ] パーティションキーの均等分散設計?
- [ ] 保持期間を用途に応じて設定?(24h / 30日 / 365日)
- [ ] Lambda Event Source Mapping で ParallelizationFactor=10 設定?
- [ ] DLQ(SQS)で失敗レコード処理?
- [ ] CloudWatch メトリクス監視(GetRecords.IteratorAge)?
- [ ] CloudWatch アラーム設定(IteratorAge > 10s)?
- [ ] Enhanced Fan-Out テスト実施(複数コンシューマー)?
- [ ] 定期的なスケーリング容量計画?
- [ ] Flink / Firehose 連携テスト?
まとめ
Amazon Kinesis Data Streams は リアルタイム大規模ストリーミングデータをシャード単位で制御し、複数の独立したコンシューマーが並行して消費・処理するための AWS 完全マネージド基盤です。On-Demand Advantage による自動スケーリング・60% 低コスト化、Enhanced Fan-Out による低レイテンシー・複数アプリ対応、最大 365 日のリプレイ機能により、クリックストリーム・IoT・ログ・トランザクションなどの大量高速データをリアルタイムに分析・処理する中核システムとして機能します。Lambda・Flink・Firehose との統合で AWS ストリーミングエコシステムを完成させます。
最終更新:2026-04-27
バージョン:v2.0