目次

Amazon EMR v2.0 完全ガイド 2026

Elastic MapReduce: ビッグデータ分析の統合プラットフォーム

Amazon EMR(Elastic MapReduce) は、Apache Spark・Hadoop・Hive・Presto・Flink・HBase などのオープンソース分析フレームワークを AWS 上で実行するフルマネージドクラスタープラットフォーム です。ペタバイト規模のデータ処理・機械学習・ストリーミング分析をスケーラブルに実行し、EC2・EKS・Serverless の 3 つのデプロイオプションで柔軟に対応します。本ドキュメントは EMR の概念・アーキテクチャ・実装・最新動向を網羅的に解説します。

ドキュメントの目的

本ガイドは以下を対象としています。

  • 初心者向け:EMR とは何か、Apache Spark とは何かを学びたい方
  • データエンジニア向け:EMR on EC2 / EKS / Serverless を使い分けたい方
  • SRE/DevOps 向け:EMR クラスターのコスト最適化・運用・セキュリティ
  • アーキテクト向け:Databricks・Cloudera・Synapse Spark Pool との比較判断
  • 意思決定者向け:EMR vs Glue vs Athena の投資判断

目次

  1. 概要と本質
  2. EMR が解決する課題
  3. 主な特徴
  4. アーキテクチャ
  5. デプロイオプション(EC2・EKS・Serverless)
  6. コアコンポーネント
  7. 主要フレームワークと実行エンジン
  8. EMR on EC2:詳細解説
  9. EMR on EKS:Kubernetes 統合
  10. EMR Serverless:インフラレス実行
  11. EMR Studio:統合開発環境
  12. セキュリティと暗号化
  13. ネットワーク統合
  14. コスト管理と最適化
  15. Glue・Athena・Redshift との比較
  16. 類似サービス比較(Databricks・Cloudera・GCP・Azure)
  17. 主要ユースケース
  18. CLI による実装例
  19. SDK による実装例
  20. Infrastructure as Code
  21. ベストプラクティス
  22. トラブルシューティング
  23. 2025-2026 最新動向
  24. 学習リソース
  25. 実装チェックリスト
  26. まとめ

概要と本質

Amazon EMR は、自前で Spark/Hadoop クラスターを管理するのではなく、マネージドな分散処理基盤を提供するサービス です。

初心者向けメモ

  • EMR = マネージド Spark・Hadoop の実行環境
  • インフラ管理(スケーリング・パッチ・アップグレード)を AWS が引き受ける
  • データエンジニアはアプリケーション開発に集中できる
  • ペタバイト規模のデータ処理・複雑な ETL・ML パイプラインに最適

EMR の位置づけ

graph LR
    Data["データソース<br/>S3/RDS/Kinesis"]
    EMR["Amazon EMR<br/>Spark/Hadoop/Hive/Presto"]
    Results["クエリ結果<br/>S3/Redshift"]
    ML["機械学習<br/>SageMaker"]
    Viz["可視化<br/>QuickSight"]
    
    Data -->|ETL| EMR
    EMR -->|変換済みデータ| Results
    EMR -->|特徴量| ML
    Results -->|SSOT| Viz

EMR が解決する課題

課題 従来のアプローチ EMR による解決
Spark クラスター管理 自前でマシン・OS・Java・Spark を構築・運用 マネージド EMR クラスター:セットアップ時間を数時間から数分に短縮
スケーリング 手動でノード追加・YARN 設定・リバランス 自動スケーリング・Instance Fleets で複数インスタンスタイプに対応
コスト最適化 オンデマンド EC2 インスタンスで運用 スポットインスタンス活用で最大 90% コスト削減
パッチ管理 手動で OS・Spark・ライブラリをアップグレード AWS が定期的にセキュリティ更新・最新版対応
高可用性 Hadoop HA・多数のノード管理が必要 マネージドで HA 構成・フェイルオーバー自動対応
ペタバイト規模処理 大量メモリ・ストレージ管理が複雑 数千ノードまでスケール・EMRFS で S3 を シームレス利用
ML パイプライン 別途の ML インフラが必要 MLlib・SageMaker 統合で ML 対応

主な特徴

1. 3 つのデプロイオプション

オプション 説明 利点 欠点
EMR on EC2 EC2 インスタンス上で実行 最高の柔軟性・細かい制御 インフラ管理負担
EMR on EKS Kubernetes Pod で実行 K8s エコシステム統合・マルチテナント対応 学習コスト・EKS 管理
EMR Serverless サーバーレス・自動スケール アイドル時間ゼロ・完全マネージド コスト予測困難・カスタマイズ制限

2. 複数フレームワーク対応

Apache Spark(最も一般的)
    ├── Spark SQL:ANSI SQL 準拠クエリ
    ├── Spark Streaming:リアルタイム処理
    ├── PySpark / Scala / Java / R
    ├── MLlib:分散機械学習
    └── Spark on Ray(新):Python ネイティブ並列実行

Apache Hadoop & YARN
    ├── MapReduce:レガシーバッチ処理
    ├── YARN:リソースマネージャー
    └── HDFS:分散ストレージ

Apache Hive
    ├── HiveQL:SQL ライク DSL
    └── Metastore:スキーマ管理

Apache Presto / Trino
    ├── インタラクティブ SQL
    ├── 複数データソース連携
    └── 低レイテンシ分析

その他
    ├── Apache Flink:ストリーミング処理
    ├── Apache HBase:NoSQL
    ├── Apache Hudi:ACID トランザクション・デルタ機能
    ├── Apache Iceberg v3:ACID・タイムトラベル・スナップショット
    └── Delta Lake:Unity Catalog 連携

3. スポットインスタンスによるコスト最適化

EMR は Task ノードにスポットインスタンスを活用できます:

EC2 料金:$0.0928/時間(m5.xlarge オンデマンド)
スポット料金:$0.0278/時間(同じスペック)
→ 70% コスト削減

スポット中断時:Task ノードのみ終了(Core ノード・計算継続)
→ ジョブは再試行・自動リスケジュール

4. EMRFS:S3 をネイティブストレージとして利用

クラスター終了後もデータ永続
    → S3 は AWS マネージドストレージ・長期保存可能
    
HDFS より高速・柔軟
    → S3 Select:効率的なフィルタリング
    → 複数クラスター間でのデータ共有

階層化ストレージ対応
    → S3 Intelligent-Tiering でコスト最適化

アーキテクチャ

EMR on EC2 アーキテクチャ図

graph TB
    subgraph Cluster["EMR Cluster(EC2)"]
        Master["Master Node(1台)<br/>YARN ResourceManager<br/>HDFS NameNode<br/>Hadoop JobTracker"]
        Core["Core ノード(1台以上)<br/>HDFS DataNode<br/>YARN NodeManager<br/>Spark Executor"]
        Task["Task ノード(オプション)<br/>スポットインスタンス推奨<br/>Spark Executor のみ"]
    end
    
    subgraph Storage["ストレージ"]
        EMRFS["EMRFS<br/>S3 を HDFS のように利用"]
        S3["Amazon S3<br/>永続ストレージ"]
    end
    
    subgraph Framework["フレームワーク"]
        Spark["Apache Spark"]
        Hive["Apache Hive"]
        Presto["Presto/Trino"]
    end
    
    Master --> |YARN リソース管理| Core
    Master --> |YARN リソース管理| Task
    Core --> EMRFS
    Task --> EMRFS
    EMRFS --> S3
    Spark -.-> Framework
    Hive -.-> Framework
    Presto -.-> Framework

