目次

Amazon Managed Service for Apache Flink 完全ガイド 2026

ストリーミング分析の本質とスケーラブルなリアルタイム処理基盤

Amazon Managed Service for Apache Flink(旧 Kinesis Data Analytics for Apache Flink)は、Apache Flink アプリケーションをサーバーレスで実行する完全マネージドストリーミング分析サービス です。Kinesis Data Streams・MSK(Kafka)からのリアルタイムデータをミリ秒レイテンシーで処理し、ステートフルな複雑なストリーミング処理・窓集計・複合イベント処理(CEP)をスケーラブルに実現します。本ドキュメントは、Managed Flink の本質・アーキテクチャ・実装・比較・ベストプラクティスを体系的に解説する包括的ガイドです。

ドキュメントの目的

本ガイドは以下を対象としています。

  • 初心者向け: Apache Flink とは何か、なぜマネージドサービスが必要かを学びたい方
  • データエンジニア向け: Java/Scala/Python による Flink アプリケーション開発・デプロイ
  • ストリーミング処理実務者向け: Stateful Processing・Checkpointing・Window 処理・カスタムロジック実装
  • アーキテクト向け: Lambda・Spark Streaming・Kafka Streams との比較・選定判断
  • 運用者向け: Scaling・Monitoring・Troubleshooting・Cost Optimization
  • Flink 1.18/1.19 対応: Table API/SQL の大幅な最適化、PyFlink による Python サポート強化
  • Studio Notebook の進化: Zeppelin ベースの対話的分析、Flink SQL の interactive query
  • Connector の拡充: MSK・Kinesis・S3・Firehose・Iceberg のネイティブサポート
  • Stateful Computing の深化: RocksDB ステートバックエンド、Incremental Checkpointing
  • CloudWatch 統合拡張: Application Signals による APM、カスタムメトリクス自動取得
  • Iceberg/Hudi 統合: ACID Transaction、Time Travel、Data Lake との統合
  • ML Pipeline 連携: SageMaker とのネイティブ統合(2026 年 H1)

概要

初心者向けメモ: Managed Flink は「ストリーミングデータをリアルタイムに複雑に処理し、状態を維持しながら結果を出力するサービス」です。単なるデータフィルタリングではなく、5 分間の売上合計・ユーザーセッション追跡・不正パターン検知など、複数データポイントを組み合わせた分析をミリ秒単位で実現します。Lambda ではシステムの複雑さから実現が難しく、セルフマネージド Flink クラスタでは運用負荷が大きいため、Managed Service が最適です。

Managed Service for Apache Flink は AWS が提供する Apache Flink の完全マネージド実行環境 です。インフラ管理・スケーリング・Checkpointing・Failover は AWS が全て管理し、開発者は Flink アプリケーションロジックに専念できます。

graph TD
    subgraph DataSources["データソース"]
        KDS["Kinesis Data Streams"]
        MSK["MSK Kafka"]
        S3["Amazon S3"]
        RDS["RDS / Aurora"]
    end
    
    subgraph ProcessingLayer["ストリーミング処理層"]
        Lambda["Lambda\nステートレス軽量処理"]
        Flink["Managed Flink\nステートフル複雑処理"]
        SparkStreaming["Spark Streaming\nマイクロバッチ処理"]
    end
    
    subgraph OutputTargets["出力先"]
        S3Out["S3 / Data Lake"]
        Redshift["Redshift / OpenSearch"]
        KDS_Out["Kinesis / Kafka"]
        Dashboard["リアルタイム Dashboard"]
    end
    
    DataSources --> ProcessingLayer
    ProcessingLayer --> OutputTargets
    
    style Flink fill:#FF9900
    style KDS fill:#FF9900

定義

AWS 公式による定義:

“Amazon Managed Service for Apache Flink allows you to build and run real-time, fault-tolerant streaming applications, using open-source Apache Flink frameworks with multiple language support (Java, Scala, and Python).”

Apache Flink をベースとしたフルマネージドサービスで、分散ストリーム処理エンジンの複雑な運用を AWS が担当します。


目次

  1. 概要
  2. Managed Flink が解決する課題
  3. 主な特徴
  4. アーキテクチャ
  5. コアコンポーネント
  6. Flink Application vs Studio
  7. API レベル
  8. 主要ユースケース
  9. 設定・操作の具体例
  10. Flink SQL
  11. ウィンドウ処理
  12. ステートフル処理
  13. Checkpointing と障害復旧
  14. Connector エコシステム
  15. 類似サービス比較表
  16. ベストプラクティス
  17. トラブルシューティング
  18. パフォーマンス最適化
  19. コスト管理
  20. セキュリティ
  21. 2025-2026 最新動向
  22. 学習リソース
  23. 実装例・チェックリスト
  24. まとめ
  25. 参考文献

課題1: ステートフルなストリーミング処理の複雑性

従来の課題: Lambda は本質的にステートレスであり、過去 5 分間の売上合計・ユーザーセッション累積・移動平均など、状態を維持する処理が極めて難しい。外部データベースに状態を保存するアプローチは遅延増加・コスト増加につながる。

Managed Flink での解決: RocksDB ステートバックエンドで大規模なステート管理をネイティブサポート。状態はローカルディスクに保存され、ミリ秒単位でアクセス可能。Checkpointing で障害時の状態復旧を自動化。

課題2: インフラ管理の複雑性

従来の課題: セルフマネージド Flink クラスタ(EC2/EKS)は、クラスタスケーリング・ネットワーク設定・Checkpointing ストレージ・障害対応の自動化が必要で運用負荷が極めて大きい。

Managed Flink での解決: AWS が全インフラ管理を担当。KPU(Kinesis Processing Unit)ベースの自動スケーリング・高可用性・バックアップ・バージョン管理を提供。開発者はビジネスロジックに集中可能。

課題3: エグザクトリーワンス(Exactly-Once)保証の実現難度

