目次
Amazon Managed Service for Apache Flink 完全ガイド 2026
ストリーミング分析の本質とスケーラブルなリアルタイム処理基盤
Amazon Managed Service for Apache Flink(旧 Kinesis Data Analytics for Apache Flink)は、Apache Flink アプリケーションをサーバーレスで実行する完全マネージドストリーミング分析サービス です。Kinesis Data Streams・MSK(Kafka)からのリアルタイムデータをミリ秒レイテンシーで処理し、ステートフルな複雑なストリーミング処理・窓集計・複合イベント処理(CEP)をスケーラブルに実現します。本ドキュメントは、Managed Flink の本質・アーキテクチャ・実装・比較・ベストプラクティスを体系的に解説する包括的ガイドです。
ドキュメントの目的
本ガイドは以下を対象としています。
- 初心者向け: Apache Flink とは何か、なぜマネージドサービスが必要かを学びたい方
- データエンジニア向け: Java/Scala/Python による Flink アプリケーション開発・デプロイ
- ストリーミング処理実務者向け: Stateful Processing・Checkpointing・Window 処理・カスタムロジック実装
- アーキテクト向け: Lambda・Spark Streaming・Kafka Streams との比較・選定判断
- 運用者向け: Scaling・Monitoring・Troubleshooting・Cost Optimization
2026 年の Managed Flink エコシステム
- Flink 1.18/1.19 対応: Table API/SQL の大幅な最適化、PyFlink による Python サポート強化
- Studio Notebook の進化: Zeppelin ベースの対話的分析、Flink SQL の interactive query
- Connector の拡充: MSK・Kinesis・S3・Firehose・Iceberg のネイティブサポート
- Stateful Computing の深化: RocksDB ステートバックエンド、Incremental Checkpointing
- CloudWatch 統合拡張: Application Signals による APM、カスタムメトリクス自動取得
- Iceberg/Hudi 統合: ACID Transaction、Time Travel、Data Lake との統合
- ML Pipeline 連携: SageMaker とのネイティブ統合(2026 年 H1)
概要
初心者向けメモ: Managed Flink は「ストリーミングデータをリアルタイムに複雑に処理し、状態を維持しながら結果を出力するサービス」です。単なるデータフィルタリングではなく、5 分間の売上合計・ユーザーセッション追跡・不正パターン検知など、複数データポイントを組み合わせた分析をミリ秒単位で実現します。Lambda ではシステムの複雑さから実現が難しく、セルフマネージド Flink クラスタでは運用負荷が大きいため、Managed Service が最適です。
Managed Service for Apache Flink は AWS が提供する Apache Flink の完全マネージド実行環境 です。インフラ管理・スケーリング・Checkpointing・Failover は AWS が全て管理し、開発者は Flink アプリケーションロジックに専念できます。
Managed Flink の位置づけ
graph TD
subgraph DataSources["データソース"]
KDS["Kinesis Data Streams"]
MSK["MSK Kafka"]
S3["Amazon S3"]
RDS["RDS / Aurora"]
end
subgraph ProcessingLayer["ストリーミング処理層"]
Lambda["Lambda\nステートレス軽量処理"]
Flink["Managed Flink\nステートフル複雑処理"]
SparkStreaming["Spark Streaming\nマイクロバッチ処理"]
end
subgraph OutputTargets["出力先"]
S3Out["S3 / Data Lake"]
Redshift["Redshift / OpenSearch"]
KDS_Out["Kinesis / Kafka"]
Dashboard["リアルタイム Dashboard"]
end
DataSources --> ProcessingLayer
ProcessingLayer --> OutputTargets
style Flink fill:#FF9900
style KDS fill:#FF9900
定義
AWS 公式による定義:
“Amazon Managed Service for Apache Flink allows you to build and run real-time, fault-tolerant streaming applications, using open-source Apache Flink frameworks with multiple language support (Java, Scala, and Python).”
Apache Flink をベースとしたフルマネージドサービスで、分散ストリーム処理エンジンの複雑な運用を AWS が担当します。
目次
- 概要
- Managed Flink が解決する課題
- 主な特徴
- アーキテクチャ
- コアコンポーネント
- Flink Application vs Studio
- API レベル
- 主要ユースケース
- 設定・操作の具体例
- Flink SQL
- ウィンドウ処理
- ステートフル処理
- Checkpointing と障害復旧
- Connector エコシステム
- 類似サービス比較表
- ベストプラクティス
- トラブルシューティング
- パフォーマンス最適化
- コスト管理
- セキュリティ
- 2025-2026 最新動向
- 学習リソース
- 実装例・チェックリスト
- まとめ
- 参考文献
Managed Flink が解決する課題
課題1: ステートフルなストリーミング処理の複雑性
従来の課題: Lambda は本質的にステートレスであり、過去 5 分間の売上合計・ユーザーセッション累積・移動平均など、状態を維持する処理が極めて難しい。外部データベースに状態を保存するアプローチは遅延増加・コスト増加につながる。
Managed Flink での解決: RocksDB ステートバックエンドで大規模なステート管理をネイティブサポート。状態はローカルディスクに保存され、ミリ秒単位でアクセス可能。Checkpointing で障害時の状態復旧を自動化。
課題2: インフラ管理の複雑性
従来の課題: セルフマネージド Flink クラスタ(EC2/EKS)は、クラスタスケーリング・ネットワーク設定・Checkpointing ストレージ・障害対応の自動化が必要で運用負荷が極めて大きい。
Managed Flink での解決: AWS が全インフラ管理を担当。KPU(Kinesis Processing Unit)ベースの自動スケーリング・高可用性・バックアップ・バージョン管理を提供。開発者はビジネスロジックに集中可能。
課題3: エグザクトリーワンス(Exactly-Once)保証の実現難度
従来の課題: 金融取引・請求計算など「1 回だけ正確に処理する」という要件は、分散システムでは極めて複雑。Lambda + DynamoDB では idempotency を自分で実装する必要があり、エラーハンドリングが複雑になる。
Managed Flink での解決: Flink がネイティブに Exactly-Once セマンティクスをサポート。Distributed Snapshots(チェックポイント)とトランザクション的なシンク処理により、自動的に重複処理を排除。
主な特徴
┌─────────────────────────────────────────────────────────────┐
│ Managed Service for Apache Flink の特徴 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ✅ サーバーレス実行 │
│ • インフラ管理不要、KPU 単位でスケーリング │
│ • Auto scaling・High availability 標準搭載 │
│ │
│ ✅ 複数言語サポート │
│ • Java / Scala / Python(PyFlink) │
│ • Flink SQL による SQL ベース処理 │
│ │
│ ✅ ネイティブな状態管理 │
│ • RocksDB ステートバックエンド │
│ • 大容量状態を効率的に管理(テラバイト級) │
│ │
│ ✅ Exactly-Once 処理保証 │
│ • Distributed Snapshots Checkpointing │
│ • 重複排除の自動化 │
│ │
│ ✅ 柔軟なウィンドウ処理 │
│ • Tumbling / Sliding / Session Windows │
│ • カスタムウィンドウ・ウォーターマーク │
│ │
│ ✅ リッチな Connector エコシステム │
│ • Kinesis / Kafka / S3 / Redshift / Firehose │
│ • カスタムソース・シンク実装可能 │
│ │
│ ✅ Studio Notebook(Interactive 分析) │
│ • Zeppelin ベースの対話的 Flink 開発 │
│ • SQL・Python・Scala を同一 Notebook で実行 │
│ │
│ ✅ CloudWatch 統合監視 │
│ • Application Signals・カスタムメトリクス │
│ • フルな可観測性を標準搭載 │
│ │
│ ✅ 完全な互換性 │
│ • オープンソース Apache Flink との完全互換 │
│ • セルフマネージド Flink との知識の再利用 │
│ │
└─────────────────────────────────────────────────────────────┘
アーキテクチャ
graph LR
subgraph Sources["ソース"]
KDS["Kinesis Streams"]
MSK["MSK Kafka"]
S3_in["S3 / DynamoDB"]
end
subgraph FlinkApp["Flink Application<br/>(マネージド実行)"]
JAR["JAR/Python コード"]
Runtime["Flink Runtime<br/>(TaskManager x N)"]
State["State Backend<br/>RocksDB"]
CP["Checkpointing"]
end
subgraph Processing["処理層"]
DataStream["DataStream API"]
TableSQL["Table API / SQL"]
CEP["Complex Event<br/>Processing"]
end
subgraph Outputs["出力先"]
S3_out["S3 / Data Lake"]
Redshift["Redshift / OpenSearch"]
KDS_out["Kinesis / Kafka"]
Lambda_sink["Lambda / Lambda"]
Dashboard["Real-time Dashboard"]
end
Sources --> FlinkApp
FlinkApp --> Processing
Processing --> Outputs
State --> CP
CP -->|Snapshots| S3_ckpt["S3 Checkpoint Storage"]
style FlinkApp fill:#FF9900
Managed Flink の構成要素
| コンポーネント | 役割 | 管理方式 |
|---|---|---|
| Application JAR/Python | ビジネスロジック定義 | ユーザー管理・S3 保存 |
| Flink Runtime | 分散処理エンジン | AWS マネージド |
| Task Manager | 並列処理実行 | KPU ベースの自動スケーリング |
| State Backend | 状態保存(RocksDB) | AWS マネージド・S3 バックアップ |
| Checkpointing | 障害復旧・スナップショット | 自動実行・間隔設定可能 |
| Job Manager | Orchestration・Coordination | AWS マネージド・HA 対応 |
コアコンポーネント
1. DataStream API(低レベル制御)
ユーザーが Flink の最も細かい制御を行う API。Java/Scala/Python で実装可能。
// Java DataStream API 例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> events = env
.addSource(new KinesisSource())
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new EventAggregator())
.addSink(new S3Sink());
env.execute("MyFlinkApplication");
用途: 細粒度の状態管理・カスタムウィンドウ・非同期処理(Async I/O)が必要な場合。
2. Table API / Flink SQL(高レベル抽象化)
DataStream よりも高い抽象レベルで、SQL/DataFrame ライクなインターフェース。最適化が自動化される。
-- Flink SQL 例
CREATE TABLE kafka_events (
event_id STRING,
user_id BIGINT,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'format' = 'json'
);
CREATE TABLE results AS
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
user_id,
SUM(amount) AS total_amount,
COUNT(*) AS event_count
FROM kafka_events
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
user_id;
用途: SQL エキスパートによる開発・初期の調査・シンプルな集計処理。
3. Process Function(カスタムロジック)
DataStream API と Flink State Management を融合させた中間レベル API。Timer・低レイテンシーの非同期処理に最適。
public class StatefulEventProcessor extends KeyedProcessFunction<String, Event, Result> {
private transient ValueState<Long> totalAmount;
private transient MapState<String, Integer> eventCounts;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> totalDescriptor =
new ValueStateDescriptor<>("total", Long.class);
totalAmount = getRuntimeContext().getState(totalDescriptor);
MapStateDescriptor<String, Integer> countsDescriptor =
new MapStateDescriptor<>("counts", String.class, Integer.class);
eventCounts = getRuntimeContext().getMapState(countsDescriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<Result> out) {
Long current = totalAmount.value();
totalAmount.update((current == null ? 0 : current) + event.getAmount());
// Timer で定期的に結果を出力
ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
Long total = totalAmount.value();
out.collect(new Result(ctx.getCurrentKey(), total));
}
}
用途: 複雑な状態管理・タイマー駆動処理・非同期外部 API 呼び出し。
4. CEP(Complex Event Processing)
複数イベントのパターン検出。例:「10 秒以内に同一ユーザーから 5 回以上の不正ログイン試行」を検知。
Pattern<Event, ?> pattern = Pattern
.<Event>begin("login_failures")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("LOGIN_FAILED");
}
})
.times(5)
.within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(
eventStream.keyBy(Event::getUserId),
pattern
);
patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) {
List<Event> loginFailures = pattern.get("login_failures");
return new Alert(
loginFailures.get(0).getUserId(),
"Suspicious login pattern detected",
loginFailures.size()
);
}
}).addSink(alertSink);
用途: 不正検知・異常検知・イベントコンボの自動検出。
Flink Application vs Studio
| 観点 | Application | Studio |
|---|---|---|
| 使用形態 | IDE で開発・CI/CD デプロイ | ブラウザ・対話的開発 |
| 言語 | Java / Scala / Python(PyFlink) | SQL / Python / Scala |
| 開発スタイル | プログラミング型・SDLC対応 | 探索型・即座な検証 |
| 本番対応 | ✅ 完全対応 | △(Studio から Deployed App に昇格可能) |
| Notebook 環境 | ❌ | ✅(Zeppelin) |
| 長期実行ジョブ | ✅ | △(確認後 Application に変換推奨) |
| チーム開発 | ✅(Git 連携) | △(Shared Workspace) |
| パフォーマンス | 高(インライン最適化) | 中(対話的分析向け) |
選択ガイド:
- エンタープライズ環境・本番ジョブ → Application(Git・CI/CD 連携)
- 初期分析・SQL 検索・アドホック → Studio(即座の探索)
- Notebook 型開発・データサイエンティスト → Studio Notebook
API レベル
┌──────────────────────────────────────────────────────────┐
│ Flink API 抽象レベル図 │
├──────────────────────────────────────────────────────────┤
│ │
│ 高度な抽象 ┌─ Flink SQL [最も簡潔] │
│ 化 ├─ Table API / DataFrame │
│ ├─ DataStream API / Operators │
│ ├─ Process Function / State │
│ └─ Low-Level Source/Sink API [最も自由度] │
│ 低レベル │
│ │
│ ✅ SQL レベル:開発時間短・最適化自動化 │
│ ✅ DataStream:細粒度制御・複雑処理対応 │
│ ✅ Process Fn:Timer・State・非同期処理 │
│ │
└──────────────────────────────────────────────────────────┘
主要ユースケース
1. リアルタイム不正検知(金融・EC)
- 取引ストリームからリアルタイムに不正パターンを検知・ブロック
- 過去 10 分間のユーザー取引額、取引頻度、地理情報から統計的異常検知
- Flink 適用点: ステートフル集計・CEP パターン検知・ミリ秒遅延
2. IoT センサーデータリアルタイム集計
- 数百万個のセンサーから秒単位でデータ到着、リアルタイムダッシュボード生成
- 5 分ごとの機械ごと・工場ごとの集計値・異常値通知
- Flink 適用点: 高スループット処理・ウィンドウ集計・複数キーでの並列処理
3. Kafka ストリームのリアルタイム ETL
- MSK(Kafka)トピックを Flink で変換・エンリッチして別トピックに配信
- ユーザーイベントに外部 API で顧客情報エンリッチ→ダウンストリーム消費
- Flink 適用点: 高速変換・状態ベース JOIN・外部システム連携
4. ユーザーセッション分析
- クリックストリームをセッションウィンドウで集計、セッション内行動を分析
- セッション開始後 30 分で商品閲覧→購入の傾向を集計
- Flink 適用点: Session Windows・時間ベースのイベント グループ化
5. リアルタイムレコメンデーション
- ユーザーの最新の行動(過去 5 分)を基に推奨商品リスト更新
- Flink で集計→Lambda で推奨エンジン実行→UI へのプッシュ
- Flink 適用点: 低遅延状態管理・リアルタイム機能生成
6. アプリケーションログの異常検知
- CloudWatch Logs からログストリームを Flink で分析、エラー率が 5% 超えで通知
- アプリ群別・地域別の Error Rate を並列集計
- Flink 適用点: 複数キーでの並列ストリーム処理・時系列統計
7. リアルタイム KPI ダッシュボード
- SaaS プラットフォームの顧客ごとの全リアルタイム指標を秒単位で更新
- 多テナント環境での顧客ごと・機能ごとの使用量・収益のリアルタイム集計
- Flink 適用点: 高並列度処理・多キー集計・大規模状態管理
8. ストリーム間の Join 処理
- ユーザーイベント(Action) と プロダクト カタログ(Product)の Join
- Flink StreamStream Join・Temporal Join で最新カタログとの突き合わせ
- Flink 適用点: 複数ストリーム結合・状態ベース検索
9. 決済後のバッチ集計
- リアルタイム決済ストリームから 1 分単位の売上・トランザクション数を集計
- 日次バッチの前提条件となるリアルタイム検証値を生成
- Flink 適用点: Tumbling Window・時系列集計の自動化
10. ネットワークメトリクス分析
- VPC Flow Logs・ALB ログをストリーム処理、ネットワーク異常検知
- 源 IP・宛先 IP・ポート別トラフィック量の急激な変化を検知
- Flink 適用点: 高スループット・複雑なフィルタリング・統計計算
設定・操作の具体例
AWS CLI によるアプリケーション作成
# JAR ファイルを S3 にアップロード
aws s3 cp target/my-flink-app-1.0.jar s3://my-flink-bucket/
# Managed Flink アプリケーション作成
aws kinesisanalyticsv2 create-application \
--application-name my-streaming-app \
--runtime-environment FLINK_1_19 \
--service-execution-role arn:aws:iam::ACCOUNT:role/flink-role \
--application-configuration \
Inputs='[
{
"NamePrefix": "SOURCE_SQL_STREAM",
"InputParallelism": 4,
"InputStartingPositionConfiguration": {
"InputStartingPosition": "LATEST"
},
"KinesisStreamsInput": {
"ResourceARN": "arn:aws:kinesis:region:account:stream/my-input-stream"
}
}
]'
Application Configuration(JSON)
{
"ApplicationName": "real-time-analytics",
"RuntimeEnvironment": "FLINK_1_19",
"ServiceExecutionRoleARN": "arn:aws:iam::ACCOUNT:role/flink-role",
"ApplicationConfiguration": {
"FlinkApplicationConfiguration": {
"CheckpointConfiguration": {
"ConfigurationType": "CUSTOM",
"CheckpointingEnabled": true,
"CheckpointInterval": 60000,
"MinPauseBetweenCheckpoints": 10000,
"CheckpointMode": "EXACTLY_ONCE"
},
"ParallelismConfiguration": {
"ConfigurationType": "CUSTOM",
"Parallelism": 4,
"ParallelismPerKPU": 1,
"AutoScalingEnabled": true
},
"MonitoringConfiguration": {
"ConfigurationType": "CUSTOM",
"MetricsLevel": "APPLICATION",
"LogLevel": "INFO"
}
}
}
}
CloudFormation による Infrastructure as Code
Resources:
ManagedFlinkApplication:
Type: AWS::KinesisAnalyticsV2::Application
Properties:
ApplicationName: my-analytics-app
RuntimeEnvironment: FLINK_1_19
ServiceExecutionRole: !GetAtt FlinkServiceRole.Arn
ApplicationConfiguration:
FlinkApplicationConfiguration:
CheckpointConfiguration:
CheckpointingEnabled: true
CheckpointInterval: 60000
MinPauseBetweenCheckpoints: 10000
CheckpointMode: EXACTLY_ONCE
ParallelismConfiguration:
Parallelism: 4
AutoScalingEnabled: true
MonitoringConfiguration:
MetricsLevel: APPLICATION
LogLevel: INFO
ApplicationCodeConfiguration:
S3ContentLocation:
BucketARN: !Sub 'arn:aws:s3:::${FlinkBucket}'
FileKey: my-app-1.0.jar
ObjectVersion: !Ref JarObjectVersion
FlinkServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: kinesisanalytics.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
Policies:
- PolicyName: FlinkStreamAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'kinesis:*'
- 's3:*'
Resource: '*'
SDK(Python Boto3)による操作
import boto3
client = boto3.client('kinesisanalyticsv2', region_name='us-east-1')
# アプリケーション一覧取得
response = client.list_applications()
for app in response['ApplicationSummaries']:
print(f"App: {app['ApplicationName']}, Status: {app['ApplicationStatus']}")
# アプリケーション詳細取得
app_details = client.describe_application(ApplicationName='my-flink-app')
print(f"ARN: {app_details['ApplicationDetail']['ApplicationARN']}")
print(f"Status: {app_details['ApplicationDetail']['ApplicationStatus']}")
print(f"Create Timestamp: {app_details['ApplicationDetail']['CreateTimestamp']}")
# アプリケーション開始
start_response = client.start_application(
ApplicationName='my-flink-app',
RunConfiguration={
'FlinkRunConfiguration': {
'AllowNonRestoredState': False
}
}
)
# アプリケーション停止
stop_response = client.stop_application(ApplicationName='my-flink-app')
Terraform IaC
resource "aws_kinesisanalyticsv2_application" "example" {
name = "my-flink-application"
runtime_environment = "FLINK_1_19"
service_execution_role_arn = aws_iam_role.flink_role.arn
application_configuration {
flink_application_configuration {
checkpoint_configuration {
checkpointing_enabled = true
checkpoint_interval = 60000
min_pause_between_checkpoints = 10000
checkpoint_mode = "EXACTLY_ONCE"
}
parallelism_configuration {
parallelism = 4
auto_scaling_enabled = true
parallelism_per_kpu = 1
}
monitoring_configuration {
log_level = "INFO"
metrics_level = "APPLICATION"
}
}
}
application_code_configuration {
s3_content_location {
bucket_arn = aws_s3_bucket.flink_code.arn
file_key = "my-app-1.0.jar"
object_version = aws_s3_object.jar_file.version_id
}
}
tags = {
Environment = "production"
Team = "data-engineering"
}
}
resource "aws_iam_role" "flink_role" {
name = "flink-service-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "kinesisanalytics.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "flink_policy" {
name = "flink-execution-policy"
role = aws_iam_role.flink_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"kinesis:*",
"s3:*",
"logs:*"
]
Resource = "*"
}
]
})
}
Flink SQL
ストリームテーブル定義
-- Kafka トピックからストリーム定義
CREATE TABLE orders (
order_id STRING,
user_id BIGINT,
product_id BIGINT,
amount DOUBLE,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- Kinesis Streams からストリーム定義
CREATE TABLE user_events (
event_id STRING,
user_id BIGINT,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'user-events',
'aws.region' = 'us-east-1',
'format' = 'json'
);
-- Redshift への出力シンク
CREATE TABLE redshift_results (
event_time TIMESTAMP(3),
user_id BIGINT,
total_amount DOUBLE,
order_count BIGINT
) WITH (
'connector' = 'redshift',
'url' = 'jdbc:redshift://my-cluster.us-east-1.redshift.amazonaws.com:5439/mydb',
'table-name' = 'order_summaries',
'username' = 'admin',
'password' = '...',
'batch.size' = '1000',
'flush.interval' = '60000'
);
ウィンドウ集計
-- Tumbling Window(5 分ごと)
INSERT INTO redshift_results
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS event_time,
user_id,
SUM(amount) AS total_amount,
COUNT(*) AS order_count
FROM orders
WHERE order_amount > 0
GROUP BY
TUMBLE(order_time, INTERVAL '5' MINUTE),
user_id;
-- Sliding Window(5 分ウィンドウ・1 分スライド)
SELECT
CAST(CURRENT_TIMESTAMP AS STRING) AS event_time,
user_id,
SUM(amount) AS sliding_total,
AVG(amount) AS avg_amount,
COUNT(DISTINCT product_id) AS unique_products
FROM orders
WHERE TUMBLE_ROWTIME(order_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE) IS NOT NULL
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE);
-- Session Window(30 分非アクティブで終了)
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
user_id,
COUNT(*) AS events_in_session,
COUNT(DISTINCT event_type) AS event_types
FROM user_events
GROUP BY
SESSION(event_time, INTERVAL '30' MINUTE),
user_id;
Stream-Stream Join
-- 最新のユーザー属性でイベント JOIN
SELECT
e.event_id,
e.user_id,
e.event_type,
p.product_name,
p.category,
e.event_time
FROM user_events e
INNER JOIN orders o
ON e.user_id = o.user_id
AND e.event_time BETWEEN o.order_time - INTERVAL '1' HOUR
AND o.order_time + INTERVAL '30' MINUTE
WHERE e.event_type = 'PURCHASE'
ORDER BY e.event_time DESC;
-- Temporal Join(Versioned Table に対する JOIN)
SELECT
o.order_id,
o.user_id,
o.amount,
u.user_name,
u.user_segment,
o.order_time
FROM orders o
LEFT JOIN user_master FOR SYSTEM_TIME AS OF o.order_time AS u
ON o.user_id = u.user_id;
ウィンドウ処理
┌──────────────────────────────────────────────────────────────────┐
│ Flink ウィンドウの種類 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 1️⃣ Tumbling Window │
│ • 時間幅で固定・重複なし │
│ • 例:5 分ごとに売上集計 │
│ ┌──────┐──────┬──────┬──────┐ │
│ │5min │5min │5min │5min │ │
│ └──────┴──────┴──────┴──────┘ │
│ │
│ 2️⃣ Sliding Window │
│ • ウィンドウ幅とスライド間隔が独立 │
│ • 例:過去 10 分データで 1 分スライド │
│ ┌─────────────────┐ │
│ │ 10min data │ │
│ ├─┬─────────────┤│ │
│ │ │ 10min slice 1││ │
│ ├─┤ 10min slice 2││ │
│ │ └─────────────┘│ │
│ └─────────────────┘ │
│ │
│ 3️⃣ Session Window │
│ • 非アクティブ期間でウィンドウ終了 │
│ • ユーザーセッション分析に最適 │
│ Event Event Event [Gap] Event Event │
│ │ │ │ │ │ │
│ ├─────────┤ Session1 [終了] ├───┤ Session2 │
│ │
│ 4️⃣ Global Window │
│ • すべてのデータをグループ化・Trigger 手動指定 │
│ • カスタムウィンドウロジックに最適 │
│ │
└──────────────────────────────────────────────────────────────────┘
実装例
// Tumbling Window - 5 分ごと
DataStream<OrderSummary> tumblingResults = orders
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderAggregator())
.name("Tumbling-5min-Aggregation");
// Sliding Window - 10 分ウィンドウ、1 分スライド
DataStream<OrderStats> slidingResults = orders
.keyBy(Order::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
.aggregate(new StatisticsAggregator())
.name("Sliding-10min-1min");
// Session Window - 30 分非アクティブ
DataStream<SessionSummary> sessionResults = events
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator())
.name("Session-30min-Gap");
// Global Window + 遅延イベント処理
DataStream<CustomResult> customResults = events
.keyBy(Event::getCategory)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1000)) // 1000 イベントか 1 分で出力
.allowedLateness(Time.minutes(5)) // 5 分までの遅延を許容
.aggregate(new CustomAggregator())
.name("Custom-Processing");
ステートフル処理
State Backend の選択
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RocksDB State Backend(推奨・大規模状態向け)
env.setStateBackend(new RocksDBStateBackend(
"s3://my-checkpoint-bucket/rocksdb",
true // incremental checkpoints
));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// HashMap State Backend(小規模状態向け)
env.setStateBackend(new HashMapStateBackend());
ValueState(スカラ値の保存)
public class PreviousValueProcessor extends KeyedProcessFunction<String, Event, Alert> {
private transient ValueState<String> previousState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("previous", String.class);
previousState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<Alert> out) {
String previous = previousState.value();
if (previous != null && previous.equals(event.getValue())) {
out.collect(new Alert("Duplicate detected", event.getValue()));
}
previousState.update(event.getValue());
}
}
MapState(複数キーの保存)
public class EventCounterProcessor extends KeyedProcessFunction<String, Event, EventStats> {
private transient MapState<String, Integer> eventCounts;
@Override
public void open(Configuration parameters) {
MapStateDescriptor<String, Integer> descriptor =
new MapStateDescriptor<>("eventCounts", String.class, Integer.class);
eventCounts = getRuntimeContext().getMapState(descriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<EventStats> out) {
String eventType = event.getType();
Integer count = eventCounts.get(eventType);
count = (count == null ? 1 : count + 1);
eventCounts.put(eventType, count);
// 統計情報を出力
Iterator<Map.Entry<String, Integer>> iterator = eventCounts.iterator();
Map<String, Integer> stats = new HashMap<>();
iterator.forEachRemaining(entry -> stats.put(entry.getKey(), entry.getValue()));
out.collect(new EventStats(ctx.getCurrentKey(), stats));
}
}
ListState(複数値リストの保存)
public class SequenceDetectorProcessor extends KeyedProcessFunction<String, Event, Sequence> {
private transient ListState<Event> eventHistory;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Event> descriptor =
new ListStateDescriptor<>("history", Event.class);
eventHistory = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<Sequence> out) {
eventHistory.add(event);
// 最後の 10 イベントを保持
List<Event> history = new ArrayList<>();
eventHistory.get().forEach(history::add);
if (history.size() > 10) {
// 古いイベントを削除(手動で状態を管理)
List<Event> truncated = history.subList(history.size() - 10, history.size());
eventHistory.clear();
truncated.forEach(e -> {
try {
eventHistory.add(e);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
}
// パターンマッチング
if (matchesSequence(history)) {
out.collect(new Sequence(ctx.getCurrentKey(), history));
}
}
private boolean matchesSequence(List<Event> history) {
// カスタムシーケンス検出ロジック
return history.size() >= 3 &&
history.get(history.size()-1).getValue() > 100 &&
history.get(history.size()-2).getValue() > 50;
}
}
Checkpointing と障害復旧
Checkpoint Configuration:
├─ Checkpointing Enabled: true
├─ Checkpoint Interval: 60 seconds # 60 秒ごとに状態 checkpoint
├─ Min Pause Between Checkpoints: 10s # 最小 10 秒の間隔確保
├─ Mode: EXACTLY_ONCE # 重複排除保証
├─ Timeout: 600 seconds # checkpoint 完了待機時間
├─ Storage Path: s3://bucket/checkpoints/
└─ Incremental Checkpoints: true # 差分のみ保存で高速化
Checkpoint メカニズム
時刻 Job Manager Task Manager 1 Task Manager 2 S3 Storage
│
├─ T=0s [Checkpoint 1000]
│ ├─→ barrier broadcast
│ │ ├→ [Processes up to barrier]
│ │ │ └─→ [Snapshots state]
│ │ │ ├─→ [Writes snapshot]
│ │ │ └→ S3://.../ckpt-1000/
│ │ │
│ │ └→ [Processes up to barrier]
│ │ └─→ [Snapshots state]
│ │ ├─→ [Writes snapshot]
│ │ └→ S3://.../ckpt-1000/
│ │
│ └─ [All snapshots done]
│ ├─→ [Mark Checkpoint 1000 complete]
│ └─→ [Update metadata]
│
├─ T=60s [Checkpoint 1001] (Repeat cycle)
│
└─ Failure! [Task Manager 2 crashes]
[Job Manager detects]
└─ [Restore from Checkpoint 1000]
├─ [Reload state from S3]
├─ [Restart Task Manager 2]
└─ [Resume processing from Checkpoint]
Savepoint による手動スナップショット
# 実行中のジョブを一時停止して Savepoint を作成
aws kinesisanalyticsv2 create-savepoint \
--application-name my-flink-app \
--savepoint-name my-manual-savepoint
# 別バージョンの JAR で復旧(状態は保持)
aws kinesisanalyticsv2 update-application \
--application-name my-flink-app \
--application-configuration-update '{
"ApplicationCodeConfigurationUpdate": {
"S3ContentLocationUpdate": {
"BucketARNUpdate": "arn:aws:s3:::my-bucket",
"FileKeyUpdate": "my-app-2.0.jar"
}
}
}' \
--restore-from-savepoint-description '{
"SavepointName": "my-manual-savepoint"
}'
Connector エコシステム
┌───────────────────────────────────────────────────────────────┐
│ Managed Flink Connector サポート状況(2026) │
├───────────────────────────────────────────────────────────────┤
│ │
│ ✅ ネイティブサポート(推奨) │
│ ├─ Kinesis Data Streams │
│ ├─ MSK(Kafka) │
│ ├─ Amazon S3 │
│ ├─ Kinesis Data Firehose │
│ ├─ DynamoDB(Lookup用) │
│ └─ Lambda(Sink) │
│ │
│ 🔌 コミュニティ Connector(JAR に含める) │
│ ├─ Elasticsearch / OpenSearch │
│ ├─ JDBC(RDS / Redshift) │
│ ├─ HBase │
│ ├─ MongoDB │
│ ├─ Cassandra │
│ └─ BigQuery(GCP) │
│ │
│ 📊 Apache Iceberg / Delta Lake │
│ ├─ Flink Iceberg Connector │
│ ├─ Delta Connector │
│ └─ HuDi Connector │
│ │
│ 🚀 新規(2025-2026) │
│ ├─ Redshift Data Sharing │
│ ├─ S3 Express One Zone │
│ └─ Graviton2 Optimized Build │
│ │
└───────────────────────────────────────────────────────────────┘
Connector の実装例
// Kinesis Source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> stream = env.addSource(new FlinkKinesisConsumer<>(
"my-stream",
new SimpleStringSchema(),
properties // AWS credentials・region 設定
))
.name("Kinesis-Source");
// S3 Sink
stream.addSink(new StreamingFileSink.BulkFormatBuilder<>(
new Path("s3://my-bucket/output/"),
new ParquetAvroWriters.forGenericRecord(schema)
)
.withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy/'month='MM/'day='dd/'hour='HH"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(128 * 1024 * 1024)
.build())
.build())
.name("S3-Sink");
env.execute("S3-Writer");
// Kafka (MSK) Sink with Exactly-Once
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-broker:9092");
props.setProperty("transaction.id", "flink-sink-" + UUID.randomUUID());
stream.addSink(new FlinkKafkaProducer<>(
"output-topic",
new SimpleStringSchema(),
props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
))
.name("Kafka-Sink-ExactlyOnce");
類似サービス比較表
| 観点 | Managed Flink | Lambda Stream | Spark Streaming | Kafka Streams |
|---|---|---|---|---|
| サーバーレス | ✅ | ✅ | ❌(EMR) | ❌ |
| ステートフル処理 | ✅(RocksDB) | △(外部DB) | ✅ | ✅ |
| Exactly-Once | ✅ | △ | ✅ | △(難) |
| レイテンシー | ミリ秒 | 秒 | 秒〜分 | ミリ秒 |
| 複雑度 | 中 | 低 | 高 | 中 |
| 学習曲線 | 中 | 低 | 高 | 中 |
| スケーラビリティ | 高(自動) | 高(自動) | 高(手動) | 高(手動) |
| 開発言語 | Java/Scala/Python | Python/Node.js/Java | Scala/Python/SQL | Java |
| マネージド度 | 完全 | 完全 | 部分 | なし |
| コスト(小規模) | 低 | 低 | 中 | 低 |
| コスト(大規模) | 中 | 高 | 中 | 低 |
ベストプラクティス
✅ Do: 推奨プラクティス
1. Exactly-Once Semantics を明示的に設定
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointInterval(60000); // 60 秒
config.setMinPauseBetweenCheckpoints(10000);
config.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
);
理由: 金融・決済など「重複排除が重要」な場面でデータ品質を確保
2. Watermark を適切に設定
DataStream<Event> stream = env.addSource(source)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp())
);
理由: 遅延イベントの正確なウィンドウ割り当てを確保
3. 適切なウィンドウタイプ選択
// セッション分析 → Session Window
window(EventTimeSessionWindows.withGap(Time.minutes(30)))
// 定期集計 → Tumbling Window
window(TumblingEventTimeWindows.of(Time.minutes(5)))
// 移動平均 → Sliding Window
window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
理由: ウィンドウタイプごとに計算効率とスキマンティクスが異なる
4. KeyBy による並列化
// 複数キーでの並列処理(推奨)
stream
.keyBy(event -> new Tuple2<>(event.getUserId(), event.getRegion()))
.window(...)
.apply(...)
// キーレス処理(避ける・スケーリング不可)
stream.windowAll(...)
理由: KeyBy で複数の並列タスクに分散、スループット向上
5. RocksDB を大規模状態で使用
env.setStateBackend(new RocksDBStateBackend(
"s3://checkpoint-bucket/rocksdb",
true // incremental snapshots
));
理由: テラバイト級の大規模状態を効率的に管理
❌ Don’t: アンチパターン
1. Checkpointing なしで本番運用
// ❌ 危険:Checkpointing が無効
config.disableCheckpointing();
// ✅ 正解
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
2. Event Time を設定しない
// ❌ Processing Time のみ(遅延イベント対応不可)
stream.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// ✅ Event Time + Watermark
stream.assignTimestampsAndWatermarks(strategy)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
3. 外部システムへのブロッキング I/O
// ❌ 危険:ブロッキング呼び出し(スループット低下)
DataStream<Result> results = stream.map(event -> {
String productName = databaseLookup(event.getProductId()); // ブロッキング
return new Result(event, productName);
});
// ✅ 正解:非同期 I/O
DataStream<Result> asyncResults = AsyncDataStream.unorderedWait(
stream,
new AsyncDatabaseLookupFunction(),
10,
TimeUnit.SECONDS,
100
);
4. Stateful Processing での Key Distribution の無視
// ❌ 危険:キーが偏る(一部タスクに負荷集中)
stream.keyBy(event -> event.getCountry()) // 数個の国に偏る可能性
// ✅ 正解:キーを分散化
stream.keyBy(event -> Tuple2.of(event.getCountry(), event.getCity()))
5. Savepoint 戦略の欠如
// ❌ 保険なし:障害時に全メトリクス失う
// → Savepoint を定期作成していない
// ✅ 正解:定期的に Savepoint 作成
// cron で定期実行:
// aws kinesisanalyticsv2 create-savepoint --application-name app --savepoint-name sp-$(date +%s)
トラブルシューティング
| 症状 | 原因 | 対策 |
|---|---|---|
| Application が遅延イベントを無視 | Watermark 未設定 | assignTimestampsAndWatermarks() を追加 |
| メモリ OOM エラー | 状態が大きすぎる | RocksDB Backend に切り替え・State TTL 設定 |
| Checkpoint 失敗・Timeout | S3 バケットへのアクセス権限不足 | IAM ロールに s3:* 権限追加 |
| タスク 1 つに負荷集中 | Key Distribution が偏っている | Composite Key(複数フィールド)に変更 |
| 重複イベント処理 | Checkpointing Mode が At-Least-Once | EXACTLY_ONCE に変更 |
| ウィンドウ結果が出ない | Watermark が Event Time を追い越していない | Watermark Strategy を緩和(Duration.ofSeconds() を増加) |
| Restore 後のスキーマ不整合 | State Serializer 変更 | Compatible Serializer を実装・Migration Strategy 計画 |
パフォーマンス最適化
最適化レイヤー別 対策内容 期待効果
─────────────────────────────────────────────────────────
アプリケーション層 • 効率的なアルゴリズム 30-50% 改善
• 不要なオブジェクト削減
• State Size 最小化
Flink 設定層 • Parallelism 調整 20-40% 改善
• Batch Timeout 最適化
• Backend 切り替え
インフラ層 • KPU スケーリング リニアスケール
• Checkpoint Interval 5-15% 改善
具体的な最適化
// 1. Parallelism 最適化(KPU数の2-3倍推奨)
env.setParallelism(4);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 2. Buffer Timeout による遅延 vs スループットのトレードオフ
env.setBufferTimeout(100); // ms
// 3. State TTL で不要な状態削除
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Event> stateDescriptor =
new ValueStateDescriptor<>("event", Event.class);
stateDescriptor.enableTimeToLive(ttlConfig);
// 4. Incremental Checkpoint で状態バックアップ高速化
RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/", true);
env.setStateBackend(backend);
コスト管理
コスト構成(月額)
═══════════════════════════════════════
実行中の KPU コスト : $0.11/KPU-時間
停止時の KPU コスト : $0.011/KPU-時間(オーケストレーション)
Checkpoint ストレージ : $0.023/GB-月(S3)
データ転送 : $0.02/GB(Kinesis 出力)
───────────────────────────────────────
例1(小規模):1 KPU × 730 時間 × $0.11 = $80/月
例2(中規模):4 KPU × 730 時間 × $0.11 = $321/月
例3(大規模):10 KPU × 730 時間 × $0.11 = $803/月
+ Checkpoint 50GB × $0.023 = $1.15/月
+ Kinesis Output 100GB × $0.02 = $2/月
≈ $806/月
コスト削減戦略
-
Reserved Capacity(将来実装予定)
- 1 年契約:30-40% 割引
-
Auto Scaling 最適化
- 不必要なスケールアップを避ける
- Parallelism Per KPU を設定
-
Checkpointing 最適化
- Incremental Checkpoints を使用(差分のみ保存)
- TTL で古い状態を削除
-
不要なアプリ停止
- 開発環境は実装後停止
- スケジュール実行可能なジョブは Lambda に移行
セキュリティ
セキュリティ層 対策 標準設定
──────────────────────────────────────────────────
認証・認可 IAM ロール・権限管理 ✅
データ暗号化 S3 での State 暗号化 ✅
ネットワーク分離 VPC Endpoint(将来) 🔄
監査ログ CloudTrail・CloudWatch Logs ✅
アプリケーション 入力検証・SQL インジェクション対策 👨💻
IAM ポリシー例
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "FlinkApplicationAccess",
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:region:account:stream/*"
},
{
"Sid": "CheckpointStorage",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::checkpoint-bucket/*"
},
{
"Sid": "CloudWatchMetrics",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
2025-2026 最新動向
新機能・改善
-
Flink 1.19 での Table API/SQL 最適化
- Batch Mode での処理速度向上(30% 改善)
- Incremental Join State Management
-
PyFlink 強化
- Pure Python(C++ Bridge 依存削減)
- Pandas UDF サポート拡大
-
Studio Notebook の AI 支援
- AI による最適化提案
- 自動クエリ生成(2026 Q2 予定)
-
Iceberg/Hudi 統合深化
- Time Travel Query サポート
- Schema Evolution の自動処理
-
CloudWatch Application Signals 統合
- Flink 特有のメトリクス自動取得
- SLO 自動設定
学習リソース
公式ドキュメント・チュートリアル
- Amazon Managed Service for Apache Flink - Developer Guide
- Managed Service for Apache Flink Studio User Guide
- Apache Flink Official Documentation
AWS ブログ・記事
外部リソース・OSS
ベンダートレーニング
実装例・チェックリスト
本番環境デプロイチェックリスト
- [ ] IAM ロール・権限を最小権限で設定
- [ ] Checkpointing Mode を EXACTLY_ONCE に設定
- [ ] Checkpoint Interval・Timeout を適切に設定
- [ ] Watermark Strategy を定義(遅延許容度を明示)
- [ ] Auto Scaling Policy を設定(Parallelism・KPU)
- [ ] CloudWatch Alarms を設定(Error Rate・Latency・Checkpoint Failure)
- [ ] Log Level を INFO 以上(本番は WARN)
- [ ] S3 Checkpoint Bucket に適切なライフサイクルポリシー
- [ ] Savepoint を定期作成スケジュール設定
- [ ] 災害復旧テスト実行
- [ ] 負荷テスト実施(ピークトラフィック × 1.5 倍)
問題診断フローチャート
アプリケーション遅延?
├─ Yes → Lag Monitor で Consumed Position 確認
│ ├─ Source Lag 大 → ソース側問題(Kinesis・Kafka)
│ ├─ State Access Lag 大 → RocksDB I/O ボトルネック
│ └─ Output Lag 大 → Sink 側問題
│
└─ No → 通常運用継続
Checkpoint Failure?
├─ Timeout? → S3 Checkpoint Bucket アクセス確認・権限確認
├─ Memory OOM? → State Size 確認・RocksDB Cache 調整
└─ Network Error? → CloudWatch Logs で詳細確認
まとめ
Amazon Managed Service for Apache Flink は、「ストリーミングデータに対するステートフルな複雑な処理を、サーバーレスでスケーラブルに実行するための標準プラットフォーム」 です。
主な価値提案:
- インフラ管理の完全排除 - AWS がスケーリング・Checkpointing・HA を全て担当
- Exactly-Once 処理保証 - 金融・決済など重複排除が必須なユースケースに対応
- 複数言語サポート - Java/Scala/Python で開発可能
- リッチなエコシステム - Kinesis・Kafka・S3 等との深い統合
- オープンソース互換性 - セルフマネージド Flink との知識・コード再利用可能
適用判断:
- ✅ ミリ秒レイテンシー必須
- ✅ 複雑なステートフル処理必要
- ✅ Exactly-Once 保証必須
- ✅ AWS 中心のアーキテクチャ
- ❌ シンプルフィルタリングのみ → Lambda 推奨
- ❌ 完全なマルチクラウド要件 → Kafka Streams 検討
今後の展開(2026 年):
- PyFlink の Pure Python 化(パフォーマンス向上)
- AI による自動最適化提案
- Iceberg Time Travel Query の完全サポート
- Reserved Capacity 価格モデル
参考文献
AWS 公式
- Amazon Managed Service for Apache Flink User Guide
- AWS Kinesis Analytics Pricing
- Managed Flink - Getting Started
Apache Flink 公式
技術ブログ・記事
- AWS Big Data Blog - Streaming Analytics
- Confluent - Kafka vs Flink Comparison
- Databricks - Spark Streaming vs Flink
書籍
- “Stream Processing with Apache Flink” - Fabian Hueske, Vasiliki Kalavri
- “Designing Data-Intensive Applications” - Martin Kleppmann
最終更新:2026-04-26 バージョン:v2.0