クラスター構成パターン

パターン 1:バッチ処理用クラスター

Master(1台・m5.xlarge)
    ↓ YARN ResourceManager
Core(3台・r5.2xlarge)
    ↓ HDFS DataNode / Spark Executor
Task(5-20台・スポット・m5.4xlarge)
    ↓ Spark Executor のみ・中断時は再起動
    
用途:夜間バッチ・日次 ETL・アドホッククエリ
特徴:処理終了後にクラスター削除(使い捨て)

パターン 2:長期実行クラスター

Master(1台・m5.xlarge)
Core(5台・r5.4xlarge)
    ↓ 常時起動・HDFS 永続
Task(2台オンデマンド+スケーリング用スポット)

用途:ストリーミング・24/7 ワークロード・複数ジョブの並行実行
特徴:クラスターを保持・複数チームで共有
リスク:未使用時も課金(スケーリングポリシーで最小化)

パターン 3:EMR Serverless

アプリケーション作成
    → Spark Job 送信
    → Worker 自動起動
    → ジョブ実行
    → 自動終了(数分後)
    
特徴:クラスター管理不要・アイドル時間ゼロ
課金:vCPU-時間 + メモリ-時間(ジョブ実行時間のみ)

デプロイオプション

1. EMR on EC2:最高の制御と柔軟性

特徴

タイプ: 完全管理 EC2 クラスター
初期化: ノード選択・ブートストラップスクリプト・カスタム設定可能
スケーリング: 自動スケーリング・Manual Resizing
コスト: EC2 料金 + EMR 上乗せ料金($0.048/時間・m5.xlarge)
管理: Master ノード への SSH 接続で細かいチューニング可能

ユースケース

  • 大規模バッチ処理(1-10K ノード)
  • カスタム設定が必要(特定 Spark バージョン・ローカルライブラリ)
  • 他の EC2 サービスとの密結合(VPC・IAM ロール・セキュリティグループ)
  • Hadoop エコシステム(Hive Metastore・HBase・複数フレームワーク)

実装例

# EMR on EC2 クラスター作成
aws emr create-cluster \
  --name "spark-etl-prod" \
  --release-label emr-7.11.0 \
  --instance-type m5.xlarge \
  --instance-count 10 \
  --applications Name=Spark Name=Hive Name=Presto \
  --bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
  --log-uri s3://my-bucket/emr-logs/ \
  --use-glue-catalog \
  --enable-glue-data-catalog

# ジョブ投入(Spark)
aws emr add-steps \
  --cluster-id j-xxxxxx \
  --steps Type=Spark,Name="ETL Job",\
    ActionOnFailure=TERMINATE_CLUSTER,\
    Args=[s3://my-bucket/etl.py,--input,s3://data/,--output,s3://output/]

2. EMR on EKS:Kubernetes 統合

特徴

基盤: Amazon EKS(マネージド Kubernetes)
Pod 実行: 一時的な Spark Pod・自動スケーリング
リソース: EKS ノードプール上で実行・他のワークロードと共存
スケーリング: Pod オートスケーラー・Karpenter 対応
コスト: EKS 管理料 + EC2 ノード料金(クラスターレベル)

ユースケース

  • Kubernetes 統合環境への Spark 統合
  • マルチテナント・複数チームが同じ EKS クラスター利用
  • Pod ライフサイクル管理が必要
  • Istio・Prometheus などの K8s 監視スタック統合

実装例

# EMR on EKS 仮想クラスター作成
aws emr-containers create-virtual-cluster \
  --name spark-on-eks \
  --container-provider '{
    "type": "EKS",
    "id": "my-eks-cluster",
    "info": {
      "eksConnectorConfig": {
        "enabled": true
      }
    }
  }'

# Spark ジョブ投入
aws emr-containers start-job-run \
  --virtual-cluster-id 000000000000-00000 \
  --name "etl-job" \
  --execution-role-arn arn:aws:iam::123456789:role/EMRJobRole \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://my-bucket/etl.py",
      "sparkSubmitParameters": "--conf spark.executor.memory=4g --conf spark.executor.cores=2"
    }
  }'

3. EMR Serverless:インフラレス実行

特徴

構造: 完全サーバーレス・自動スケーリング
初期化時間: 数十秒(クラスター不要)
スケーリング: Worker の自動プロビジョニング・アイドル時自動終了
課金: vCPU-時間 ($0.052624) + メモリ-時間 ($0.0057785/GB)
管理: CLI/API/SDK のみ・コンソール SSH なし

ユースケース

  • 断続的・バースト処理(定期バッチ・イベント駆動)
  • コスト管理が重要(未使用時間ゼロ)
  • インフラ管理を最小化したい
  • 数分以下の短時間ジョブ

実装例

# EMR Serverless アプリケーション作成
aws emr-serverless create-application \
  --name spark-etl-app \
  --release-label emr-7.11.0 \
  --type SPARK

# ジョブ実行
aws emr-serverless start-job-run \
  --application-id 00000000000000000 \
  --execution-role-arn arn:aws:iam::123456789:role/EMRServerlessRole \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://my-bucket/etl.py",
      "sparkSubmitParameters": "--conf spark.executor.memory=4g"
    }
  }' \
  --configuration-overrides '{
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "s3://my-bucket/logs/"
      }
    }
  }'

# PySpark Notebook サポート(新機能)
aws emr-serverless create-application \
  --name notebook-app \
  --release-label emr-spark-8.0-preview \
  --type SPARK

コアコンポーネント

1. クラスターマネージャー

YARN(Yet Another Resource Negotiator)
    ├── ResourceManager:リソース割り当て・スケジューリング
    ├── NodeManager:各ノードのリソース監視
    ├── ApplicationMaster:ジョブのライフサイクル管理
    └── 出力:HDFS / S3 に結果保存
    
Spark StandaloneManager(EMR on EKS)
    ├── Kubernetes API:Pod 管理
    ├── Driver Pod:ジョブの親プロセス
    └── Executor Pod:並列タスク実行

2. ストレージレイヤー

EMRFS(EMR File System)
    ├── S3 ネイティブ統合:s3a:// URI スキーム
    ├── 一貫性強化:DynamoDB でメタデータ管理
    ├── 暗号化:S3 KMS / SSE-S3
    └── マルチパートアップロード最適化
    
HDFS(分散ストレージ)
    ├── NameNode(Master):ファイルシステム管理
    ├── DataNode(Core/Task):実データ保存
    └── 用途:高速中間ストレージ・キャッシング
    
ローカルディスク
    ├── SSD / HDD インスタンス
    ├── スピンアップ ストレージ(コスト効率)
    └── スピルオーバー用(メモリ不足時)

3. 主要フレームワーク

Apache Spark 4.0(2026 最新)