従来の課題: 金融取引・請求計算など「1 回だけ正確に処理する」という要件は、分散システムでは極めて複雑。Lambda + DynamoDB では idempotency を自分で実装する必要があり、エラーハンドリングが複雑になる。

Managed Flink での解決: Flink がネイティブに Exactly-Once セマンティクスをサポート。Distributed Snapshots(チェックポイント)とトランザクション的なシンク処理により、自動的に重複処理を排除。


主な特徴

┌─────────────────────────────────────────────────────────────┐
│         Managed Service for Apache Flink の特徴            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ✅ サーバーレス実行                                        │
│     • インフラ管理不要、KPU 単位でスケーリング             │
│     • Auto scaling・High availability 標準搭載            │
│                                                             │
│  ✅ 複数言語サポート                                        │
│     • Java / Scala / Python(PyFlink)                     │
│     • Flink SQL による SQL ベース処理                      │
│                                                             │
│  ✅ ネイティブな状態管理                                    │
│     • RocksDB ステートバックエンド                          │
│     • 大容量状態を効率的に管理(テラバイト級)             │
│                                                             │
│  ✅ Exactly-Once 処理保証                                  │
│     • Distributed Snapshots Checkpointing                 │
│     • 重複排除の自動化                                     │
│                                                             │
│  ✅ 柔軟なウィンドウ処理                                    │
│     • Tumbling / Sliding / Session Windows                │
│     • カスタムウィンドウ・ウォーターマーク                 │
│                                                             │
│  ✅ リッチな Connector エコシステム                         │
│     • Kinesis / Kafka / S3 / Redshift / Firehose         │
│     • カスタムソース・シンク実装可能                       │
│                                                             │
│  ✅ Studio Notebook(Interactive 分析)                    │
│     • Zeppelin ベースの対話的 Flink 開発                   │
│     • SQL・Python・Scala を同一 Notebook で実行            │
│                                                             │
│  ✅ CloudWatch 統合監視                                     │
│     • Application Signals・カスタムメトリクス              │
│     • フルな可観測性を標準搭載                             │
│                                                             │
│  ✅ 完全な互換性                                            │
│     • オープンソース Apache Flink との完全互換             │
│     • セルフマネージド Flink との知識の再利用               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

アーキテクチャ

graph LR
    subgraph Sources["ソース"]
        KDS["Kinesis Streams"]
        MSK["MSK Kafka"]
        S3_in["S3 / DynamoDB"]
    end
    
    subgraph FlinkApp["Flink Application<br/>(マネージド実行)"]
        JAR["JAR/Python コード"]
        Runtime["Flink Runtime<br/>(TaskManager x N)"]
        State["State Backend<br/>RocksDB"]
        CP["Checkpointing"]
    end
    
    subgraph Processing["処理層"]
        DataStream["DataStream API"]
        TableSQL["Table API / SQL"]
        CEP["Complex Event<br/>Processing"]
    end
    
    subgraph Outputs["出力先"]
        S3_out["S3 / Data Lake"]
        Redshift["Redshift / OpenSearch"]
        KDS_out["Kinesis / Kafka"]
        Lambda_sink["Lambda / Lambda"]
        Dashboard["Real-time Dashboard"]
    end
    
    Sources --> FlinkApp
    FlinkApp --> Processing
    Processing --> Outputs
    
    State --> CP
    CP -->|Snapshots| S3_ckpt["S3 Checkpoint Storage"]
    
    style FlinkApp fill:#FF9900
コンポーネント 役割 管理方式
Application JAR/Python ビジネスロジック定義 ユーザー管理・S3 保存
Flink Runtime 分散処理エンジン AWS マネージド
Task Manager 並列処理実行 KPU ベースの自動スケーリング
State Backend 状態保存(RocksDB) AWS マネージド・S3 バックアップ
Checkpointing 障害復旧・スナップショット 自動実行・間隔設定可能
Job Manager Orchestration・Coordination AWS マネージド・HA 対応

コアコンポーネント

1. DataStream API(低レベル制御)

ユーザーが Flink の最も細かい制御を行う API。Java/Scala/Python で実装可能。

// Java DataStream API 例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> events = env
    .addSource(new KinesisSource())
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new EventAggregator())
    .addSink(new S3Sink());

env.execute("MyFlinkApplication");

用途: 細粒度の状態管理・カスタムウィンドウ・非同期処理(Async I/O)が必要な場合。

DataStream よりも高い抽象レベルで、SQL/DataFrame ライクなインターフェース。最適化が自動化される。

-- Flink SQL 例
CREATE TABLE kafka_events (
    event_id STRING,
    user_id BIGINT,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'format' = 'json'
);

CREATE TABLE results AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS event_count
FROM kafka_events
GROUP BY 
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    user_id;

用途: SQL エキスパートによる開発・初期の調査・シンプルな集計処理。

3. Process Function(カスタムロジック)

DataStream API と Flink State Management を融合させた中間レベル API。Timer・低レイテンシーの非同期処理に最適。

public class StatefulEventProcessor extends KeyedProcessFunction<String, Event, Result> {
    private transient ValueState<Long> totalAmount;
    private transient MapState<String, Integer> eventCounts;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> totalDescriptor = 
            new ValueStateDescriptor<>("total", Long.class);
        totalAmount = getRuntimeContext().getState(totalDescriptor);
        
        MapStateDescriptor<String, Integer> countsDescriptor =
            new MapStateDescriptor<>("counts", String.class, Integer.class);
        eventCounts = getRuntimeContext().getMapState(countsDescriptor);
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) {
        Long current = totalAmount.value();
        totalAmount.update((current == null ? 0 : current) + event.getAmount());
        
        // Timer で定期的に結果を出力
        ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60000);
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        Long total = totalAmount.value();
        out.collect(new Result(ctx.getCurrentKey(), total));
    }
}

用途: 複雑な状態管理・タイマー駆動処理・非同期外部 API 呼び出し。

