目次
Amazon EMR v2.0 完全ガイド 2026
Elastic MapReduce: ビッグデータ分析の統合プラットフォーム
Amazon EMR(Elastic MapReduce) は、Apache Spark・Hadoop・Hive・Presto・Flink・HBase などのオープンソース分析フレームワークを AWS 上で実行するフルマネージドクラスタープラットフォーム です。ペタバイト規模のデータ処理・機械学習・ストリーミング分析をスケーラブルに実行し、EC2・EKS・Serverless の 3 つのデプロイオプションで柔軟に対応します。本ドキュメントは EMR の概念・アーキテクチャ・実装・最新動向を網羅的に解説します。
ドキュメントの目的
本ガイドは以下を対象としています。
- 初心者向け:EMR とは何か、Apache Spark とは何かを学びたい方
- データエンジニア向け:EMR on EC2 / EKS / Serverless を使い分けたい方
- SRE/DevOps 向け:EMR クラスターのコスト最適化・運用・セキュリティ
- アーキテクト向け:Databricks・Cloudera・Synapse Spark Pool との比較判断
- 意思決定者向け:EMR vs Glue vs Athena の投資判断
目次
- 概要と本質
- EMR が解決する課題
- 主な特徴
- アーキテクチャ
- デプロイオプション(EC2・EKS・Serverless)
- コアコンポーネント
- 主要フレームワークと実行エンジン
- EMR on EC2:詳細解説
- EMR on EKS:Kubernetes 統合
- EMR Serverless:インフラレス実行
- EMR Studio:統合開発環境
- セキュリティと暗号化
- ネットワーク統合
- コスト管理と最適化
- Glue・Athena・Redshift との比較
- 類似サービス比較(Databricks・Cloudera・GCP・Azure)
- 主要ユースケース
- CLI による実装例
- SDK による実装例
- Infrastructure as Code
- ベストプラクティス
- トラブルシューティング
- 2025-2026 最新動向
- 学習リソース
- 実装チェックリスト
- まとめ
概要と本質
Amazon EMR は、自前で Spark/Hadoop クラスターを管理するのではなく、マネージドな分散処理基盤を提供するサービス です。
初心者向けメモ:
- EMR = マネージド Spark・Hadoop の実行環境
- インフラ管理(スケーリング・パッチ・アップグレード)を AWS が引き受ける
- データエンジニアはアプリケーション開発に集中できる
- ペタバイト規模のデータ処理・複雑な ETL・ML パイプラインに最適
EMR の位置づけ
graph LR
Data["データソース<br/>S3/RDS/Kinesis"]
EMR["Amazon EMR<br/>Spark/Hadoop/Hive/Presto"]
Results["クエリ結果<br/>S3/Redshift"]
ML["機械学習<br/>SageMaker"]
Viz["可視化<br/>QuickSight"]
Data -->|ETL| EMR
EMR -->|変換済みデータ| Results
EMR -->|特徴量| ML
Results -->|SSOT| Viz
EMR が解決する課題
| 課題 | 従来のアプローチ | EMR による解決 |
|---|---|---|
| Spark クラスター管理 | 自前でマシン・OS・Java・Spark を構築・運用 | マネージド EMR クラスター:セットアップ時間を数時間から数分に短縮 |
| スケーリング | 手動でノード追加・YARN 設定・リバランス | 自動スケーリング・Instance Fleets で複数インスタンスタイプに対応 |
| コスト最適化 | オンデマンド EC2 インスタンスで運用 | スポットインスタンス活用で最大 90% コスト削減 |
| パッチ管理 | 手動で OS・Spark・ライブラリをアップグレード | AWS が定期的にセキュリティ更新・最新版対応 |
| 高可用性 | Hadoop HA・多数のノード管理が必要 | マネージドで HA 構成・フェイルオーバー自動対応 |
| ペタバイト規模処理 | 大量メモリ・ストレージ管理が複雑 | 数千ノードまでスケール・EMRFS で S3 を シームレス利用 |
| ML パイプライン | 別途の ML インフラが必要 | MLlib・SageMaker 統合で ML 対応 |
主な特徴
1. 3 つのデプロイオプション
| オプション | 説明 | 利点 | 欠点 |
|---|---|---|---|
| EMR on EC2 | EC2 インスタンス上で実行 | 最高の柔軟性・細かい制御 | インフラ管理負担 |
| EMR on EKS | Kubernetes Pod で実行 | K8s エコシステム統合・マルチテナント対応 | 学習コスト・EKS 管理 |
| EMR Serverless | サーバーレス・自動スケール | アイドル時間ゼロ・完全マネージド | コスト予測困難・カスタマイズ制限 |
2. 複数フレームワーク対応
Apache Spark(最も一般的)
├── Spark SQL:ANSI SQL 準拠クエリ
├── Spark Streaming:リアルタイム処理
├── PySpark / Scala / Java / R
├── MLlib:分散機械学習
└── Spark on Ray(新):Python ネイティブ並列実行
Apache Hadoop & YARN
├── MapReduce:レガシーバッチ処理
├── YARN:リソースマネージャー
└── HDFS:分散ストレージ
Apache Hive
├── HiveQL:SQL ライク DSL
└── Metastore:スキーマ管理
Apache Presto / Trino
├── インタラクティブ SQL
├── 複数データソース連携
└── 低レイテンシ分析
その他
├── Apache Flink:ストリーミング処理
├── Apache HBase:NoSQL
├── Apache Hudi:ACID トランザクション・デルタ機能
├── Apache Iceberg v3:ACID・タイムトラベル・スナップショット
└── Delta Lake:Unity Catalog 連携
3. スポットインスタンスによるコスト最適化
EMR は Task ノードにスポットインスタンスを活用できます:
EC2 料金:$0.0928/時間(m5.xlarge オンデマンド)
スポット料金:$0.0278/時間(同じスペック)
→ 70% コスト削減
スポット中断時:Task ノードのみ終了(Core ノード・計算継続)
→ ジョブは再試行・自動リスケジュール
4. EMRFS:S3 をネイティブストレージとして利用
クラスター終了後もデータ永続
→ S3 は AWS マネージドストレージ・長期保存可能
HDFS より高速・柔軟
→ S3 Select:効率的なフィルタリング
→ 複数クラスター間でのデータ共有
階層化ストレージ対応
→ S3 Intelligent-Tiering でコスト最適化
アーキテクチャ
EMR on EC2 アーキテクチャ図
graph TB
subgraph Cluster["EMR Cluster(EC2)"]
Master["Master Node(1台)<br/>YARN ResourceManager<br/>HDFS NameNode<br/>Hadoop JobTracker"]
Core["Core ノード(1台以上)<br/>HDFS DataNode<br/>YARN NodeManager<br/>Spark Executor"]
Task["Task ノード(オプション)<br/>スポットインスタンス推奨<br/>Spark Executor のみ"]
end
subgraph Storage["ストレージ"]
EMRFS["EMRFS<br/>S3 を HDFS のように利用"]
S3["Amazon S3<br/>永続ストレージ"]
end
subgraph Framework["フレームワーク"]
Spark["Apache Spark"]
Hive["Apache Hive"]
Presto["Presto/Trino"]
end
Master --> |YARN リソース管理| Core
Master --> |YARN リソース管理| Task
Core --> EMRFS
Task --> EMRFS
EMRFS --> S3
Spark -.-> Framework
Hive -.-> Framework
Presto -.-> Framework
クラスター構成パターン
パターン 1:バッチ処理用クラスター
Master(1台・m5.xlarge)
↓ YARN ResourceManager
Core(3台・r5.2xlarge)
↓ HDFS DataNode / Spark Executor
Task(5-20台・スポット・m5.4xlarge)
↓ Spark Executor のみ・中断時は再起動
用途:夜間バッチ・日次 ETL・アドホッククエリ
特徴:処理終了後にクラスター削除(使い捨て)
パターン 2:長期実行クラスター
Master(1台・m5.xlarge)
Core(5台・r5.4xlarge)
↓ 常時起動・HDFS 永続
Task(2台オンデマンド+スケーリング用スポット)
用途:ストリーミング・24/7 ワークロード・複数ジョブの並行実行
特徴:クラスターを保持・複数チームで共有
リスク:未使用時も課金(スケーリングポリシーで最小化)
パターン 3:EMR Serverless
アプリケーション作成
→ Spark Job 送信
→ Worker 自動起動
→ ジョブ実行
→ 自動終了(数分後)
特徴:クラスター管理不要・アイドル時間ゼロ
課金:vCPU-時間 + メモリ-時間(ジョブ実行時間のみ)
デプロイオプション
1. EMR on EC2:最高の制御と柔軟性
特徴
タイプ: 完全管理 EC2 クラスター
初期化: ノード選択・ブートストラップスクリプト・カスタム設定可能
スケーリング: 自動スケーリング・Manual Resizing
コスト: EC2 料金 + EMR 上乗せ料金($0.048/時間・m5.xlarge)
管理: Master ノード への SSH 接続で細かいチューニング可能
ユースケース
- 大規模バッチ処理(1-10K ノード)
- カスタム設定が必要(特定 Spark バージョン・ローカルライブラリ)
- 他の EC2 サービスとの密結合(VPC・IAM ロール・セキュリティグループ)
- Hadoop エコシステム(Hive Metastore・HBase・複数フレームワーク)
実装例
# EMR on EC2 クラスター作成
aws emr create-cluster \
--name "spark-etl-prod" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--applications Name=Spark Name=Hive Name=Presto \
--bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
--log-uri s3://my-bucket/emr-logs/ \
--use-glue-catalog \
--enable-glue-data-catalog
# ジョブ投入(Spark)
aws emr add-steps \
--cluster-id j-xxxxxx \
--steps Type=Spark,Name="ETL Job",\
ActionOnFailure=TERMINATE_CLUSTER,\
Args=[s3://my-bucket/etl.py,--input,s3://data/,--output,s3://output/]
2. EMR on EKS:Kubernetes 統合
特徴
基盤: Amazon EKS(マネージド Kubernetes)
Pod 実行: 一時的な Spark Pod・自動スケーリング
リソース: EKS ノードプール上で実行・他のワークロードと共存
スケーリング: Pod オートスケーラー・Karpenter 対応
コスト: EKS 管理料 + EC2 ノード料金(クラスターレベル)
ユースケース
- Kubernetes 統合環境への Spark 統合
- マルチテナント・複数チームが同じ EKS クラスター利用
- Pod ライフサイクル管理が必要
- Istio・Prometheus などの K8s 監視スタック統合
実装例
# EMR on EKS 仮想クラスター作成
aws emr-containers create-virtual-cluster \
--name spark-on-eks \
--container-provider '{
"type": "EKS",
"id": "my-eks-cluster",
"info": {
"eksConnectorConfig": {
"enabled": true
}
}
}'
# Spark ジョブ投入
aws emr-containers start-job-run \
--virtual-cluster-id 000000000000-00000 \
--name "etl-job" \
--execution-role-arn arn:aws:iam::123456789:role/EMRJobRole \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://my-bucket/etl.py",
"sparkSubmitParameters": "--conf spark.executor.memory=4g --conf spark.executor.cores=2"
}
}'
3. EMR Serverless:インフラレス実行
特徴
構造: 完全サーバーレス・自動スケーリング
初期化時間: 数十秒(クラスター不要)
スケーリング: Worker の自動プロビジョニング・アイドル時自動終了
課金: vCPU-時間 ($0.052624) + メモリ-時間 ($0.0057785/GB)
管理: CLI/API/SDK のみ・コンソール SSH なし
ユースケース
- 断続的・バースト処理(定期バッチ・イベント駆動)
- コスト管理が重要(未使用時間ゼロ)
- インフラ管理を最小化したい
- 数分以下の短時間ジョブ
実装例
# EMR Serverless アプリケーション作成
aws emr-serverless create-application \
--name spark-etl-app \
--release-label emr-7.11.0 \
--type SPARK
# ジョブ実行
aws emr-serverless start-job-run \
--application-id 00000000000000000 \
--execution-role-arn arn:aws:iam::123456789:role/EMRServerlessRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://my-bucket/etl.py",
"sparkSubmitParameters": "--conf spark.executor.memory=4g"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://my-bucket/logs/"
}
}
}'
# PySpark Notebook サポート(新機能)
aws emr-serverless create-application \
--name notebook-app \
--release-label emr-spark-8.0-preview \
--type SPARK
コアコンポーネント
1. クラスターマネージャー
YARN(Yet Another Resource Negotiator)
├── ResourceManager:リソース割り当て・スケジューリング
├── NodeManager:各ノードのリソース監視
├── ApplicationMaster:ジョブのライフサイクル管理
└── 出力:HDFS / S3 に結果保存
Spark StandaloneManager(EMR on EKS)
├── Kubernetes API:Pod 管理
├── Driver Pod:ジョブの親プロセス
└── Executor Pod:並列タスク実行
2. ストレージレイヤー
EMRFS(EMR File System)
├── S3 ネイティブ統合:s3a:// URI スキーム
├── 一貫性強化:DynamoDB でメタデータ管理
├── 暗号化:S3 KMS / SSE-S3
└── マルチパートアップロード最適化
HDFS(分散ストレージ)
├── NameNode(Master):ファイルシステム管理
├── DataNode(Core/Task):実データ保存
└── 用途:高速中間ストレージ・キャッシング
ローカルディスク
├── SSD / HDD インスタンス
├── スピンアップ ストレージ(コスト効率)
└── スピルオーバー用(メモリ不足時)
3. 主要フレームワーク
Apache Spark 4.0(2026 最新)
# Spark 4.0.1 新機能:VARIANT データ型(JSON ネイティブ)
spark.sql("""
SELECT
data:customer.name AS name,
data:order.total::DECIMAL(10, 2) AS total
FROM json_table
WHERE data:region = 'Tokyo'
""")
# Apache Iceberg v3 統合
df = spark.read.format("iceberg").load("s3://lake/products")
df.filter("price > 1000").write.format("iceberg")\
.mode("overwrite").save("s3://lake/products_v3")
# Row-level lineage 追跡
spark.conf.set("spark.sql.iceberg.row_lineage.enabled", "true")
Apache Hive 4.0(SQL ウェアハウス)
-- Apache Hive による SQL クエリ
CREATE EXTERNAL TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
created_at TIMESTAMP
)
PARTITIONED BY (region STRING)
STORED AS PARQUET
LOCATION 's3://lake/orders/';
-- Hive on Spark エンジン
SET hive.execution.mode=llap;
SELECT region, SUM(amount) as total
FROM orders
WHERE YEAR(created_at) = 2026
GROUP BY region;
Apache Presto / Trino(インタラクティブ SQL)
-- Trino で複数データソース横断クエリ
SELECT
o.order_id,
c.name,
o.amount
FROM hive.default.orders o
JOIN postgres.public.customers c
ON o.customer_id = c.id
WHERE o.created_at > CURRENT_DATE - INTERVAL '7' DAY;
Apache Flink(ストリーミング処理)
// Kinesis からのストリーミング処理
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(
new FlinkKinesisConsumer<>("order-stream",
new SimpleStringSchema(),
consumerConfig));
stream.map(new MapFunction<String, Order>() {
public Order map(String value) {
return parseOrder(value);
}
})
.filter(o -> o.amount > 1000)
.sinkTo(new S3CommitSink("s3://lake/filtered-orders/"));
env.execute("Streaming ETL");
Apache Hudi・Iceberg・Delta Lake(ACID テーブル)
# Hudi による ACID トランザクション
df.write.format("hudi") \
.option("hoodie.table.name", "orders") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "order_id") \
.option("hoodie.datasource.write.partitionpath.field", "region") \
.save("s3://lake/orders-hudi")
# Iceberg による Time Travel
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("iceberg").getOrCreate()
# 1 時間前のスナップショットを参照
df = spark.read.option("as-of-timestamp", "2026-04-26 10:00:00") \
.format("iceberg").load("s3://lake/products")
EMR on EC2
クラスター構成の詳細
Master ノード設定:
インスタンスタイプ: m5.xlarge(4 vCPU, 16 GB)以上推奨
用途:
- YARN ResourceManager
- HDFS NameNode
- Hive Metastore
- Spark Driver(スタンドアロン実行時)
特性: 単一障害点・高可用性には複数マスター(本番環境)
ストレージ: 100 GB EBS(ログ・メタデータ)
Core ノード設定:
インスタンスタイプ: r5.2xlarge(8 vCPU, 64 GB)~ r6.4xlarge(16 vCPU, 128 GB)
用途:
- HDFS DataNode
- YARN NodeManager・Executor ホスト
- HDFS レプリケーション保持
最小台数: 3 台(HDFS レプリケーション = 3)
特性: クラスター削除時にデータ손실(S3 EMRFS 使用で回避)
ストレージ: 大容量 EBS + インスタンスストア
Task ノード設定:
インスタンスタイプ: m5.4xlarge(16 vCPU, 64 GB)~ c6.9xlarge(36 vCPU)
用途: Spark Executor コンピュート・HDFS レプリケーション不要
スポット推奨: Task ノード = スポットインスタンス活用で コスト 70-90% 削減
スケーリング: 自動スケーリングポリシー(CPU・メモリに基づく)
フェイルオーバー: Task 노드 중단 → 다른 노드로 재시작
Instance Fleet(インスタンスタイプの組み合わせ)
import boto3
emr = boto3.client('emr')
response = emr.create_cluster(
Name='instance-fleet-cluster',
ReleaseLabel='emr-7.11.0',
Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}],
Instances={
'InstanceFleets': [
{
'Name': 'Master Fleet',
'InstanceFleetType': 'MASTER',
'TargetOnDemandCapacity': 1,
'InstanceTypeConfigs': [
{'InstanceType': 'm5.xlarge'},
{'InstanceType': 'm5.2xlarge'}
]
},
{
'Name': 'Core Fleet',
'InstanceFleetType': 'CORE',
'TargetOnDemandCapacity': 2,
'TargetSpotCapacity': 8,
'InstanceTypeConfigs': [
{'InstanceType': 'r5.2xlarge', 'WeightedCapacity': '2'},
{'InstanceType': 'r5.4xlarge', 'WeightedCapacity': '4'},
{'InstanceType': 'r6.2xlarge', 'WeightedCapacity': '2'}
]
},
{
'Name': 'Task Fleet',
'InstanceFleetType': 'TASK',
'TargetOnDemandCapacity': 0,
'TargetSpotCapacity': 20,
'InstanceTypeConfigs': [
{'InstanceType': 'm5.4xlarge', 'BidPrice': '0.50'},
{'InstanceType': 'm6.4xlarge', 'BidPrice': '0.60'},
{'InstanceType': 'c5.4xlarge', 'BidPrice': '0.45'}
]
}
]
}
)
ブートストラップアクション(初期化スクリプト)
#!/bin/bash
# Bootstrap script: カスタムライブラリ・設定の自動適用
# Python パッケージインストール
pip install pandas numpy scikit-learn xgboost
# Spark 設定ファイル追加
cat >> /etc/spark/conf/spark-defaults.conf <<EOF
spark.driver.maxResultSize=4g
spark.sql.shuffle.partitions=200
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.kryoserializer.buffer.max=512m
EOF
# 大規模バッチ用ログレベル設定
echo "log4j.rootCategory=WARN, console" > /etc/spark/conf/log4j.properties
# 監視エージェント起動
systemctl start cloudwatch-agent
ステップ(Job)投入
# Spark SQL ジョブ
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Spark,Name="Data ETL",\
Args=[--class,com.example.DataETL,\
s3://my-bucket/etl-job.jar,\
--input,s3://raw-data/,\
--output,s3://processed-data/]
# Hive ジョブ
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Hive,Name="Aggregation",\
HiveScript=s3://my-bucket/query.sql,\
Args=[-d,INPUT_PATH=s3://data/,-d,OUTPUT_PATH=s3://output/]
# Pig スクリプト
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Pig,Name="Data Transformation",\
PigScript=s3://my-bucket/transform.pig
# シェルスクリプト
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=ShellCommand,Name="Cleanup",\
Args=[s3://my-bucket/cleanup.sh,s3://output/]
EMR on EKS
仮想クラスター(Virtual Cluster)管理
# 仮想クラスター作成(EKS クラスター上)
aws emr-containers create-virtual-cluster \
--name "data-science-vpc" \
--container-provider \
type=EKS,\
id=my-eks-cluster,\
info="{eksConnectorConfig: {enabled: true}}"
# Namespace ベースの RBAC
kubectl create namespace spark-jobs
kubectl label namespace spark-jobs spark-enabled=true
# ジョブロール設定
aws iam create-role --role-name EMRJobExecutionRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "emr-containers.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}'
# S3 アクセス権限追加
aws iam put-role-policy --role-name EMRJobExecutionRole \
--policy-name S3Access --policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": "*"
}]
}'
Spark ジョブ投入(EMR on EKS)
import boto3
emr_containers = boto3.client('emr-containers')
response = emr_containers.start_job_run(
virtualClusterId='000000000000-00000',
name='ml-feature-engineering',
releaseLabel='emr-7.11.0-latest',
executionRoleArn='arn:aws:iam::123456789:role/EMRJobExecutionRole',
jobDriver={
'sparkSubmitJobDriver': {
'entryPoint': 's3://my-bucket/feature-engineering.py',
'sparkSubmitParameters': (
'--conf spark.executor.memory=8g '
'--conf spark.executor.cores=4 '
'--conf spark.driver.memory=2g '
'--conf spark.sql.adaptive.enabled=true'
),
'entryPointArguments': ['--input', 's3://raw-data/', '--output', 's3://features/']
}
},
configurationOverrides={
'monitoringConfiguration': {
'persistentAppUI': 'ENABLED',
's3MonitoringConfiguration': {
'logUri': 's3://my-bucket/emr-eks-logs/'
},
'cloudWatchMonitoringConfiguration': {
'logGroupName': '/emr-on-eks/jobs',
'logStreamNamePrefix': 'feature-eng'
}
}
},
tags={
'environment': 'production',
'team': 'data-science'
}
)
print(f"Job ID: {response['id']}")
EMR Serverless
アプリケーション管理
import boto3
emr_serverless = boto3.client('emr-serverless')
# Spark アプリケーション作成
app_response = emr_serverless.create_application(
name='batch-etl-serverless',
releaseLabel='emr-7.11.0',
type='SPARK',
clientToken='unique-request-id-12345',
tags={
'environment': 'production',
'cost-center': 'data-eng'
}
)
app_id = app_response['applicationId']
print(f"Created application: {app_id}")
# アプリケーション詳細確認
app_details = emr_serverless.get_application(applicationId=app_id)
print(f"Status: {app_details['application']['status']}")
バッチジョブ投入(EMR Serverless)
# ETL ジョブ投入
job_response = emr_serverless.start_job_run(
applicationId=app_id,
clientToken='unique-job-token-67890',
executionRoleArn='arn:aws:iam::123456789:role/EMRServerlessRole',
jobDriver={
'sparkSubmit': {
'entryPoint': 's3://my-bucket/etl-pipeline.py',
'entryPointArguments': [
'--date', '2026-04-26',
'--input-bucket', 's3://raw-data/',
'--output-bucket', 's3://processed-data/'
],
'sparkSubmitParameters': (
'--conf spark.executor.memory=4g '
'--conf spark.executor.cores=2 '
'--conf spark.driver.memory=2g '
'--conf spark.dynamicAllocation.enabled=true'
)
}
},
configurationOverrides={
'monitoringConfiguration': {
's3MonitoringConfiguration': {
'logUri': 's3://my-bucket/serverless-logs/',
'encryptionKeyArn': 'arn:aws:kms:ap-northeast-1:123456789:key/12345678-1234-1234-1234-123456789012'
},
'cloudWatchMonitoringConfiguration': {
'logGroupName': '/emr-serverless/jobs',
'logStreamNamePrefix': 'etl-batch'
}
},
'workerTypeSpecifications': {
'DRIVER': {
'diskSize': 30
},
'EXECUTOR': {
'diskSize': 100
}
}
},
tags={
'pipeline': 'daily-etl',
'env': 'prod'
}
)
print(f"Job ID: {job_response['jobRunId']}")
Pre-initialized Capacity(キャッシュ)
# ウォームスタンバイ Worker を常時起動(コールドスタート回避)
emr_serverless.create_application(
name='low-latency-app',
releaseLabel='emr-7.11.0',
type='SPARK',
initialCapacity={
'DRIVER': {
'workerCount': 1,
'workerConfiguration': {
'cpu': '2',
'memory': '8GB',
'disk': '30GB'
}
},
'EXECUTOR': {
'workerCount': 10,
'workerConfiguration': {
'cpu': '4',
'memory': '16GB',
'disk': '100GB'
}
}
}
)
# 常時起動コスト:1 DCU(vCPU)= 24 時間 $0.30
# DRIVER 2 vCPU + EXECUTOR 40 vCPU = 42 vCPU × $0.30 × 30 日 = $378/月
EMR Studio:統合開発環境
# EMR Studio ノートブック
# → Jupyter ベース・Git 統合・複数ユーザーコラボレーション
# PySpark コード例
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
spark = SparkSession.builder.appName("clustering").getOrCreate()
# S3 から Parquet 読み込み
df = spark.read.parquet("s3://lake/customers/")
# 特徴量エンジニアリング
assembler = VectorAssembler(
inputCols=["age", "salary", "purchases"],
outputCol="features"
)
features = assembler.transform(df)
# K-Means クラスタリング
kmeans = KMeans(k=5, seed=42)
model = kmeans.fit(features)
# 結果を S3 に保存
model.transform(features).write.parquet("s3://lake/customer-segments/")
セキュリティ
暗号化
転送中の暗号化:
TLS 1.2+: ノード間・Spark 通信
Kerberos: クラスター内認証(オプション)
保存時の暗号化:
S3: KMS CMK / SSE-S3
EBS: AWS KMS キー
HDFS: Hadoop 暗号化ゾーン
アクセス制御:
IAM Role: EC2 インスタンス・EMR ジョブに付与
Security Group: ノード間・外部通信制限
Lake Formation: 列・行レベルのアクセス制御
VPC Endpoint: プライベート接続(S3・DynamoDB)
Kerberos 有効化
# Kerberos クラスター作成
aws emr create-cluster \
--release-label emr-7.11.0 \
--security-configuration kerberos-config \
--ec2-key-name my-keypair
# セキュリティ設定
aws emr create-security-configuration \
--name kerberos-config \
--security-configuration '{
"KerberosAttributes": {
"KdcAdminPassword": "MyKdcPassword",
"Realm": "AWS.EXAMPLE.COM"
}
}'
Lake Formation 統合
# Lake Formation で列・行レベルのアクセス制御
# Athena / EMR / Redshift に統一的に適用
# SQL で権限定義
spark.sql("""
GRANT SELECT (order_id, customer_id, amount)
ON TABLE orders
TO ROLE analyst_role;
""")
# PII 列(email・phone)は参照不可
spark.sql("""
GRANT SELECT (id, name, region)
ON TABLE customers
TO ROLE analyst_role;
""")
ネットワーク統合
VPC 内配置
推奨構成:
Master・Core ノード: プライベートサブネット
NAT Gateway: アウトバウンド通信(インターネット)
VPC Gateway Endpoint: S3・DynamoDB への低レイテンシアクセス
Security Group: YARN(8088)・Spark UI(4040)制限
VPC Endpoint 設定:
- S3: ゲートウェイエンドポイント(無料)
- DynamoDB: ゲートウェイエンドポイント(無料)
- CloudWatch Logs: インターフェースエンドポイント
- ECR: Docker イメージ取得時
セキュリティグループ
Master ノード受信ルール:
- 22/tcp(SSH): 管理者のみ
- 8088/tcp(YARN Web): VPC 内のみ
- 9870/tcp(NameNode): VPC 内のみ
- 4040/tcp(Spark UI): VPC 内のみ
Core/Task ノード受信ルール:
- 受信:Master / Core ノードからの全トラフィック
- 受信:VPC 内のクライアント(ポート 1024-65535)
- 送信:すべて許可(S3・ログ出力)
コスト管理
コスト構成
EMR on EC2:
EC2 インスタンス料金
Master: m5.xlarge × $0.192/時間
Core: r5.2xlarge × 3 × $0.504/時間 = $1.512/時間
Task(スポット): m5.4xlarge × 10 × $0.0928/時間 = $0.928/時間
小計:$2.632/時間 × 720 時間(月間) = $1,895/月
EMR 上乗せ料金:$0.048/時間 × 13 インスタンス = $0.624/時間
月間:$0.624/時間 × 720 = $449/月
S3 ストレージ:$0.023/GB-月
CloudWatch Logs:$0.50/GB
月間合計(常時起動):$2,900~$3,200
最適化戦略:
1. 使い捨てクラスター:処理終了後に削除
2. スポットインスタンス活用:Task ノード 70-90% 削減
3. EMR Serverless:短時間ジョブ・バースト処理に限定
4. Spot Instance Interruption Handling:Task ノード損失時の自動再試行
5. CloudWatch カスタムメトリクス:効率的なスケーリングルール
コスト削減のベストプラクティス
| 対策 | 削減率 | 効果 | 実装難度 |
|---|---|---|---|
| スポットインスタンス活用 | 70-90% | Task ノードの大部分を置換可 | 中 |
| 使い捨てクラスター | 60% | 長期間クラスター保有排除 | 低 |
| EMR Serverless | 50-80% | アイドル時間ゼロ | 低 |
| オンデマンド容量予約 | 30% | 月次予測可能 | 中 |
| ダウンタイムスケーリング | 40-60% | 夜間・休日自動縮小 | 高 |
| ローカルキャッシング | 20% | 中間データ HDFS 優先 | 中 |
| Glue ETL への移行 | 40-50% | サーバーレスで管理削減 | 高 |
比較
EMR vs Glue vs Athena
| 項目 | EMR | Glue | Athena |
|---|---|---|---|
| 用途 | 大規模 ETL・複雑な Spark 処理 | 中規模 ETL・自動化 | Ad hoc SQL 分析 |
| エンジン | Spark・Hadoop・Hive・Presto | Spark(サーバーレス) | Presto/Trino |
| 管理 | クラスター構築・運用 | サーバーレス・自動スケール | サーバーレス・即実行 |
| 柔軟性 | 最高(細かい制御可) | 中(カスタム Spark コード) | 低(SQL のみ) |
| スケール | ペタバイト対応 | テラバイト~ペタバイト | テラバイト対応 |
| 学習コスト | 高(Spark・Hadoop 知識必要) | 中 | 低(SQL のみ) |
| コスト | 安定運用で安い / 管理負担 | 中程度 | データ量に応じた従量制 |
| データ発見 | なし | Glue Catalog | Glue Catalog 統合 |
| 採用判断 | 複雑な ML パイプライン・カスタマイズ | 一般的な ETL・自動化 | 探索的分析・ダッシュボード |
EMR vs Databricks
| 項目 | EMR | Databricks |
|---|---|---|
| プロバイダー | AWS マネージド | SaaS・マルチクラウド |
| 基盤 | Spark on EC2/EKS | 最適化 Spark + SQLとML |
| Lakehouse 機能 | Delta Lake / Iceberg / Hudi | Unity Catalog(統一メタストア) |
| AI/ML 統合 | SageMaker 連携 | 搭載・AutoML 提供 |
| Notebooks | EMR Studio | Databricks Notebooks(優位) |
| コスト | インフラ管理が必要 | SaaS 従量制・高マークアップ |
| Lock-in | AWS エコシステム | マルチクラウド対応・脱出容易 |
| 採用 | AWS 専一企業・コスト重視 | マルチクラウド・エンタープライズ |
EMR vs Cloudera Data Platform
| 項目 | EMR | Cloudera |
|---|---|---|
| 運用 | AWS マネージド | オンプレ・ハイブリッド |
| Hadoop/Spark | 最新版対応 | エンタープライズ版 |
| セキュリティ | IAM・KMS 統合 | Ranger・LDAP 統合 |
| Data Governance | Lake Formation | Hue・Ranger RBAC |
| Cloud Native | クラウド中心 | オンプレ中心・クラウド対応 |
| コスト | 変動費・スケール有利 | ライセンス費用 |
| 採用 | AWS 環境 | オンプレ・規制業界 |
主要ユースケース
1. リアルタイムストリーミング分析
# Kinesis → EMR(Spark Streaming / Flink)→ S3 / Redshift
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("streaming-etl").getOrCreate()
# Kinesis からストリーミング読み込み
kinesis_df = spark \
.readStream \
.format("kinesis") \
.option("streamName", "order-stream") \
.option("region", "ap-northeast-1") \
.option("initialPosition", "TRIM_HORIZON") \
.load()
# マイクロバッチ処理
result = kinesis_df \
.select(
from_json(col("data").cast("string"),
"order_id INT, customer_id INT, amount DOUBLE, timestamp TIMESTAMP"
).alias("event")
) \
.select("event.*") \
.filter(col("amount") > 1000) \
.groupBy(window("timestamp", "1 minute"), "customer_id") \
.agg(sum("amount").alias("total_amount"))
# S3 に結果書き込み
result.writeStream \
.format("parquet") \
.option("path", "s3://lake/high-value-orders/") \
.option("checkpointLocation", "s3://lake/checkpoints/kinesis-streaming") \
.start() \
.awaitTermination()
2. 機械学習特徴量エンジニアリング
# 大規模データから ML モデル用の特徴量生成
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, when, to_date, datediff
spark = SparkSession.builder.appName("feature-engineering").getOrCreate()
# 顧客データ読み込み
customers = spark.read.parquet("s3://lake/customers/")
orders = spark.read.parquet("s3://lake/orders/")
products = spark.read.parquet("s3://lake/products/")
# 特徴量作成:顧客のライフタイムバリュー(LTV)
customer_features = orders \
.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("lifetime_value"),
avg("amount").alias("avg_order_value"),
max("created_at").alias("last_purchase_date"),
datediff(current_timestamp(), max("created_at")).alias("days_since_purchase")
)
# 購買パターン特徴量
product_preferences = orders \
.join(products, "product_id") \
.groupBy("customer_id") \
.agg(
collect_set("category").alias("categories_purchased"),
count_distinct("product_id").alias("unique_products")
)
# 全特徴量を結合
final_features = customer_features \
.join(product_preferences, "customer_id") \
.join(customers, "customer_id")
# 標準化
assembler = VectorAssembler(
inputCols=["total_orders", "lifetime_value", "avg_order_value", "days_since_purchase"],
outputCol="features"
)
features = assembler.transform(final_features)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features"
)
model = scaler.fit(features)
scaled_features = model.transform(features)
# SageMaker Feature Store に書き込み
scaled_features.write \
.format("iceberg") \
.mode("overwrite") \
.save("s3://feature-store/customer-features/")
3. 多言語データ処理(ETL パイプライン)
# PySpark + Hive + Spark SQL のハイブリッド処理
spark.sql("""
-- Hive で大規模テーブル初期化
CREATE TABLE IF NOT EXISTS order_summary
USING PARQUET
PARTITIONED BY (region)
AS
SELECT
order_id,
customer_id,
SUM(amount) as total,
COUNT(*) as item_count,
YEAR(order_date) as year,
MONTH(order_date) as month,
QUARTER(order_date) as quarter,
region
FROM raw_orders
GROUP BY order_id, customer_id, region, YEAR(order_date), MONTH(order_date), QUARTER(order_date)
""")
# PySpark で複雑な変換
from pyspark.sql.window import Window
df = spark.read.table("order_summary")
# 移動平均(Window Function)
window_spec = Window \
.partitionBy("region") \
.orderBy("year", "month") \
.rangeBetween(-3, 0)
result = df.withColumn(
"moving_avg_3m",
avg("total").over(window_spec)
)
# Apache Spark 4.0 の新機能:VARIANT(JSON ネイティブ)で複雑なデータ処理
spark.sql("""
SELECT
order_id,
json_data:customer.name AS customer_name,
CAST(json_data:metadata.created_at AS TIMESTAMP) AS created_date,
CAST(json_data:pricing.total::DECIMAL(10,2) AS DECIMAL(10,2)) AS total_price
FROM json_orders
WHERE YEAR(CAST(json_data:metadata.created_at AS TIMESTAMP)) = 2026
""")
CLI 実装例
クラスター作成・管理
# 1. 基本的なクラスター作成
CLUSTER_ID=$(aws emr create-cluster \
--name "data-pipeline-prod" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--applications Name=Spark Name=Hive Name=Presto Name=Hadoop \
--bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
--log-uri s3://my-bucket/emr-logs/ \
--use-glue-catalog \
--enable-glue-data-catalog \
--tags Key=Environment,Value=production Key=Team,Value=data-eng \
--query 'ClusterId' \
--output text)
echo "Cluster created: $CLUSTER_ID"
# 2. クラスター状態確認
aws emr describe-cluster --cluster-id $CLUSTER_ID \
--query 'Cluster.[Id,Name,Status.State,MasterPublicDNSName]' \
--output table
# 3. Spark ジョブ投入
aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps Type=Spark,Name="ETL Job",ActionOnFailure=TERMINATE_CLUSTER,\
Args=[--master,yarn,\
--deploy-mode,cluster,\
--executor-memory,4g,\
--executor-cores,2,\
--num-executors,8,\
s3://my-bucket/etl.py,\
--input,s3://raw-data/,\
--output,s3://processed/]
# 4. Hive クエリ投入
aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps Type=Hive,Name="Data Aggregation",ActionOnFailure=CONTINUE,\
HiveScript=s3://my-bucket/queries/aggregation.sql,\
Args=[-d,INPUT_DATE=2026-04-26]
# 5. クラスター削除
aws emr terminate-clusters --cluster-ids $CLUSTER_ID
自動スケーリング設定
# 自動スケーリングポリシー定義
aws emr create-cluster \
--name "autoscaling-cluster" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--configurations '[
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.job.reduces": "0"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true"
}
}
]'
# スケーリングルール(CPU ベース)
aws emr put-auto-scaling-policy \
--cluster-id j-xxxxxx \
--instance-group-type TASK \
--auto-scaling-policy '{
"Constraints": {
"MinCapacity": 2,
"MaxCapacity": 50
},
"Rules": [
{
"Name": "Scale Out",
"Description": "CPU 使用率 80% 以上でスケールアウト",
"Action": {
"Market": "SPOT",
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "ADD_CAPACITY",
"ScalingAdjustment": 5,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"MetricName": "CPUUtilization",
"Namespace": "AWS/EC2",
"Statistic": "AVERAGE",
"Unit": "PERCENT",
"Dimensions": [{"Name": "InstanceGroup", "Value": "TASK"}],
"Period": 300,
"EvaluationPeriods": 2,
"Threshold": 80,
"ComparisonOperator": "GREATER_THAN_OR_EQUAL"
}
}
},
{
"Name": "Scale In",
"Description": "CPU 使用率 20% 以下でスケールイン",
"Action": {
"Market": "SPOT",
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "REMOVE_CAPACITY",
"ScalingAdjustment": 5,
"CoolDown": 600
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"MetricName": "CPUUtilization",
"Namespace": "AWS/EC2",
"Statistic": "AVERAGE",
"Unit": "PERCENT",
"Period": 300,
"EvaluationPeriods": 3,
"Threshold": 20,
"ComparisonOperator": "LESS_THAN"
}
}
}
]
}'
SDK 実装例
Python Boto3
import boto3
from datetime import datetime
emr = boto3.client('emr', region_name='ap-northeast-1')
def create_emr_cluster(cluster_name, num_instances=10):
"""EMR クラスター作成"""
response = emr.create_cluster(
Name=cluster_name,
ReleaseLabel='emr-7.11.0',
Applications=[
{'Name': 'Spark'},
{'Name': 'Hive'},
{'Name': 'Presto'},
{'Name': 'Hadoop'},
{'Name': 'Hudi'},
{'Name': 'Iceberg'}
],
Instances={
'InstanceGroups': [
{
'Name': 'Master',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},
{
'Name': 'Core',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'r5.2xlarge',
'InstanceCount': 3,
},
{
'Name': 'Task',
'Market': 'SPOT',
'InstanceRole': 'TASK',
'InstanceType': 'm5.4xlarge',
'InstanceCount': num_instances - 4,
'BidPrice': '0.50'
}
],
'Ec2KeyName': 'my-key-pair',
'KeepJobFlowAliveWhenNoSteps': False
},
LogUri='s3://my-bucket/emr-logs/',
Tags=[
{'Key': 'Environment', 'Value': 'production'},
{'Key': 'CreatedBy', 'Value': 'DataPipeline'},
{'Key': 'CreatedAt', 'Value': datetime.now().isoformat()}
],
BootstrapActions=[
{
'Name': 'Install Custom Libraries',
'ScriptBootstrapAction': {
'Path': 's3://my-bucket/bootstrap.sh'
}
}
],
Configurations=[
{
'Classification': 'spark-defaults',
'Properties': {
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.sql.shuffle.partitions': '200',
'spark.sql.adaptive.enabled': 'true',
'spark.dynamicAllocation.enabled': 'true'
}
},
{
'Classification': 'yarn-site',
'Properties': {
'yarn.scheduler.capacity.maximum-am-resource-percent': '0.5'
}
}
]
)
return response['JobFlowId']
def submit_spark_job(cluster_id, script_path, args):
"""Spark ジョブ投入"""
response = emr.add_steps(
ClusterId=cluster_id,
Steps=[
{
'Name': 'Spark ETL Job',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--class', 'com.example.ETLJob',
'--deploy-mode', 'cluster',
'--executor-memory', '4g',
'--executor-cores', '2',
'--num-executors', '8',
script_path
] + args
}
}
]
)
return response['StepIds'][0]
def monitor_cluster(cluster_id):
"""クラスター状態監視"""
response = emr.describe_cluster(ClusterId=cluster_id)
cluster = response['Cluster']
print(f"Cluster: {cluster['Name']}")
print(f"Status: {cluster['Status']['State']}")
print(f"Master: {cluster.get('MasterPublicDNSName', 'N/A')}")
print(f"Nodes: {cluster['RequestedInstanceCount']}")
print(f"Created: {cluster['Status']['Timeline']['CreationDateTime']}")
return cluster['Status']['State']
# 使用例
if __name__ == '__main__':
cluster_id = create_emr_cluster('production-etl', num_instances=15)
print(f"Created cluster: {cluster_id}")
# ジョブ投入
step_id = submit_spark_job(
cluster_id,
's3://my-bucket/etl-pipeline.jar',
['--input', 's3://raw-data/', '--output', 's3://processed/']
)
print(f"Submitted step: {step_id}")
# 状態確認
status = monitor_cluster(cluster_id)
print(f"Current status: {status}")
Infrastructure as Code
Terraform 例
# variables.tf
variable "cluster_name" {
default = "emr-production"
}
variable "instance_count" {
default = 10
}
variable "spot_price" {
default = "0.50"
}
# main.tf
resource "aws_emr_cluster" "main" {
name = var.cluster_name
release_label = "emr-7.11.0"
log_uri = "s3://${aws_s3_bucket.logs.id}/emr-logs/"
instance_group {
instance_role = "MASTER"
instance_type = "m5.xlarge"
instance_count = 1
}
instance_group {
instance_role = "CORE"
instance_type = "r5.2xlarge"
instance_count = 3
}
instance_group {
instance_role = "TASK"
instance_type = "m5.4xlarge"
instance_count = var.instance_count - 4
bid_price = var.spot_price
}
bootstrap_action {
path = "s3://${aws_s3_bucket.scripts.id}/bootstrap.sh"
name = "Install Libraries"
}
applications {
name = "Spark"
}
applications {
name = "Hive"
}
applications {
name = "Presto"
}
applications {
name = "Hudi"
}
applications {
name = "Iceberg"
}
service_role = aws_iam_role.emr_service_role.arn
instance_profile = aws_iam_instance_profile.ec2_instance_profile.arn
configurations = jsonencode([
{
Classification = "spark-defaults"
Properties = {
"spark.executor.memory" = "4g"
"spark.executor.cores" = "2"
"spark.sql.adaptive.enabled" = "true"
"spark.sql.shuffle.partitions" = "200"
}
}
])
tags = {
Environment = "production"
Team = "data-eng"
}
}
# IAM Role
resource "aws_iam_role" "emr_service_role" {
name = "emr-service-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "elasticmapreduce.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy_attachment" "emr_service_policy" {
role = aws_iam_role.emr_service_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
}
CloudFormation 例
AWSTemplateFormatVersion: '2010-09-09'
Description: 'EMR Production Cluster with Spark, Hive, Presto, Hudi, Iceberg'
Parameters:
InstanceCount:
Type: Number
Default: 10
SpotPrice:
Type: String
Default: '0.50'
Resources:
EMRCluster:
Type: AWS::EMR::Cluster
Properties:
Name: emr-production
ReleaseLabel: emr-7.11.0
Applications:
- Name: Spark
- Name: Hive
- Name: Presto
- Name: Hudi
- Name: Iceberg
Instances:
InstanceGroups:
- Name: Master
Market: ON_DEMAND
InstanceRole: MASTER
InstanceType: m5.xlarge
InstanceCount: 1
- Name: Core
Market: ON_DEMAND
InstanceRole: CORE
InstanceType: r5.2xlarge
InstanceCount: 3
- Name: Task
Market: SPOT
InstanceRole: TASK
InstanceType: m5.4xlarge
InstanceCount: !Sub '${InstanceCount - 4}'
BidPrice: !Ref SpotPrice
LogUri: s3://my-bucket/emr-logs/
ServiceRole: !GetAtt EMRServiceRole.Arn
JobFlowRole: !GetAtt EC2InstanceProfile.Arn
Tags:
- Key: Environment
Value: production
- Key: Team
Value: data-eng
EMRServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: elasticmapreduce.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
EC2InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
Roles:
- !Ref EC2Role
EC2Role:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ec2.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
Outputs:
ClusterId:
Value: !Ref EMRCluster
Description: EMR Cluster ID
ClusterArn:
Value: !GetAtt EMRCluster.Arn
Description: EMR Cluster ARN
ベストプラクティス
✅ 推奨事項
| 項目 | 実装 | 効果 |
|---|---|---|
| スポットインスタンス活用 | Task ノード 100% スポット | 70-90% コスト削減 |
| 使い捨てクラスター | KeepJobFlowAliveWhenNoSteps=false |
月額コスト 60% 削減 |
| EMRFS(S3)利用 | HDFS より S3 優先 | 永続性・他クラスター間共有 |
| Glue Catalog 統合 | UseGlueCatalog=true |
メタデータ一元化 |
| パーティション設計 | 日付・リージョンで分割 | クエリ高速化 20-100 倍 |
| Spark 最適化設定 | spark.sql.adaptive.enabled=true |
自動チューニング・効率化 |
| CloudWatch 監視 | カスタムメトリクス設定 | 異常早期検知・自動スケーリング |
| Security Group 制限 | Master SSH は VPN 経由のみ | セキュリティ向上 |
| Lake Formation 統合 | 列・行レベルアクセス制御 | ガバナンス統一 |
| Infrastructure as Code | Terraform / CloudFormation | バージョン管理・再現性 |
❌ 反パターン
| 反パターン | 理由 | 改善策 |
|---|---|---|
| 常時起動クラスター | 未使用時も課金(数万円/月) | 使い捨てか Serverless に変更 |
| オンデマンドインスタンスのみ | 高コスト | Task ノードをスポット化 |
| HDFS をプライマリストレージ | クラスター削除でデータ喪失 | EMRFS(S3)優先 |
| クラスターサイジング過大 | 無駄なリソース・高コスト | 自動スケーリング導入 |
| ログ設定なし | トラブル発生時デバッグ困難 | CloudWatch/S3 ログ設定必須 |
| IAM ロール権限過剰 | セキュリティリスク | 最小権限の原則(特定 S3 バケット・KMS キー) |
| Spark デフォルト設定 | 非効率(パーティション少ない・メモリ不足) | 環境に合わせたチューニング |
| エラーハンドリング不足 | タイムアウト・再試行ループ | Step ActionOnFailure 設定・Circuit Breaker |
トラブルシューティング
| 症状 | 原因 | 解決策 |
|---|---|---|
| Out of Memory(OOM) | Executor メモリ不足 | spark.executor.memory 増加 / パーティション数増加 |
| Job が遅い | リソース不足 | spark.sql.adaptive.enabled=true / パーティション最適化 |
| スポット中断 | インスタンス返却 | 複数インスタンスタイプ指定 / Instance Fleet 使用 |
| YARN リソースリーク | Executor の正常終了失敗 | spark.dynamicAllocation.enabled=true |
| Glue Catalog 接続失敗 | VPC Endpoint 不足 | S3 Gateway Endpoint 確認 |
| 暗号化キー拒否 | IAM KMS 権限不足 | クラスターロールに KMS DecryptDecrypt 権限付与 |
| ノード損失中の処理停止 | スポット中断時に再配置不能 | Instance Fleet で複数タイプ / Core ノード活用 |
| Spark UI アクセス不可 | セキュリティグループ制限 | Master ノード VPC 内アクセスのみ / SSH トンネル |
| データ整合性エラー | 部分的な書き込み失敗 | ACID テーブル(Iceberg/Hudi)導入 |
2025-2026 最新動向
1. Apache Spark 4.0 リリース
Spark 4.0.1 の主要機能:
✅ ANSI SQL 完全準拠
✅ VARIANT データ型(JSON ネイティブ処理)
✅ Apache Iceberg v3 サポート(削除ベクトル・行レベルリネージ)
✅ Streaming 強化(バッファ管理・遅延メトリクス)
✅ Python API 改善(type hints・dataclass サポート)
2. Iceberg v3 の ACID トランザクション
2025 年の大きな進化:
✅ Row-level lineage 追跡
✅ Deletion Vectors(削除マーク)による効率的な DELETE 操作
✅ Time Travel クエリ(スナップショット参照)
✅ AWS Lake Formation との統合(列・行レベルアクセス制御)
✅ EMR 7.11.0 より標準対応
3. EMR Serverless の進化
PySpark Notebook 対応(新機能):
✅ emr-spark-8.0-preview で Spark 4.0 + Notebook 対応
✅ Jupyter ライク環境・Git 統合
✅ 段階的な追加課金なし(vCPU-時間のみ)
Pre-initialized Capacity 拡充:
✅ ウォームスタンバイ Worker(1-10 分コールドスタート回避)
✅ 予測可能なコスト(月額固定費用)
4. EMR on EKS の実運用拡大
- Karpenter 統合(2026):
- ✅ Pod 自動スケーリング(ASG より効率的)
- ✅ コスト最適化(混合インスタンスタイプ自動選択)
- ✅ マルチテナント対応強化
5. AI/ML 統合の深化
SageMaker との統合:
✅ Feature Store への直接書き込み
✅ Spark ML → SageMaker Autopilot への自動流入
✅ EMR ジョブの SageMaker Pipelines 統合
Ray on EMR(新):
✅ Python ネイティブ並列実行
✅ Reinforcement Learning 対応
✅ HyperParameter Tuning 効率化
学習リソース
公式ドキュメント
オンラインコース
- AWS EMR Deep Dive(Linux Academy)
- Apache Spark 完全ガイド(Udemy)
- EMR + Databricks 比較コース(DataCamp)
技術ブログ・記事
OSS・ベンダー資料
- Apache Spark GitHub
- Apache Iceberg
- Apache Hudi
- Presto / Trino
- Databricks Documentation
- Cloudera Data Platform
実装チェックリスト
フェーズ 1:導入準備
- [ ] 既存データウェアハウス vs EMR のコスト比較完了
- [ ] 年間データ処理量・クエリ複雑度を測定
- [ ] Spark・Hadoop スキルを持つ人材確認
- [ ] AWS 環境(VPC・IAM・S3)準備完了
フェーズ 2:パイロット実装
- [ ] テスト用 EMR on EC2 クラスター作成
- [ ] 既存パイプラインを Spark で実装・性能測定
- [ ] スポットインスタンス活用でコスト削減確認
- [ ] Lake Formation・Glue Catalog 統合テスト
フェーズ 3:本番運用
- [ ] Infrastructure as Code(Terraform/CloudFormation)構築
- [ ] CloudWatch アラーム・ダッシュボード構築
- [ ] 自動スケーリングルール定義・テスト
- [ ] セキュリティ(KMS・VPC・IAM Role)完全設定
- [ ] バックアップ・ディザスタリカバリー計画
フェーズ 4:最適化・高度な運用
- [ ] Hudi / Iceberg による ACID トランザクション導入
- [ ] EMR Serverless へ短時間ジョブ移行
- [ ] EMR Studio Notebook 運用化
- [ ] SageMaker との ML パイプライン統合
- [ ] マルチアカウント環境での統一ガバナンス
まとめ
Amazon EMR は 「Apache Spark/Hadoop のフルマネージドクラスター実行基盤」。EC2(細かい制御)・EKS(Kubernetes 統合)・Serverless(ゼロ管理)の 3 オプションで、ペタバイト規模のデータ処理・複雑な ML パイプイン・リアルタイムストリーミングを実現します。
成功の鍵:
- スポットインスタンスで 70-90% コスト削減
- 使い捨てクラスターで月額コスト 60% 削減
- **EMRFS(S3)**で永続性・柔軟性確保
- Lake Formation + Glue Catalog でデータガバナンス統一
- Infrastructure as Code で再現性・バージョン管理
- 自動スケーリング + CloudWatch 監視で運用効率化
Spark 4.0・Iceberg v3・EMR Serverless などの最新機能を活用し、クラウドネイティブなデータ分析基盤を構築してください。
最終更新:2026-04-26 バージョン:v2.0