# Spark 4.0.1 新機能:VARIANT データ型(JSON ネイティブ)
spark.sql("""
    SELECT 
        data:customer.name AS name,
        data:order.total::DECIMAL(10, 2) AS total
    FROM json_table
    WHERE data:region = 'Tokyo'
""")

# Apache Iceberg v3 統合
df = spark.read.format("iceberg").load("s3://lake/products")
df.filter("price > 1000").write.format("iceberg")\
    .mode("overwrite").save("s3://lake/products_v3")

# Row-level lineage 追跡
spark.conf.set("spark.sql.iceberg.row_lineage.enabled", "true")

Apache Hive 4.0(SQL ウェアハウス)

-- Apache Hive による SQL クエリ
CREATE EXTERNAL TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(10, 2),
    created_at TIMESTAMP
)
PARTITIONED BY (region STRING)
STORED AS PARQUET
LOCATION 's3://lake/orders/';

-- Hive on Spark エンジン
SET hive.execution.mode=llap;
SELECT region, SUM(amount) as total
FROM orders
WHERE YEAR(created_at) = 2026
GROUP BY region;

Apache Presto / Trino(インタラクティブ SQL)

-- Trino で複数データソース横断クエリ
SELECT 
    o.order_id,
    c.name,
    o.amount
FROM hive.default.orders o
JOIN postgres.public.customers c
    ON o.customer_id = c.id
WHERE o.created_at > CURRENT_DATE - INTERVAL '7' DAY;

Apache Flink(ストリーミング処理)

// Kinesis からのストリーミング処理
StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> stream = env.addSource(
    new FlinkKinesisConsumer<>("order-stream",
        new SimpleStringSchema(),
        consumerConfig));

stream.map(new MapFunction<String, Order>() {
    public Order map(String value) {
        return parseOrder(value);
    }
})
.filter(o -> o.amount > 1000)
.sinkTo(new S3CommitSink("s3://lake/filtered-orders/"));

env.execute("Streaming ETL");

Apache Hudi・Iceberg・Delta Lake(ACID テーブル)

# Hudi による ACID トランザクション
df.write.format("hudi") \
    .option("hoodie.table.name", "orders") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.recordkey.field", "order_id") \
    .option("hoodie.datasource.write.partitionpath.field", "region") \
    .save("s3://lake/orders-hudi")

# Iceberg による Time Travel
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("iceberg").getOrCreate()

# 1 時間前のスナップショットを参照
df = spark.read.option("as-of-timestamp", "2026-04-26 10:00:00") \
    .format("iceberg").load("s3://lake/products")

EMR on EC2

クラスター構成の詳細

Master ノード設定:
  インスタンスタイプ: m5.xlarge(4 vCPU, 16 GB)以上推奨
  用途:
    - YARN ResourceManager
    - HDFS NameNode
    - Hive Metastore
    - Spark Driver(スタンドアロン実行時)
  特性: 単一障害点・高可用性には複数マスター(本番環境)
  ストレージ: 100 GB EBS(ログ・メタデータ)

Core ノード設定:
  インスタンスタイプ: r5.2xlarge(8 vCPU, 64 GB)~ r6.4xlarge(16 vCPU, 128 GB)
  用途:
    - HDFS DataNode
    - YARN NodeManager・Executor ホスト
    - HDFS レプリケーション保持
  最小台数: 3 台(HDFS レプリケーション = 3
  特性: クラスター削除時にデータ손실(S3 EMRFS 使用で回避)
  ストレージ: 大容量 EBS + インスタンスストア

Task ノード設定:
  インスタンスタイプ: m5.4xlarge(16 vCPU, 64 GB)~ c6.9xlarge(36 vCPU)
  用途: Spark Executor コンピュート・HDFS レプリケーション不要
  スポット推奨: Task ノード = スポットインスタンス活用で コスト 70-90% 削減
  スケーリング: 自動スケーリングポリシー(CPU・メモリに基づく)
  フェイルオーバー: Task 노드 중단  다른 노드로 재시작

Instance Fleet(インスタンスタイプの組み合わせ)

import boto3

emr = boto3.client('emr')

response = emr.create_cluster(
    Name='instance-fleet-cluster',
    ReleaseLabel='emr-7.11.0',
    Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}],
    Instances={
        'InstanceFleets': [
            {
                'Name': 'Master Fleet',
                'InstanceFleetType': 'MASTER',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [
                    {'InstanceType': 'm5.xlarge'},
                    {'InstanceType': 'm5.2xlarge'}
                ]
            },
            {
                'Name': 'Core Fleet',
                'InstanceFleetType': 'CORE',
                'TargetOnDemandCapacity': 2,
                'TargetSpotCapacity': 8,
                'InstanceTypeConfigs': [
                    {'InstanceType': 'r5.2xlarge', 'WeightedCapacity': '2'},
                    {'InstanceType': 'r5.4xlarge', 'WeightedCapacity': '4'},
                    {'InstanceType': 'r6.2xlarge', 'WeightedCapacity': '2'}
                ]
            },
            {
                'Name': 'Task Fleet',
                'InstanceFleetType': 'TASK',
                'TargetOnDemandCapacity': 0,
                'TargetSpotCapacity': 20,
                'InstanceTypeConfigs': [
                    {'InstanceType': 'm5.4xlarge', 'BidPrice': '0.50'},
                    {'InstanceType': 'm6.4xlarge', 'BidPrice': '0.60'},
                    {'InstanceType': 'c5.4xlarge', 'BidPrice': '0.45'}
                ]
            }
        ]
    }
)

ブートストラップアクション(初期化スクリプト)

#!/bin/bash
# Bootstrap script: カスタムライブラリ・設定の自動適用

# Python パッケージインストール
pip install pandas numpy scikit-learn xgboost

# Spark 設定ファイル追加
cat >> /etc/spark/conf/spark-defaults.conf <<EOF
spark.driver.maxResultSize=4g
spark.sql.shuffle.partitions=200
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.kryoserializer.buffer.max=512m
EOF

# 大規模バッチ用ログレベル設定
echo "log4j.rootCategory=WARN, console" > /etc/spark/conf/log4j.properties

# 監視エージェント起動
systemctl start cloudwatch-agent

ステップ(Job)投入

# Spark SQL ジョブ
aws emr add-steps --cluster-id j-xxxxxx \
  --steps Type=Spark,Name="Data ETL",\
    Args=[--class,com.example.DataETL,\
      s3://my-bucket/etl-job.jar,\
      --input,s3://raw-data/,\
      --output,s3://processed-data/]

# Hive ジョブ
aws emr add-steps --cluster-id j-xxxxxx \
  --steps Type=Hive,Name="Aggregation",\
    HiveScript=s3://my-bucket/query.sql,\
    Args=[-d,INPUT_PATH=s3://data/,-d,OUTPUT_PATH=s3://output/]

# Pig スクリプト
aws emr add-steps --cluster-id j-xxxxxx \
  --steps Type=Pig,Name="Data Transformation",\
    PigScript=s3://my-bucket/transform.pig

# シェルスクリプト
aws emr add-steps --cluster-id j-xxxxxx \
  --steps Type=ShellCommand,Name="Cleanup",\
    Args=[s3://my-bucket/cleanup.sh,s3://output/]

EMR on EKS

仮想クラスター(Virtual Cluster)管理

# 仮想クラスター作成(EKS クラスター上)
aws emr-containers create-virtual-cluster \
  --name "data-science-vpc" \
  --container-provider \
    type=EKS,\
    id=my-eks-cluster,\
    info="{eksConnectorConfig: {enabled: true}}"

# Namespace ベースの RBAC
kubectl create namespace spark-jobs
kubectl label namespace spark-jobs spark-enabled=true

# ジョブロール設定
aws iam create-role --role-name EMRJobExecutionRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {
        "Service": "emr-containers.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }]
  }'

# S3 アクセス権限追加
aws iam put-role-policy --role-name EMRJobExecutionRole \
  --policy-name S3Access --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": ["s3:*"],
      "Resource": "*"
    }]
  }'