4. CEP(Complex Event Processing)

複数イベントのパターン検出。例:「10 秒以内に同一ユーザーから 5 回以上の不正ログイン試行」を検知。

Pattern<Event, ?> pattern = Pattern
    .<Event>begin("login_failures")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("LOGIN_FAILED");
        }
    })
    .times(5)
    .within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(
    eventStream.keyBy(Event::getUserId), 
    pattern
);

patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) {
        List<Event> loginFailures = pattern.get("login_failures");
        return new Alert(
            loginFailures.get(0).getUserId(),
            "Suspicious login pattern detected",
            loginFailures.size()
        );
    }
}).addSink(alertSink);

用途: 不正検知・異常検知・イベントコンボの自動検出。


観点 Application Studio
使用形態 IDE で開発・CI/CD デプロイ ブラウザ・対話的開発
言語 Java / Scala / Python(PyFlink) SQL / Python / Scala
開発スタイル プログラミング型・SDLC対応 探索型・即座な検証
本番対応 ✅ 完全対応 △(Studio から Deployed App に昇格可能)
Notebook 環境 ✅(Zeppelin)
長期実行ジョブ △(確認後 Application に変換推奨)
チーム開発 ✅(Git 連携) △(Shared Workspace)
パフォーマンス 高(インライン最適化) 中(対話的分析向け)

選択ガイド:

  • エンタープライズ環境・本番ジョブ → Application(Git・CI/CD 連携)
  • 初期分析・SQL 検索・アドホック → Studio(即座の探索)
  • Notebook 型開発・データサイエンティスト → Studio Notebook

API レベル

┌──────────────────────────────────────────────────────────┐
│             Flink API 抽象レベル図                      │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  高度な抽象   ┌─ Flink SQL           [最も簡潔]        │
│  化         ├─ Table API / DataFrame                    │
│             ├─ DataStream API / Operators               │
│             ├─ Process Function / State                 │
│             └─ Low-Level Source/Sink API [最も自由度]  │
│  低レベル                                               │
│                                                          │
│  ✅ SQL レベル:開発時間短・最適化自動化               │
│  ✅ DataStream:細粒度制御・複雑処理対応               │
│  ✅ Process Fn:Timer・State・非同期処理               │
│                                                          │
└──────────────────────────────────────────────────────────┘

主要ユースケース

1. リアルタイム不正検知(金融・EC)

  • 取引ストリームからリアルタイムに不正パターンを検知・ブロック
  • 過去 10 分間のユーザー取引額、取引頻度、地理情報から統計的異常検知
  • Flink 適用点: ステートフル集計・CEP パターン検知・ミリ秒遅延

2. IoT センサーデータリアルタイム集計

  • 数百万個のセンサーから秒単位でデータ到着、リアルタイムダッシュボード生成
  • 5 分ごとの機械ごと・工場ごとの集計値・異常値通知
  • Flink 適用点: 高スループット処理・ウィンドウ集計・複数キーでの並列処理

3. Kafka ストリームのリアルタイム ETL

  • MSK(Kafka)トピックを Flink で変換・エンリッチして別トピックに配信
  • ユーザーイベントに外部 API で顧客情報エンリッチ→ダウンストリーム消費
  • Flink 適用点: 高速変換・状態ベース JOIN・外部システム連携

4. ユーザーセッション分析

  • クリックストリームをセッションウィンドウで集計、セッション内行動を分析
  • セッション開始後 30 分で商品閲覧→購入の傾向を集計
  • Flink 適用点: Session Windows・時間ベースのイベント グループ化

5. リアルタイムレコメンデーション

  • ユーザーの最新の行動(過去 5 分)を基に推奨商品リスト更新
  • Flink で集計→Lambda で推奨エンジン実行→UI へのプッシュ
  • Flink 適用点: 低遅延状態管理・リアルタイム機能生成

6. アプリケーションログの異常検知

  • CloudWatch Logs からログストリームを Flink で分析、エラー率が 5% 超えで通知
  • アプリ群別・地域別の Error Rate を並列集計
  • Flink 適用点: 複数キーでの並列ストリーム処理・時系列統計

7. リアルタイム KPI ダッシュボード

  • SaaS プラットフォームの顧客ごとの全リアルタイム指標を秒単位で更新
  • 多テナント環境での顧客ごと・機能ごとの使用量・収益のリアルタイム集計
  • Flink 適用点: 高並列度処理・多キー集計・大規模状態管理

8. ストリーム間の Join 処理

  • ユーザーイベント(Action) と プロダクト カタログ(Product)の Join
  • Flink StreamStream Join・Temporal Join で最新カタログとの突き合わせ
  • Flink 適用点: 複数ストリーム結合・状態ベース検索

9. 決済後のバッチ集計

  • リアルタイム決済ストリームから 1 分単位の売上・トランザクション数を集計
  • 日次バッチの前提条件となるリアルタイム検証値を生成
  • Flink 適用点: Tumbling Window・時系列集計の自動化

10. ネットワークメトリクス分析

  • VPC Flow Logs・ALB ログをストリーム処理、ネットワーク異常検知
  • 源 IP・宛先 IP・ポート別トラフィック量の急激な変化を検知
  • Flink 適用点: 高スループット・複雑なフィルタリング・統計計算

設定・操作の具体例

AWS CLI によるアプリケーション作成

# JAR ファイルを S3 にアップロード
aws s3 cp target/my-flink-app-1.0.jar s3://my-flink-bucket/

# Managed Flink アプリケーション作成
aws kinesisanalyticsv2 create-application \
  --application-name my-streaming-app \
  --runtime-environment FLINK_1_19 \
  --service-execution-role arn:aws:iam::ACCOUNT:role/flink-role \
  --application-configuration \
    Inputs='[
      {
        "NamePrefix": "SOURCE_SQL_STREAM",
        "InputParallelism": 4,
        "InputStartingPositionConfiguration": {
          "InputStartingPosition": "LATEST"
        },
        "KinesisStreamsInput": {
          "ResourceARN": "arn:aws:kinesis:region:account:stream/my-input-stream"
        }
      }
    ]'