Spark ジョブ投入(EMR on EKS)

import boto3

emr_containers = boto3.client('emr-containers')

response = emr_containers.start_job_run(
    virtualClusterId='000000000000-00000',
    name='ml-feature-engineering',
    releaseLabel='emr-7.11.0-latest',
    executionRoleArn='arn:aws:iam::123456789:role/EMRJobExecutionRole',
    jobDriver={
        'sparkSubmitJobDriver': {
            'entryPoint': 's3://my-bucket/feature-engineering.py',
            'sparkSubmitParameters': (
                '--conf spark.executor.memory=8g '
                '--conf spark.executor.cores=4 '
                '--conf spark.driver.memory=2g '
                '--conf spark.sql.adaptive.enabled=true'
            ),
            'entryPointArguments': ['--input', 's3://raw-data/', '--output', 's3://features/']
        }
    },
    configurationOverrides={
        'monitoringConfiguration': {
            'persistentAppUI': 'ENABLED',
            's3MonitoringConfiguration': {
                'logUri': 's3://my-bucket/emr-eks-logs/'
            },
            'cloudWatchMonitoringConfiguration': {
                'logGroupName': '/emr-on-eks/jobs',
                'logStreamNamePrefix': 'feature-eng'
            }
        }
    },
    tags={
        'environment': 'production',
        'team': 'data-science'
    }
)

print(f"Job ID: {response['id']}")

EMR Serverless

アプリケーション管理

import boto3

emr_serverless = boto3.client('emr-serverless')

# Spark アプリケーション作成
app_response = emr_serverless.create_application(
    name='batch-etl-serverless',
    releaseLabel='emr-7.11.0',
    type='SPARK',
    clientToken='unique-request-id-12345',
    tags={
        'environment': 'production',
        'cost-center': 'data-eng'
    }
)

app_id = app_response['applicationId']
print(f"Created application: {app_id}")

# アプリケーション詳細確認
app_details = emr_serverless.get_application(applicationId=app_id)
print(f"Status: {app_details['application']['status']}")

バッチジョブ投入(EMR Serverless)

# ETL ジョブ投入
job_response = emr_serverless.start_job_run(
    applicationId=app_id,
    clientToken='unique-job-token-67890',
    executionRoleArn='arn:aws:iam::123456789:role/EMRServerlessRole',
    jobDriver={
        'sparkSubmit': {
            'entryPoint': 's3://my-bucket/etl-pipeline.py',
            'entryPointArguments': [
                '--date', '2026-04-26',
                '--input-bucket', 's3://raw-data/',
                '--output-bucket', 's3://processed-data/'
            ],
            'sparkSubmitParameters': (
                '--conf spark.executor.memory=4g '
                '--conf spark.executor.cores=2 '
                '--conf spark.driver.memory=2g '
                '--conf spark.dynamicAllocation.enabled=true'
            )
        }
    },
    configurationOverrides={
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {
                'logUri': 's3://my-bucket/serverless-logs/',
                'encryptionKeyArn': 'arn:aws:kms:ap-northeast-1:123456789:key/12345678-1234-1234-1234-123456789012'
            },
            'cloudWatchMonitoringConfiguration': {
                'logGroupName': '/emr-serverless/jobs',
                'logStreamNamePrefix': 'etl-batch'
            }
        },
        'workerTypeSpecifications': {
            'DRIVER': {
                'diskSize': 30
            },
            'EXECUTOR': {
                'diskSize': 100
            }
        }
    },
    tags={
        'pipeline': 'daily-etl',
        'env': 'prod'
    }
)

print(f"Job ID: {job_response['jobRunId']}")

Pre-initialized Capacity(キャッシュ)

# ウォームスタンバイ Worker を常時起動(コールドスタート回避)
emr_serverless.create_application(
    name='low-latency-app',
    releaseLabel='emr-7.11.0',
    type='SPARK',
    initialCapacity={
        'DRIVER': {
            'workerCount': 1,
            'workerConfiguration': {
                'cpu': '2',
                'memory': '8GB',
                'disk': '30GB'
            }
        },
        'EXECUTOR': {
            'workerCount': 10,
            'workerConfiguration': {
                'cpu': '4',
                'memory': '16GB',
                'disk': '100GB'
            }
        }
    }
)

# 常時起動コスト:1 DCU(vCPU)= 24 時間 $0.30
# DRIVER 2 vCPU + EXECUTOR 40 vCPU = 42 vCPU × $0.30 × 30 日 = $378/月

EMR Studio:統合開発環境

# EMR Studio ノートブック
# → Jupyter ベース・Git 統合・複数ユーザーコラボレーション

# PySpark コード例
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("clustering").getOrCreate()

# S3 から Parquet 読み込み
df = spark.read.parquet("s3://lake/customers/")

# 特徴量エンジニアリング
assembler = VectorAssembler(
    inputCols=["age", "salary", "purchases"],
    outputCol="features"
)
features = assembler.transform(df)

# K-Means クラスタリング
kmeans = KMeans(k=5, seed=42)
model = kmeans.fit(features)

# 結果を S3 に保存
model.transform(features).write.parquet("s3://lake/customer-segments/")

セキュリティ

暗号化

転送中の暗号化:
  TLS 1.2+: ノード間・Spark 通信
  Kerberos: クラスター内認証(オプション)
  
保存時の暗号化:
  S3: KMS CMK / SSE-S3
  EBS: AWS KMS キー
  HDFS: Hadoop 暗号化ゾーン

アクセス制御:
  IAM Role: EC2 インスタンス・EMR ジョブに付与
  Security Group: ノード間・外部通信制限
  Lake Formation: 列・行レベルのアクセス制御
  VPC Endpoint: プライベート接続(S3・DynamoDB)

Kerberos 有効化

# Kerberos クラスター作成
aws emr create-cluster \
  --release-label emr-7.11.0 \
  --security-configuration kerberos-config \
  --ec2-key-name my-keypair

# セキュリティ設定
aws emr create-security-configuration \
  --name kerberos-config \
  --security-configuration '{
    "KerberosAttributes": {
      "KdcAdminPassword": "MyKdcPassword",
      "Realm": "AWS.EXAMPLE.COM"
    }
  }'

Lake Formation 統合

# Lake Formation で列・行レベルのアクセス制御
# Athena / EMR / Redshift に統一的に適用

# SQL で権限定義
spark.sql("""
    GRANT SELECT (order_id, customer_id, amount) 
    ON TABLE orders 
    TO ROLE analyst_role;
""")

# PII 列(email・phone)は参照不可
spark.sql("""
    GRANT SELECT (id, name, region) 
    ON TABLE customers 
    TO ROLE analyst_role;
""")

ネットワーク統合

VPC 内配置

推奨構成:
  Master・Core ノード: プライベートサブネット
  NAT Gateway: アウトバウンド通信(インターネット)
  VPC Gateway Endpoint: S3・DynamoDB への低レイテンシアクセス
  Security Group: YARN(8088)・Spark UI(4040)制限
  
VPC Endpoint 設定:
  - S3: ゲートウェイエンドポイント(無料)
  - DynamoDB: ゲートウェイエンドポイント(無料)
  - CloudWatch Logs: インターフェースエンドポイント
  - ECR: Docker イメージ取得時

セキュリティグループ

Master ノード受信ルール:
  - 22/tcp(SSH): 管理者のみ
  - 8088/tcp(YARN Web): VPC 内のみ
  - 9870/tcp(NameNode): VPC 内のみ
  - 4040/tcp(Spark UI): VPC 内のみ

Core/Task ノード受信ルール:
  - 受信:Master / Core ノードからの全トラフィック
  - 受信:VPC 内のクライアント(ポート 1024-65535
  - 送信:すべて許可(S3・ログ出力)

コスト管理

コスト構成

EMR on EC2:
    EC2 インスタンス料金
        Master: m5.xlarge × $0.192/時間
        Core: r5.2xlarge × 3 × $0.504/時間 = $1.512/時間
        Task(スポット): m5.4xlarge × 10 × $0.0928/時間 = $0.928/時間
    小計:$2.632/時間 × 720 時間(月間) = $1,895/月

    EMR 上乗せ料金:$0.048/時間 × 13 インスタンス = $0.624/時間
    月間:$0.624/時間 × 720 = $449/月

    S3 ストレージ:$0.023/GB-月
    CloudWatch Logs:$0.50/GB

月間合計(常時起動):$2,900~$3,200

最適化戦略:
    1. 使い捨てクラスター:処理終了後に削除
    2. スポットインスタンス活用:Task ノード 70-90% 削減
    3. EMR Serverless:短時間ジョブ・バースト処理に限定
    4. Spot Instance Interruption Handling:Task ノード損失時の自動再試行
    5. CloudWatch カスタムメトリクス:効率的なスケーリングルール

コスト削減のベストプラクティス

対策 削減率 効果 実装難度
スポットインスタンス活用 70-90% Task ノードの大部分を置換可
使い捨てクラスター 60% 長期間クラスター保有排除
EMR Serverless 50-80% アイドル時間ゼロ
オンデマンド容量予約 30% 月次予測可能
ダウンタイムスケーリング 40-60% 夜間・休日自動縮小
ローカルキャッシング 20% 中間データ HDFS 優先
Glue ETL への移行 40-50% サーバーレスで管理削減

比較

EMR vs Glue vs Athena

項目 EMR Glue Athena
用途 大規模 ETL・複雑な Spark 処理 中規模 ETL・自動化 Ad hoc SQL 分析
エンジン Spark・Hadoop・Hive・Presto Spark(サーバーレス) Presto/Trino
管理 クラスター構築・運用 サーバーレス・自動スケール サーバーレス・即実行
柔軟性 最高(細かい制御可) 中(カスタム Spark コード) 低(SQL のみ)
スケール ペタバイト対応 テラバイト~ペタバイト テラバイト対応
学習コスト 高(Spark・Hadoop 知識必要) 低(SQL のみ)
コスト 安定運用で安い / 管理負担 中程度 データ量に応じた従量制
データ発見 なし Glue Catalog Glue Catalog 統合
採用判断 複雑な ML パイプライン・カスタマイズ 一般的な ETL・自動化 探索的分析・ダッシュボード

EMR vs Databricks

項目 EMR Databricks
プロバイダー AWS マネージド SaaS・マルチクラウド
基盤 Spark on EC2/EKS 最適化 Spark + SQLとML
Lakehouse 機能 Delta Lake / Iceberg / Hudi Unity Catalog(統一メタストア)
AI/ML 統合 SageMaker 連携 搭載・AutoML 提供
Notebooks EMR Studio Databricks Notebooks(優位)
コスト インフラ管理が必要 SaaS 従量制・高マークアップ
Lock-in AWS エコシステム マルチクラウド対応・脱出容易
採用 AWS 専一企業・コスト重視 マルチクラウド・エンタープライズ

EMR vs Cloudera Data Platform

項目 EMR Cloudera
運用 AWS マネージド オンプレ・ハイブリッド
Hadoop/Spark 最新版対応 エンタープライズ版
セキュリティ IAM・KMS 統合 Ranger・LDAP 統合
Data Governance Lake Formation Hue・Ranger RBAC
Cloud Native クラウド中心 オンプレ中心・クラウド対応
コスト 変動費・スケール有利 ライセンス費用
採用 AWS 環境 オンプレ・規制業界

主要ユースケース

1. リアルタイムストリーミング分析

# Kinesis → EMR(Spark Streaming / Flink)→ S3 / Redshift

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("streaming-etl").getOrCreate()

# Kinesis からストリーミング読み込み
kinesis_df = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", "order-stream") \
    .option("region", "ap-northeast-1") \
    .option("initialPosition", "TRIM_HORIZON") \
    .load()

# マイクロバッチ処理
result = kinesis_df \
    .select(
        from_json(col("data").cast("string"), 
                 "order_id INT, customer_id INT, amount DOUBLE, timestamp TIMESTAMP"
               ).alias("event")
    ) \
    .select("event.*") \
    .filter(col("amount") > 1000) \
    .groupBy(window("timestamp", "1 minute"), "customer_id") \
    .agg(sum("amount").alias("total_amount"))

# S3 に結果書き込み
result.writeStream \
    .format("parquet") \
    .option("path", "s3://lake/high-value-orders/") \
    .option("checkpointLocation", "s3://lake/checkpoints/kinesis-streaming") \
    .start() \
    .awaitTermination()

2. 機械学習特徴量エンジニアリング

# 大規模データから ML モデル用の特徴量生成

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, when, to_date, datediff

spark = SparkSession.builder.appName("feature-engineering").getOrCreate()

# 顧客データ読み込み
customers = spark.read.parquet("s3://lake/customers/")
orders = spark.read.parquet("s3://lake/orders/")
products = spark.read.parquet("s3://lake/products/")

# 特徴量作成:顧客のライフタイムバリュー(LTV)
customer_features = orders \
    .groupBy("customer_id") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("lifetime_value"),
        avg("amount").alias("avg_order_value"),
        max("created_at").alias("last_purchase_date"),
        datediff(current_timestamp(), max("created_at")).alias("days_since_purchase")
    )

# 購買パターン特徴量
product_preferences = orders \
    .join(products, "product_id") \
    .groupBy("customer_id") \
    .agg(
        collect_set("category").alias("categories_purchased"),
        count_distinct("product_id").alias("unique_products")
    )