Application Configuration(JSON)

{
  "ApplicationName": "real-time-analytics",
  "RuntimeEnvironment": "FLINK_1_19",
  "ServiceExecutionRoleARN": "arn:aws:iam::ACCOUNT:role/flink-role",
  "ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
      "CheckpointConfiguration": {
        "ConfigurationType": "CUSTOM",
        "CheckpointingEnabled": true,
        "CheckpointInterval": 60000,
        "MinPauseBetweenCheckpoints": 10000,
        "CheckpointMode": "EXACTLY_ONCE"
      },
      "ParallelismConfiguration": {
        "ConfigurationType": "CUSTOM",
        "Parallelism": 4,
        "ParallelismPerKPU": 1,
        "AutoScalingEnabled": true
      },
      "MonitoringConfiguration": {
        "ConfigurationType": "CUSTOM",
        "MetricsLevel": "APPLICATION",
        "LogLevel": "INFO"
      }
    }
  }
}

CloudFormation による Infrastructure as Code

Resources:
  ManagedFlinkApplication:
    Type: AWS::KinesisAnalyticsV2::Application
    Properties:
      ApplicationName: my-analytics-app
      RuntimeEnvironment: FLINK_1_19
      ServiceExecutionRole: !GetAtt FlinkServiceRole.Arn
      ApplicationConfiguration:
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            CheckpointingEnabled: true
            CheckpointInterval: 60000
            MinPauseBetweenCheckpoints: 10000
            CheckpointMode: EXACTLY_ONCE
          ParallelismConfiguration:
            Parallelism: 4
            AutoScalingEnabled: true
          MonitoringConfiguration:
            MetricsLevel: APPLICATION
            LogLevel: INFO
      ApplicationCodeConfiguration:
        S3ContentLocation:
          BucketARN: !Sub 'arn:aws:s3:::${FlinkBucket}'
          FileKey: my-app-1.0.jar
          ObjectVersion: !Ref JarObjectVersion

  FlinkServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Policies:
        - PolicyName: FlinkStreamAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:*'
                  - 's3:*'
                Resource: '*'

SDK(Python Boto3)による操作

import boto3

client = boto3.client('kinesisanalyticsv2', region_name='us-east-1')

# アプリケーション一覧取得
response = client.list_applications()
for app in response['ApplicationSummaries']:
    print(f"App: {app['ApplicationName']}, Status: {app['ApplicationStatus']}")

# アプリケーション詳細取得
app_details = client.describe_application(ApplicationName='my-flink-app')
print(f"ARN: {app_details['ApplicationDetail']['ApplicationARN']}")
print(f"Status: {app_details['ApplicationDetail']['ApplicationStatus']}")
print(f"Create Timestamp: {app_details['ApplicationDetail']['CreateTimestamp']}")

# アプリケーション開始
start_response = client.start_application(
    ApplicationName='my-flink-app',
    RunConfiguration={
        'FlinkRunConfiguration': {
            'AllowNonRestoredState': False
        }
    }
)

# アプリケーション停止
stop_response = client.stop_application(ApplicationName='my-flink-app')

Terraform IaC

resource "aws_kinesisanalyticsv2_application" "example" {
  name              = "my-flink-application"
  runtime_environment = "FLINK_1_19"
  service_execution_role_arn = aws_iam_role.flink_role.arn

  application_configuration {
    flink_application_configuration {
      checkpoint_configuration {
        checkpointing_enabled         = true
        checkpoint_interval           = 60000
        min_pause_between_checkpoints = 10000
        checkpoint_mode               = "EXACTLY_ONCE"
      }

      parallelism_configuration {
        parallelism              = 4
        auto_scaling_enabled     = true
        parallelism_per_kpu      = 1
      }

      monitoring_configuration {
        log_level      = "INFO"
        metrics_level  = "APPLICATION"
      }
    }
  }

  application_code_configuration {
    s3_content_location {
      bucket_arn      = aws_s3_bucket.flink_code.arn
      file_key        = "my-app-1.0.jar"
      object_version  = aws_s3_object.jar_file.version_id
    }
  }

  tags = {
    Environment = "production"
    Team        = "data-engineering"
  }
}

resource "aws_iam_role" "flink_role" {
  name = "flink-service-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "kinesisanalytics.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "flink_policy" {
  name = "flink-execution-policy"
  role = aws_iam_role.flink_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "kinesis:*",
          "s3:*",
          "logs:*"
        ]
        Resource = "*"
      }
    ]
  })
}

ストリームテーブル定義