# 全特徴量を結合
final_features = customer_features \
    .join(product_preferences, "customer_id") \
    .join(customers, "customer_id")

# 標準化
assembler = VectorAssembler(
    inputCols=["total_orders", "lifetime_value", "avg_order_value", "days_since_purchase"],
    outputCol="features"
)
features = assembler.transform(final_features)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features"
)
model = scaler.fit(features)
scaled_features = model.transform(features)

# SageMaker Feature Store に書き込み
scaled_features.write \
    .format("iceberg") \
    .mode("overwrite") \
    .save("s3://feature-store/customer-features/")

3. 多言語データ処理(ETL パイプライン)

# PySpark + Hive + Spark SQL のハイブリッド処理

spark.sql("""
    -- Hive で大規模テーブル初期化
    CREATE TABLE IF NOT EXISTS order_summary
    USING PARQUET
    PARTITIONED BY (region)
    AS
    SELECT 
        order_id,
        customer_id,
        SUM(amount) as total,
        COUNT(*) as item_count,
        YEAR(order_date) as year,
        MONTH(order_date) as month,
        QUARTER(order_date) as quarter,
        region
    FROM raw_orders
    GROUP BY order_id, customer_id, region, YEAR(order_date), MONTH(order_date), QUARTER(order_date)
""")

# PySpark で複雑な変換
from pyspark.sql.window import Window

df = spark.read.table("order_summary")

# 移動平均(Window Function)
window_spec = Window \
    .partitionBy("region") \
    .orderBy("year", "month") \
    .rangeBetween(-3, 0)

result = df.withColumn(
    "moving_avg_3m",
    avg("total").over(window_spec)
)

# Apache Spark 4.0 の新機能:VARIANT(JSON ネイティブ)で複雑なデータ処理
spark.sql("""
    SELECT 
        order_id,
        json_data:customer.name AS customer_name,
        CAST(json_data:metadata.created_at AS TIMESTAMP) AS created_date,
        CAST(json_data:pricing.total::DECIMAL(10,2) AS DECIMAL(10,2)) AS total_price
    FROM json_orders
    WHERE YEAR(CAST(json_data:metadata.created_at AS TIMESTAMP)) = 2026
""")

CLI 実装例

クラスター作成・管理

# 1. 基本的なクラスター作成
CLUSTER_ID=$(aws emr create-cluster \
    --name "data-pipeline-prod" \
    --release-label emr-7.11.0 \
    --instance-type m5.xlarge \
    --instance-count 10 \
    --applications Name=Spark Name=Hive Name=Presto Name=Hadoop \
    --bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
    --log-uri s3://my-bucket/emr-logs/ \
    --use-glue-catalog \
    --enable-glue-data-catalog \
    --tags Key=Environment,Value=production Key=Team,Value=data-eng \
    --query 'ClusterId' \
    --output text)

echo "Cluster created: $CLUSTER_ID"

# 2. クラスター状態確認
aws emr describe-cluster --cluster-id $CLUSTER_ID \
    --query 'Cluster.[Id,Name,Status.State,MasterPublicDNSName]' \
    --output table

# 3. Spark ジョブ投入
aws emr add-steps \
    --cluster-id $CLUSTER_ID \
    --steps Type=Spark,Name="ETL Job",ActionOnFailure=TERMINATE_CLUSTER,\
        Args=[--master,yarn,\
              --deploy-mode,cluster,\
              --executor-memory,4g,\
              --executor-cores,2,\
              --num-executors,8,\
              s3://my-bucket/etl.py,\
              --input,s3://raw-data/,\
              --output,s3://processed/]

# 4. Hive クエリ投入
aws emr add-steps \
    --cluster-id $CLUSTER_ID \
    --steps Type=Hive,Name="Data Aggregation",ActionOnFailure=CONTINUE,\
        HiveScript=s3://my-bucket/queries/aggregation.sql,\
        Args=[-d,INPUT_DATE=2026-04-26]

# 5. クラスター削除
aws emr terminate-clusters --cluster-ids $CLUSTER_ID

自動スケーリング設定

# 自動スケーリングポリシー定義
aws emr create-cluster \
    --name "autoscaling-cluster" \
    --release-label emr-7.11.0 \
    --instance-type m5.xlarge \
    --instance-count 10 \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --configurations '[
        {
            "Classification": "mapred-site",
            "Properties": {
                "mapreduce.job.reduces": "0"
            }
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "true"
            }
        }
    ]'

# スケーリングルール(CPU ベース)
aws emr put-auto-scaling-policy \
    --cluster-id j-xxxxxx \
    --instance-group-type TASK \
    --auto-scaling-policy '{
        "Constraints": {
            "MinCapacity": 2,
            "MaxCapacity": 50
        },
        "Rules": [
            {
                "Name": "Scale Out",
                "Description": "CPU 使用率 80% 以上でスケールアウト",
                "Action": {
                    "Market": "SPOT",
                    "SimpleScalingPolicyConfiguration": {
                        "AdjustmentType": "ADD_CAPACITY",
                        "ScalingAdjustment": 5,
                        "CoolDown": 300
                    }
                },
                "Trigger": {
                    "CloudWatchAlarmDefinition": {
                        "MetricName": "CPUUtilization",
                        "Namespace": "AWS/EC2",
                        "Statistic": "AVERAGE",
                        "Unit": "PERCENT",
                        "Dimensions": [{"Name": "InstanceGroup", "Value": "TASK"}],
                        "Period": 300,
                        "EvaluationPeriods": 2,
                        "Threshold": 80,
                        "ComparisonOperator": "GREATER_THAN_OR_EQUAL"
                    }
                }
            },
            {
                "Name": "Scale In",
                "Description": "CPU 使用率 20% 以下でスケールイン",
                "Action": {
                    "Market": "SPOT",
                    "SimpleScalingPolicyConfiguration": {
                        "AdjustmentType": "REMOVE_CAPACITY",
                        "ScalingAdjustment": 5,
                        "CoolDown": 600
                    }
                },
                "Trigger": {
                    "CloudWatchAlarmDefinition": {
                        "MetricName": "CPUUtilization",
                        "Namespace": "AWS/EC2",
                        "Statistic": "AVERAGE",
                        "Unit": "PERCENT",
                        "Period": 300,
                        "EvaluationPeriods": 3,
                        "Threshold": 20,
                        "ComparisonOperator": "LESS_THAN"
                    }
                }
            }
        ]
    }'

SDK 実装例

Python Boto3

import boto3
from datetime import datetime

emr = boto3.client('emr', region_name='ap-northeast-1')