-- Kafka トピックからストリーム定義
CREATE TABLE orders (
    order_id STRING,
    user_id BIGINT,
    product_id BIGINT,
    amount DOUBLE,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Kinesis Streams からストリーム定義
CREATE TABLE user_events (
    event_id STRING,
    user_id BIGINT,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-events',
    'aws.region' = 'us-east-1',
    'format' = 'json'
);

-- Redshift への出力シンク
CREATE TABLE redshift_results (
    event_time TIMESTAMP(3),
    user_id BIGINT,
    total_amount DOUBLE,
    order_count BIGINT
) WITH (
    'connector' = 'redshift',
    'url' = 'jdbc:redshift://my-cluster.us-east-1.redshift.amazonaws.com:5439/mydb',
    'table-name' = 'order_summaries',
    'username' = 'admin',
    'password' = '...',
    'batch.size' = '1000',
    'flush.interval' = '60000'
);

ウィンドウ集計

-- Tumbling Window(5 分ごと)
INSERT INTO redshift_results
SELECT
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS event_time,
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
FROM orders
WHERE order_amount > 0
GROUP BY
    TUMBLE(order_time, INTERVAL '5' MINUTE),
    user_id;

-- Sliding Window(5 分ウィンドウ・1 分スライド)
SELECT
    CAST(CURRENT_TIMESTAMP AS STRING) AS event_time,
    user_id,
    SUM(amount) AS sliding_total,
    AVG(amount) AS avg_amount,
    COUNT(DISTINCT product_id) AS unique_products
FROM orders
WHERE TUMBLE_ROWTIME(order_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE) IS NOT NULL
GROUP BY
    user_id,
    TUMBLE(order_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE);

-- Session Window(30 分非アクティブで終了)
SELECT
    SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
    user_id,
    COUNT(*) AS events_in_session,
    COUNT(DISTINCT event_type) AS event_types
FROM user_events
GROUP BY
    SESSION(event_time, INTERVAL '30' MINUTE),
    user_id;

Stream-Stream Join

-- 最新のユーザー属性でイベント JOIN
SELECT
    e.event_id,
    e.user_id,
    e.event_type,
    p.product_name,
    p.category,
    e.event_time
FROM user_events e
INNER JOIN orders o
    ON e.user_id = o.user_id
    AND e.event_time BETWEEN o.order_time - INTERVAL '1' HOUR
                          AND o.order_time + INTERVAL '30' MINUTE
WHERE e.event_type = 'PURCHASE'
ORDER BY e.event_time DESC;

-- Temporal Join(Versioned Table に対する JOIN)
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    u.user_name,
    u.user_segment,
    o.order_time
FROM orders o
LEFT JOIN user_master FOR SYSTEM_TIME AS OF o.order_time AS u
    ON o.user_id = u.user_id;

ウィンドウ処理

┌──────────────────────────────────────────────────────────────────┐
│                    Flink ウィンドウの種類                        │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1️⃣  Tumbling Window                                            │
│      • 時間幅で固定・重複なし                                     │
│      • 例:5 分ごとに売上集計                                     │
│      ┌──────┐──────┬──────┬──────┐                              │
│      │5min  │5min  │5min  │5min  │                              │
│      └──────┴──────┴──────┴──────┘                              │
│                                                                  │
│  2️⃣  Sliding Window                                             │
│      • ウィンドウ幅とスライド間隔が独立                           │
│      • 例:過去 10 分データで 1 分スライド                        │
│      ┌─────────────────┐                                        │
│      │   10min data    │                                        │
│      ├─┬─────────────┤│                                        │
│      │ │ 10min slice 1││                                        │
│      ├─┤ 10min slice 2││                                        │
│      │ └─────────────┘│                                        │
│      └─────────────────┘                                        │
│                                                                  │
│  3️⃣  Session Window                                             │
│      • 非アクティブ期間でウィンドウ終了                           │
│      • ユーザーセッション分析に最適                               │
│      Event  Event Event    [Gap]   Event Event                 │
│      │      │    │                 │    │                      │
│      ├─────────┤ Session1   [終了]   ├───┤ Session2            │
│                                                                  │
│  4️⃣  Global Window                                              │
│      • すべてのデータをグループ化・Trigger 手動指定              │
│      • カスタムウィンドウロジックに最適                           │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

実装例

// Tumbling Window - 5 分ごと
DataStream<OrderSummary> tumblingResults = orders
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderAggregator())
    .name("Tumbling-5min-Aggregation");

// Sliding Window - 10 分ウィンドウ、1 分スライド
DataStream<OrderStats> slidingResults = orders
    .keyBy(Order::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .aggregate(new StatisticsAggregator())
    .name("Sliding-10min-1min");

// Session Window - 30 分非アクティブ
DataStream<SessionSummary> sessionResults = events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator())
    .name("Session-30min-Gap");

// Global Window + 遅延イベント処理
DataStream<CustomResult> customResults = events
    .keyBy(Event::getCategory)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(1000))  // 1000 イベントか 1 分で出力
    .allowedLateness(Time.minutes(5))  // 5 分までの遅延を許容
    .aggregate(new CustomAggregator())
    .name("Custom-Processing");

ステートフル処理

State Backend の選択

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// RocksDB State Backend(推奨・大規模状態向け)
env.setStateBackend(new RocksDBStateBackend(
    "s3://my-checkpoint-bucket/rocksdb",
    true  // incremental checkpoints
));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// HashMap State Backend(小規模状態向け)
env.setStateBackend(new HashMapStateBackend());

ValueState(スカラ値の保存)

public class PreviousValueProcessor extends KeyedProcessFunction<String, Event, Alert> {
    private transient ValueState<String> previousState;
    
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<String> descriptor = 
            new ValueStateDescriptor<>("previous", String.class);
        previousState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Alert> out) {
        String previous = previousState.value();
        
        if (previous != null && previous.equals(event.getValue())) {
            out.collect(new Alert("Duplicate detected", event.getValue()));
        }
        
        previousState.update(event.getValue());
    }
}

MapState(複数キーの保存)

public class EventCounterProcessor extends KeyedProcessFunction<String, Event, EventStats> {
    private transient MapState<String, Integer> eventCounts;
    
    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Integer> descriptor = 
            new MapStateDescriptor<>("eventCounts", String.class, Integer.class);
        eventCounts = getRuntimeContext().getMapState(descriptor);
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<EventStats> out) {
        String eventType = event.getType();
        Integer count = eventCounts.get(eventType);
        count = (count == null ? 1 : count + 1);
        eventCounts.put(eventType, count);
        
        // 統計情報を出力
        Iterator<Map.Entry<String, Integer>> iterator = eventCounts.iterator();
        Map<String, Integer> stats = new HashMap<>();
        iterator.forEachRemaining(entry -> stats.put(entry.getKey(), entry.getValue()));
        
        out.collect(new EventStats(ctx.getCurrentKey(), stats));
    }
}

ListState(複数値リストの保存)

public class SequenceDetectorProcessor extends KeyedProcessFunction<String, Event, Sequence> {
    private transient ListState<Event> eventHistory;
    
    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<Event> descriptor = 
            new ListStateDescriptor<>("history", Event.class);
        eventHistory = getRuntimeContext().getListState(descriptor);
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Sequence> out) {
        eventHistory.add(event);
        
        // 最後の 10 イベントを保持
        List<Event> history = new ArrayList<>();
        eventHistory.get().forEach(history::add);
        
        if (history.size() > 10) {
            // 古いイベントを削除(手動で状態を管理)
            List<Event> truncated = history.subList(history.size() - 10, history.size());
            eventHistory.clear();
            truncated.forEach(e -> {
                try {
                    eventHistory.add(e);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            });
        }
        
        // パターンマッチング
        if (matchesSequence(history)) {
            out.collect(new Sequence(ctx.getCurrentKey(), history));
        }
    }
    
    private boolean matchesSequence(List<Event> history) {
        // カスタムシーケンス検出ロジック
        return history.size() >= 3 && 
               history.get(history.size()-1).getValue() > 100 &&
               history.get(history.size()-2).getValue() > 50;
    }
}

Checkpointing と障害復旧

Checkpoint Configuration:
  ├─ Checkpointing Enabled: true
  ├─ Checkpoint Interval: 60 seconds       # 60 秒ごとに状態 checkpoint
  ├─ Min Pause Between Checkpoints: 10s    # 最小 10 秒の間隔確保
  ├─ Mode: EXACTLY_ONCE                   # 重複排除保証
  ├─ Timeout: 600 seconds                 # checkpoint 完了待機時間
  ├─ Storage Path: s3://bucket/checkpoints/
  └─ Incremental Checkpoints: true        # 差分のみ保存で高速化

Checkpoint メカニズム

時刻        Job Manager          Task Manager 1       Task Manager 2       S3 Storage
│
├─ T=0s     [Checkpoint 1000]
│           ├─→ barrier broadcast
│           │   ├→ [Processes up to barrier]
│           │   │  └─→ [Snapshots state]
│           │   │      ├─→ [Writes snapshot]
│           │   │          └→ S3://.../ckpt-1000/
│           │   │
│           │   └→ [Processes up to barrier]
│           │      └─→ [Snapshots state]
│           │          ├─→ [Writes snapshot]
│           │              └→ S3://.../ckpt-1000/
│           │
│           └─ [All snapshots done]
│               ├─→ [Mark Checkpoint 1000 complete]
│               └─→ [Update metadata]
│
├─ T=60s    [Checkpoint 1001] (Repeat cycle)
│
└─ Failure! [Task Manager 2 crashes]
            [Job Manager detects]
            └─ [Restore from Checkpoint 1000]
               ├─ [Reload state from S3]
               ├─ [Restart Task Manager 2]
               └─ [Resume processing from Checkpoint]

Savepoint による手動スナップショット

# 実行中のジョブを一時停止して Savepoint を作成
aws kinesisanalyticsv2 create-savepoint \
  --application-name my-flink-app \
  --savepoint-name my-manual-savepoint

# 別バージョンの JAR で復旧(状態は保持)
aws kinesisanalyticsv2 update-application \
  --application-name my-flink-app \
  --application-configuration-update '{
    "ApplicationCodeConfigurationUpdate": {
      "S3ContentLocationUpdate": {
        "BucketARNUpdate": "arn:aws:s3:::my-bucket",
        "FileKeyUpdate": "my-app-2.0.jar"
      }
    }
  }' \
  --restore-from-savepoint-description '{
    "SavepointName": "my-manual-savepoint"
  }'

Connector エコシステム

┌───────────────────────────────────────────────────────────────┐
│       Managed Flink Connector サポート状況(2026)             │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ✅ ネイティブサポート(推奨)                                 │
│  ├─ Kinesis Data Streams                                     │
│  ├─ MSK(Kafka)                                             │
│  ├─ Amazon S3                                                │
│  ├─ Kinesis Data Firehose                                    │
│  ├─ DynamoDB(Lookup用)                                      │
│  └─ Lambda(Sink)                                           │
│                                                               │
│  🔌 コミュニティ Connector(JAR に含める)                     │
│  ├─ Elasticsearch / OpenSearch                               │
│  ├─ JDBC(RDS / Redshift)                                   │
│  ├─ HBase                                                    │
│  ├─ MongoDB                                                  │
│  ├─ Cassandra                                                │
│  └─ BigQuery(GCP)                                          │
│                                                               │
│  📊 Apache Iceberg / Delta Lake                               │
│  ├─ Flink Iceberg Connector                                  │
│  ├─ Delta Connector                                          │
│  └─ HuDi Connector                                           │
│                                                               │
│  🚀 新規(2025-2026)                                         │
│  ├─ Redshift Data Sharing                                    │
│  ├─ S3 Express One Zone                                      │
│  └─ Graviton2 Optimized Build                                │
│                                                               │
└───────────────────────────────────────────────────────────────┘

Connector の実装例

// Kinesis Source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> stream = env.addSource(new FlinkKinesisConsumer<>(
    "my-stream",
    new SimpleStringSchema(),
    properties  // AWS credentials・region 設定
))
.name("Kinesis-Source");

// S3 Sink
stream.addSink(new StreamingFileSink.BulkFormatBuilder<>(
    new Path("s3://my-bucket/output/"),
    new ParquetAvroWriters.forGenericRecord(schema)
)
.withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy/'month='MM/'day='dd/'hour='HH"))
.withRollingPolicy(
    DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(128 * 1024 * 1024)
        .build())
.build())
.name("S3-Sink");

env.execute("S3-Writer");

// Kafka (MSK) Sink with Exactly-Once
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-broker:9092");
props.setProperty("transaction.id", "flink-sink-" + UUID.randomUUID());