def create_emr_cluster(cluster_name, num_instances=10):
    """EMR クラスター作成"""
    response = emr.create_cluster(
        Name=cluster_name,
        ReleaseLabel='emr-7.11.0',
        Applications=[
            {'Name': 'Spark'},
            {'Name': 'Hive'},
            {'Name': 'Presto'},
            {'Name': 'Hadoop'},
            {'Name': 'Hudi'},
            {'Name': 'Iceberg'}
        ],
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'Core',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'r5.2xlarge',
                    'InstanceCount': 3,
                },
                {
                    'Name': 'Task',
                    'Market': 'SPOT',
                    'InstanceRole': 'TASK',
                    'InstanceType': 'm5.4xlarge',
                    'InstanceCount': num_instances - 4,
                    'BidPrice': '0.50'
                }
            ],
            'Ec2KeyName': 'my-key-pair',
            'KeepJobFlowAliveWhenNoSteps': False
        },
        LogUri='s3://my-bucket/emr-logs/',
        Tags=[
            {'Key': 'Environment', 'Value': 'production'},
            {'Key': 'CreatedBy', 'Value': 'DataPipeline'},
            {'Key': 'CreatedAt', 'Value': datetime.now().isoformat()}
        ],
        BootstrapActions=[
            {
                'Name': 'Install Custom Libraries',
                'ScriptBootstrapAction': {
                    'Path': 's3://my-bucket/bootstrap.sh'
                }
            }
        ],
        Configurations=[
            {
                'Classification': 'spark-defaults',
                'Properties': {
                    'spark.executor.memory': '4g',
                    'spark.executor.cores': '2',
                    'spark.sql.shuffle.partitions': '200',
                    'spark.sql.adaptive.enabled': 'true',
                    'spark.dynamicAllocation.enabled': 'true'
                }
            },
            {
                'Classification': 'yarn-site',
                'Properties': {
                    'yarn.scheduler.capacity.maximum-am-resource-percent': '0.5'
                }
            }
        ]
    )
    
    return response['JobFlowId']


def submit_spark_job(cluster_id, script_path, args):
    """Spark ジョブ投入"""
    response = emr.add_steps(
        ClusterId=cluster_id,
        Steps=[
            {
                'Name': 'Spark ETL Job',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--class', 'com.example.ETLJob',
                        '--deploy-mode', 'cluster',
                        '--executor-memory', '4g',
                        '--executor-cores', '2',
                        '--num-executors', '8',
                        script_path
                    ] + args
                }
            }
        ]
    )
    
    return response['StepIds'][0]


def monitor_cluster(cluster_id):
    """クラスター状態監視"""
    response = emr.describe_cluster(ClusterId=cluster_id)
    cluster = response['Cluster']
    
    print(f"Cluster: {cluster['Name']}")
    print(f"Status: {cluster['Status']['State']}")
    print(f"Master: {cluster.get('MasterPublicDNSName', 'N/A')}")
    print(f"Nodes: {cluster['RequestedInstanceCount']}")
    print(f"Created: {cluster['Status']['Timeline']['CreationDateTime']}")
    
    return cluster['Status']['State']


# 使用例
if __name__ == '__main__':
    cluster_id = create_emr_cluster('production-etl', num_instances=15)
    print(f"Created cluster: {cluster_id}")
    
    # ジョブ投入
    step_id = submit_spark_job(
        cluster_id,
        's3://my-bucket/etl-pipeline.jar',
        ['--input', 's3://raw-data/', '--output', 's3://processed/']
    )
    print(f"Submitted step: {step_id}")
    
    # 状態確認
    status = monitor_cluster(cluster_id)
    print(f"Current status: {status}")

Infrastructure as Code

Terraform 例

# variables.tf
variable "cluster_name" {
  default = "emr-production"
}

variable "instance_count" {
  default = 10
}

variable "spot_price" {
  default = "0.50"
}

# main.tf
resource "aws_emr_cluster" "main" {
  name           = var.cluster_name
  release_label  = "emr-7.11.0"
  log_uri        = "s3://${aws_s3_bucket.logs.id}/emr-logs/"
  
  instance_group {
    instance_role = "MASTER"
    instance_type = "m5.xlarge"
    instance_count = 1
  }
  
  instance_group {
    instance_role = "CORE"
    instance_type = "r5.2xlarge"
    instance_count = 3
  }
  
  instance_group {
    instance_role = "TASK"
    instance_type = "m5.4xlarge"
    instance_count = var.instance_count - 4
    bid_price = var.spot_price
  }
  
  bootstrap_action {
    path = "s3://${aws_s3_bucket.scripts.id}/bootstrap.sh"
    name = "Install Libraries"
  }
  
  applications {
    name = "Spark"
  }
  applications {
    name = "Hive"
  }
  applications {
    name = "Presto"
  }
  applications {
    name = "Hudi"
  }
  applications {
    name = "Iceberg"
  }
  
  service_role = aws_iam_role.emr_service_role.arn
  instance_profile = aws_iam_instance_profile.ec2_instance_profile.arn
  
  configurations = jsonencode([
    {
      Classification = "spark-defaults"
      Properties = {
        "spark.executor.memory" = "4g"
        "spark.executor.cores" = "2"
        "spark.sql.adaptive.enabled" = "true"
        "spark.sql.shuffle.partitions" = "200"
      }
    }
  ])
  
  tags = {
    Environment = "production"
    Team = "data-eng"
  }
}