stream.addSink(new FlinkKafkaProducer<>(
    "output-topic",
    new SimpleStringSchema(),
    props,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
))
.name("Kafka-Sink-ExactlyOnce");

類似サービス比較表

観点 Managed Flink Lambda Stream Spark Streaming Kafka Streams
サーバーレス ❌(EMR)
ステートフル処理 ✅(RocksDB) △(外部DB)
Exactly-Once △(難)
レイテンシー ミリ秒 秒〜分 ミリ秒
複雑度
学習曲線
スケーラビリティ 高(自動) 高(自動) 高(手動) 高(手動)
開発言語 Java/Scala/Python Python/Node.js/Java Scala/Python/SQL Java
マネージド度 完全 完全 部分 なし
コスト(小規模)
コスト(大規模)

ベストプラクティス

✅ Do: 推奨プラクティス

1. Exactly-Once Semantics を明示的に設定

CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointInterval(60000);  // 60 秒
config.setMinPauseBetweenCheckpoints(10000);
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
);

理由: 金融・決済など「重複排除が重要」な場面でデータ品質を確保

2. Watermark を適切に設定

DataStream<Event> stream = env.addSource(source)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp())
    );

理由: 遅延イベントの正確なウィンドウ割り当てを確保

3. 適切なウィンドウタイプ選択

// セッション分析 → Session Window
window(EventTimeSessionWindows.withGap(Time.minutes(30)))

// 定期集計 → Tumbling Window
window(TumblingEventTimeWindows.of(Time.minutes(5)))

// 移動平均 → Sliding Window
window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))

理由: ウィンドウタイプごとに計算効率とスキマンティクスが異なる

4. KeyBy による並列化

// 複数キーでの並列処理(推奨)
stream
    .keyBy(event -> new Tuple2<>(event.getUserId(), event.getRegion()))
    .window(...)
    .apply(...)

// キーレス処理(避ける・スケーリング不可)
stream.windowAll(...)

理由: KeyBy で複数の並列タスクに分散、スループット向上

5. RocksDB を大規模状態で使用

env.setStateBackend(new RocksDBStateBackend(
    "s3://checkpoint-bucket/rocksdb",
    true  // incremental snapshots
));

理由: テラバイト級の大規模状態を効率的に管理

❌ Don’t: アンチパターン

1. Checkpointing なしで本番運用

// ❌ 危険:Checkpointing が無効
config.disableCheckpointing();

// ✅ 正解
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

2. Event Time を設定しない

// ❌ Processing Time のみ(遅延イベント対応不可)
stream.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))

// ✅ Event Time + Watermark
stream.assignTimestampsAndWatermarks(strategy)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))

3. 外部システムへのブロッキング I/O

// ❌ 危険:ブロッキング呼び出し(スループット低下)
DataStream<Result> results = stream.map(event -> {
    String productName = databaseLookup(event.getProductId());  // ブロッキング
    return new Result(event, productName);
});

// ✅ 正解:非同期 I/O
DataStream<Result> asyncResults = AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseLookupFunction(),
    10,
    TimeUnit.SECONDS,
    100
);

4. Stateful Processing での Key Distribution の無視

// ❌ 危険:キーが偏る(一部タスクに負荷集中)
stream.keyBy(event -> event.getCountry())  // 数個の国に偏る可能性

// ✅ 正解:キーを分散化
stream.keyBy(event -> Tuple2.of(event.getCountry(), event.getCity()))

5. Savepoint 戦略の欠如

// ❌ 保険なし:障害時に全メトリクス失う
// → Savepoint を定期作成していない

// ✅ 正解:定期的に Savepoint 作成
// cron で定期実行:
// aws kinesisanalyticsv2 create-savepoint --application-name app --savepoint-name sp-$(date +%s)

トラブルシューティング

症状 原因 対策
Application が遅延イベントを無視 Watermark 未設定 assignTimestampsAndWatermarks() を追加
メモリ OOM エラー 状態が大きすぎる RocksDB Backend に切り替え・State TTL 設定
Checkpoint 失敗・Timeout S3 バケットへのアクセス権限不足 IAM ロールに s3:* 権限追加
タスク 1 つに負荷集中 Key Distribution が偏っている Composite Key(複数フィールド)に変更
重複イベント処理 Checkpointing Mode が At-Least-Once EXACTLY_ONCE に変更
ウィンドウ結果が出ない Watermark が Event Time を追い越していない Watermark Strategy を緩和(Duration.ofSeconds() を増加)
Restore 後のスキーマ不整合 State Serializer 変更 Compatible Serializer を実装・Migration Strategy 計画

パフォーマンス最適化

最適化レイヤー別         対策内容              期待効果
─────────────────────────────────────────────────────────
アプリケーション層     • 効率的なアルゴリズム  30-50% 改善
                    • 不要なオブジェクト削減
                    • State Size 最小化

Flink 設定層         • Parallelism 調整     20-40% 改善
                    • Batch Timeout 最適化
                    • Backend 切り替え

インフラ層           • KPU スケーリング      リニアスケール
                    • Checkpoint Interval  5-15% 改善

具体的な最適化

// 1. Parallelism 最適化(KPU数の2-3倍推奨)
env.setParallelism(4);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

// 2. Buffer Timeout による遅延 vs スループットのトレードオフ
env.setBufferTimeout(100);  // ms

// 3. State TTL で不要な状態削除
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<Event> stateDescriptor = 
    new ValueStateDescriptor<>("event", Event.class);
stateDescriptor.enableTimeToLive(ttlConfig);

// 4. Incremental Checkpoint で状態バックアップ高速化
RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/", true);
env.setStateBackend(backend);

コスト管理

コスト構成(月額)
═══════════════════════════════════════
実行中の KPU コスト     : $0.11/KPU-時間
停止時の KPU コスト     : $0.011/KPU-時間(オーケストレーション)
Checkpoint ストレージ   : $0.023/GB-月(S3)
データ転送              : $0.02/GB(Kinesis 出力)
───────────────────────────────────────