# IAM Role
resource "aws_iam_role" "emr_service_role" {
  name = "emr-service-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "elasticmapreduce.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "emr_service_policy" {
  role = aws_iam_role.emr_service_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
}

CloudFormation 例

AWSTemplateFormatVersion: '2010-09-09'
Description: 'EMR Production Cluster with Spark, Hive, Presto, Hudi, Iceberg'

Parameters:
  InstanceCount:
    Type: Number
    Default: 10
  SpotPrice:
    Type: String
    Default: '0.50'

Resources:
  EMRCluster:
    Type: AWS::EMR::Cluster
    Properties:
      Name: emr-production
      ReleaseLabel: emr-7.11.0
      Applications:
        - Name: Spark
        - Name: Hive
        - Name: Presto
        - Name: Hudi
        - Name: Iceberg
      Instances:
        InstanceGroups:
          - Name: Master
            Market: ON_DEMAND
            InstanceRole: MASTER
            InstanceType: m5.xlarge
            InstanceCount: 1
          - Name: Core
            Market: ON_DEMAND
            InstanceRole: CORE
            InstanceType: r5.2xlarge
            InstanceCount: 3
          - Name: Task
            Market: SPOT
            InstanceRole: TASK
            InstanceType: m5.4xlarge
            InstanceCount: !Sub '${InstanceCount - 4}'
            BidPrice: !Ref SpotPrice
      LogUri: s3://my-bucket/emr-logs/
      ServiceRole: !GetAtt EMRServiceRole.Arn
      JobFlowRole: !GetAtt EC2InstanceProfile.Arn
      Tags:
        - Key: Environment
          Value: production
        - Key: Team
          Value: data-eng

  EMRServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: elasticmapreduce.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole

  EC2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Roles:
        - !Ref EC2Role

  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role

Outputs:
  ClusterId:
    Value: !Ref EMRCluster
    Description: EMR Cluster ID
  ClusterArn:
    Value: !GetAtt EMRCluster.Arn
    Description: EMR Cluster ARN

ベストプラクティス

✅ 推奨事項

項目 実装 効果
スポットインスタンス活用 Task ノード 100% スポット 70-90% コスト削減
使い捨てクラスター KeepJobFlowAliveWhenNoSteps=false 月額コスト 60% 削減
EMRFS(S3)利用 HDFS より S3 優先 永続性・他クラスター間共有
Glue Catalog 統合 UseGlueCatalog=true メタデータ一元化
パーティション設計 日付・リージョンで分割 クエリ高速化 20-100 倍
Spark 最適化設定 spark.sql.adaptive.enabled=true 自動チューニング・効率化
CloudWatch 監視 カスタムメトリクス設定 異常早期検知・自動スケーリング
Security Group 制限 Master SSH は VPN 経由のみ セキュリティ向上
Lake Formation 統合 列・行レベルアクセス制御 ガバナンス統一
Infrastructure as Code Terraform / CloudFormation バージョン管理・再現性

❌ 反パターン

反パターン 理由 改善策
常時起動クラスター 未使用時も課金(数万円/月) 使い捨てか Serverless に変更
オンデマンドインスタンスのみ 高コスト Task ノードをスポット化
HDFS をプライマリストレージ クラスター削除でデータ喪失 EMRFS(S3)優先
クラスターサイジング過大 無駄なリソース・高コスト 自動スケーリング導入
ログ設定なし トラブル発生時デバッグ困難 CloudWatch/S3 ログ設定必須
IAM ロール権限過剰 セキュリティリスク 最小権限の原則(特定 S3 バケット・KMS キー)
Spark デフォルト設定 非効率(パーティション少ない・メモリ不足) 環境に合わせたチューニング
エラーハンドリング不足 タイムアウト・再試行ループ Step ActionOnFailure 設定・Circuit Breaker

トラブルシューティング

症状 原因 解決策
Out of Memory(OOM) Executor メモリ不足 spark.executor.memory 増加 / パーティション数増加
Job が遅い リソース不足 spark.sql.adaptive.enabled=true / パーティション最適化
スポット中断 インスタンス返却 複数インスタンスタイプ指定 / Instance Fleet 使用
YARN リソースリーク Executor の正常終了失敗 spark.dynamicAllocation.enabled=true
Glue Catalog 接続失敗 VPC Endpoint 不足 S3 Gateway Endpoint 確認
暗号化キー拒否 IAM KMS 権限不足 クラスターロールに KMS DecryptDecrypt 権限付与
ノード損失中の処理停止 スポット中断時に再配置不能 Instance Fleet で複数タイプ / Core ノード活用
Spark UI アクセス不可 セキュリティグループ制限 Master ノード VPC 内アクセスのみ / SSH トンネル
データ整合性エラー 部分的な書き込み失敗 ACID テーブル(Iceberg/Hudi)導入

2025-2026 最新動向

1. Apache Spark 4.0 リリース

Spark 4.0.1 の主要機能:
  ✅ ANSI SQL 完全準拠
  ✅ VARIANT データ型(JSON ネイティブ処理)
  ✅ Apache Iceberg v3 サポート(削除ベクトル・行レベルリネージ)
  ✅ Streaming 強化(バッファ管理・遅延メトリクス)
  ✅ Python API 改善(type hints・dataclass サポート)

2. Iceberg v3 の ACID トランザクション

2025 年の大きな進化:
  ✅ Row-level lineage 追跡
  ✅ Deletion Vectors(削除マーク)による効率的な DELETE 操作
  ✅ Time Travel クエリ(スナップショット参照)
  ✅ AWS Lake Formation との統合(列・行レベルアクセス制御)
  ✅ EMR 7.11.0 より標準対応

3. EMR Serverless の進化

PySpark Notebook 対応(新機能):
  ✅ emr-spark-8.0-preview で Spark 4.0 + Notebook 対応
  ✅ Jupyter ライク環境・Git 統合
  ✅ 段階的な追加課金なし(vCPU-時間のみ)

Pre-initialized Capacity 拡充:
  ✅ ウォームスタンバイ Worker(1-10 分コールドスタート回避)
  ✅ 予測可能なコスト(月額固定費用)

4. EMR on EKS の実運用拡大

  • Karpenter 統合(2026):
  • ✅ Pod 自動スケーリング(ASG より効率的)
  • ✅ コスト最適化(混合インスタンスタイプ自動選択)
  • ✅ マルチテナント対応強化

5. AI/ML 統合の深化

SageMaker との統合:
  ✅ Feature Store への直接書き込み
  ✅ Spark ML → SageMaker Autopilot への自動流入
  ✅ EMR ジョブの SageMaker Pipelines 統合

Ray on EMR(新):
  ✅ Python ネイティブ並列実行
  ✅ Reinforcement Learning 対応
  ✅ HyperParameter Tuning 効率化

学習リソース

公式ドキュメント

オンラインコース

  • AWS EMR Deep Dive(Linux Academy)
  • Apache Spark 完全ガイド(Udemy)
  • EMR + Databricks 比較コース(DataCamp)

技術ブログ・記事

OSS・ベンダー資料


実装チェックリスト

フェーズ 1:導入準備

  • [ ] 既存データウェアハウス vs EMR のコスト比較完了
  • [ ] 年間データ処理量・クエリ複雑度を測定
  • [ ] Spark・Hadoop スキルを持つ人材確認
  • [ ] AWS 環境(VPC・IAM・S3)準備完了

フェーズ 2:パイロット実装

  • [ ] テスト用 EMR on EC2 クラスター作成
  • [ ] 既存パイプラインを Spark で実装・性能測定
  • [ ] スポットインスタンス活用でコスト削減確認
  • [ ] Lake Formation・Glue Catalog 統合テスト

フェーズ 3:本番運用

  • [ ] Infrastructure as Code(Terraform/CloudFormation)構築
  • [ ] CloudWatch アラーム・ダッシュボード構築
  • [ ] 自動スケーリングルール定義・テスト
  • [ ] セキュリティ(KMS・VPC・IAM Role)完全設定
  • [ ] バックアップ・ディザスタリカバリー計画

フェーズ 4:最適化・高度な運用

  • [ ] Hudi / Iceberg による ACID トランザクション導入
  • [ ] EMR Serverless へ短時間ジョブ移行
  • [ ] EMR Studio Notebook 運用化
  • [ ] SageMaker との ML パイプライン統合
  • [ ] マルチアカウント環境での統一ガバナンス

まとめ

Amazon EMR は 「Apache Spark/Hadoop のフルマネージドクラスター実行基盤」。EC2(細かい制御)・EKS(Kubernetes 統合)・Serverless(ゼロ管理)の 3 オプションで、ペタバイト規模のデータ処理・複雑な ML パイプイン・リアルタイムストリーミングを実現します。

成功の鍵

  1. スポットインスタンスで 70-90% コスト削減
  2. 使い捨てクラスターで月額コスト 60% 削減
  3. **EMRFS(S3)**で永続性・柔軟性確保
  4. Lake Formation + Glue Catalog でデータガバナンス統一
  5. Infrastructure as Code で再現性・バージョン管理
  6. 自動スケーリング + CloudWatch 監視で運用効率化

Spark 4.0・Iceberg v3・EMR Serverless などの最新機能を活用し、クラウドネイティブなデータ分析基盤を構築してください。


最終更新:2026-04-26 バージョン:v2.0