例1(小規模):1 KPU × 730 時間 × $0.11 = $80/月

例2(中規模):4 KPU × 730 時間 × $0.11 = $321/月

例3(大規模):10 KPU × 730 時間 × $0.11 = $803/月
          + Checkpoint 50GB × $0.023 = $1.15/月
          + Kinesis Output 100GB × $0.02 = $2/月
          ≈ $806/月

コスト削減戦略

  1. Reserved Capacity(将来実装予定)

    • 1 年契約:30-40% 割引
  2. Auto Scaling 最適化

    • 不必要なスケールアップを避ける
    • Parallelism Per KPU を設定
  3. Checkpointing 最適化

    • Incremental Checkpoints を使用(差分のみ保存)
    • TTL で古い状態を削除
  4. 不要なアプリ停止

    • 開発環境は実装後停止
    • スケジュール実行可能なジョブは Lambda に移行

セキュリティ

セキュリティ層         対策                           標準設定
──────────────────────────────────────────────────
認証・認可            IAM ロール・権限管理             ✅
データ暗号化          S3 での State 暗号化             ✅
ネットワーク分離       VPC Endpoint(将来)            🔄
監査ログ              CloudTrail・CloudWatch Logs      ✅
アプリケーション      入力検証・SQL インジェクション対策 👨‍💻

IAM ポリシー例

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "FlinkApplicationAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:region:account:stream/*"
    },
    {
      "Sid": "CheckpointStorage",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::checkpoint-bucket/*"
    },
    {
      "Sid": "CloudWatchMetrics",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

2025-2026 最新動向

新機能・改善

  1. Flink 1.19 での Table API/SQL 最適化

    • Batch Mode での処理速度向上(30% 改善)
    • Incremental Join State Management
  2. PyFlink 強化

    • Pure Python(C++ Bridge 依存削減)
    • Pandas UDF サポート拡大
  3. Studio Notebook の AI 支援

    • AI による最適化提案
    • 自動クエリ生成(2026 Q2 予定)
  4. Iceberg/Hudi 統合深化

    • Time Travel Query サポート
    • Schema Evolution の自動処理
  5. CloudWatch Application Signals 統合

    • Flink 特有のメトリクス自動取得
    • SLO 自動設定

学習リソース

公式ドキュメント・チュートリアル

AWS ブログ・記事

外部リソース・OSS

ベンダートレーニング


実装例・チェックリスト

本番環境デプロイチェックリスト

  • [ ] IAM ロール・権限を最小権限で設定
  • [ ] Checkpointing Mode を EXACTLY_ONCE に設定
  • [ ] Checkpoint Interval・Timeout を適切に設定
  • [ ] Watermark Strategy を定義(遅延許容度を明示)
  • [ ] Auto Scaling Policy を設定(Parallelism・KPU)
  • [ ] CloudWatch Alarms を設定(Error Rate・Latency・Checkpoint Failure)
  • [ ] Log Level を INFO 以上(本番は WARN)
  • [ ] S3 Checkpoint Bucket に適切なライフサイクルポリシー
  • [ ] Savepoint を定期作成スケジュール設定
  • [ ] 災害復旧テスト実行
  • [ ] 負荷テスト実施(ピークトラフィック × 1.5 倍)

問題診断フローチャート

アプリケーション遅延?
  ├─ Yes → Lag Monitor で Consumed Position 確認
  │        ├─ Source Lag 大 → ソース側問題(Kinesis・Kafka)
  │        ├─ State Access Lag 大 → RocksDB I/O ボトルネック
  │        └─ Output Lag 大 → Sink 側問題
  │
  └─ No  → 通常運用継続

Checkpoint Failure?
  ├─ Timeout? → S3 Checkpoint Bucket アクセス確認・権限確認
  ├─ Memory OOM? → State Size 確認・RocksDB Cache 調整
  └─ Network Error? → CloudWatch Logs で詳細確認

まとめ

Amazon Managed Service for Apache Flink は、「ストリーミングデータに対するステートフルな複雑な処理を、サーバーレスでスケーラブルに実行するための標準プラットフォーム」 です。

主な価値提案:

  1. インフラ管理の完全排除 - AWS がスケーリング・Checkpointing・HA を全て担当
  2. Exactly-Once 処理保証 - 金融・決済など重複排除が必須なユースケースに対応
  3. 複数言語サポート - Java/Scala/Python で開発可能
  4. リッチなエコシステム - Kinesis・Kafka・S3 等との深い統合
  5. オープンソース互換性 - セルフマネージド Flink との知識・コード再利用可能

適用判断:

  • ✅ ミリ秒レイテンシー必須
  • ✅ 複雑なステートフル処理必要
  • ✅ Exactly-Once 保証必須
  • ✅ AWS 中心のアーキテクチャ
  • ❌ シンプルフィルタリングのみ → Lambda 推奨
  • ❌ 完全なマルチクラウド要件 → Kafka Streams 検討

今後の展開(2026 年):

  • PyFlink の Pure Python 化(パフォーマンス向上)
  • AI による自動最適化提案
  • Iceberg Time Travel Query の完全サポート
  • Reserved Capacity 価格モデル

参考文献

AWS 公式

  1. Amazon Managed Service for Apache Flink User Guide
  2. AWS Kinesis Analytics Pricing
  3. Managed Flink - Getting Started
  1. Apache Flink Documentation
  2. Flink DataStream API
  3. Flink SQL Reference

技術ブログ・記事

  1. AWS Big Data Blog - Streaming Analytics
  2. Confluent - Kafka vs Flink Comparison
  3. Databricks - Spark Streaming vs Flink

書籍

  1. “Stream Processing with Apache Flink” - Fabian Hueske, Vasiliki Kalavri
  2. “Designing Data-Intensive Applications” - Martin Kleppmann

最終更新:2026-04-26 バージョン:v2.0