目次
- 初心者から実務者向けの包括的解説
- 概要
- MWAA が解決する課題
- 主な特徴
- アーキテクチャ
- コアコンポーネント
- Environment・Instance Class・ワーカースケーリング
- DAG の開発・デプロイメント
- S3 統合によるコード管理
- AWS サービス統合
- Airflow Connections・Secrets Manager 統合
- 主要ユースケース
- CLI・SDK・IaC による操作例
- CloudWatch Logs・X-Ray・メトリクス
- Private Webserver・IAM 認証・セキュリティ
- VPC 構成・Network Security
- 類似サービス比較
- ベストプラクティス
- トラブルシューティング表
- 2025-2026 最新動向
- 学習リソース
- 実装例・チェックリスト
- まとめ
Amazon MWAA(Managed Workflows for Apache Airflow)v2.0 完全ガイド
初心者から実務者向けの包括的解説
Amazon Managed Workflows for Apache Airflow(MWAA) は、Apache Airflow をフルマネージドのサーバーレスサービスとして提供し、データパイプライン・ETL ジョブ・機械学習ワークフローを Python ベースの DAG で自動化・オーケストレーションできます。インフラ管理・スケーリング・パッチ適用を AWS が完全に担当し、データエンジニアリングチームは DAG 開発に専念できます。本ガイドは、MWAA の概念・アーキテクチャ・実装・運用・最新動向を体系的に解説します。
ドキュメントの目的
本ガイドは以下を対象としています。
- 初心者向け:Airflow とは何か、MWAA が解決する課題を学びたい方
- データエンジニア向け:DAG 開発・S3 デプロイ・AWS サービス統合を実装したい方
- DevOps/SRE 向け:Environment 構成・Auto Scaling・監視・ロギングを運用したい方
- アーキテクト向け:Step Functions・Glue・EventBridge との使い分け・ベストプラクティス
- 意思決定者向け:Astronomer・セルフホスト Airflow との投資判断
2025-2026 年の MWAA エコシステム
- Apache Airflow 2.8 / 2.9 サポート:最新エンジンで Deferrable Task・Dynamic Task Mapping 対応
- Private Webserver モード:VPC 内からのみアクセス可能な Private Webserver
- IAM Authentication:IAM Roles for Service Accounts(IRSA)による細粒度認証
- CloudWatch Logs 統合強化:スケジューラー・ワーカー・ウェブサーバーのログを一元管理
- 環境クラス拡張:mw1.xlarge・mw1.2xlarge で大規模 DAG 対応
- プラグイン・依存関係の自動検出:requirements.txt 変更時の自動再デプロイ
- Cross-Account DAG デプロイ:AWS CodePipeline との統合で CI/CD 自動化
目次
- 概要
- MWAA が解決する課題
- 主な特徴
- アーキテクチャ
- コアコンポーネント
- Environment・Instance Class・ワーカースケーリング
- DAG の開発・デプロイメント
- S3 統合によるコード管理
- AWS サービス統合(Glue・Redshift・SageMaker・Lambda)
- Airflow Connections・Secrets Manager 統合
- 主要ユースケース(12+)
- CLI・SDK・IaC による操作例
- CloudWatch Logs・X-Ray・メトリクス
- Private Webserver・IAM 認証・セキュリティ
- VPC 構成・Network Security
- 類似サービス比較(Step Functions・Astronomer・Google Cloud Composer・Prefect・Dagster)
- ベストプラクティス
- トラブルシューティング表
- 2025-2026 最新動向
- 学習リソース
- 実装例・チェックリスト
- まとめ
- 参考文献
概要
初心者向けメモ:MWAA は「Apache Airflow をマネージドサービス化したもの」です。Airflow は複数のステップを持つワークフロー(DAG = Directed Acyclic Graph)を定義・スケジュール・監視するオープンソースツール。MWAA を使えば、Airflow の EC2 クラスター・データベース・スケーリング管理が不要になり、Python コードで DAG を書いて S3 に置くだけで実行できます。Glue ETL・Redshift・SageMaker などの AWS サービスと統合し、データパイプラインの自動化に特化しています。
MWAA の位置づけ:
graph LR
DataSource["データソース<br/>S3/RDS/API"]
MWAA["Amazon MWAA<br/>DAG Orchestration"]
Tasks["並列タスク処理<br/>Glue/Redshift/ECS"]
Output["出力・分析<br/>S3/Redshift/Athena"]
DataSource -->|監視・トリガー| MWAA
MWAA -->|実行| Tasks
Tasks -->|結果格納| Output
Monitor["CloudWatch Logs<br/>Metrics/Alarms"]
MWAA -.->|ログ・メトリクス| Monitor
MWAA が解決する課題
課題 1: Airflow インフラ管理の複雑性
状況:セルフホスト Airflow でメタデータ DB(PostgreSQL)・スケジューラー・ワーカー・Web サーバーを EC2/Kubernetes で運用。スケーリング・パッチ適用・HA 設定が手動。
MWAA による解決:
- AWS が Environment・Instance Class・Worker Scaling を自動管理
- Aurora PostgreSQL メタデータ DB が MWAA 専用(マネージド)
- マルチ AZ HA が標準装備
- 運用工数を 80% 削減
課題 2: Airflow DAG の依存関係・リトライ・監視
状況:複数の Glue ETL → Redshift COPY → SageMaker 学習 → データ検証というパイプラインで、任意のステップが失敗時に全体が停止。リトライ・スキップ・アラートを手動管理。
MWAA による解決:
- DAG で依存関係を宣言的に定義
- 自動リトライ・タイムアウト・リトライ間隔を設定可能
- CloudWatch Logs でスケジューラー・ワーカー・タスクログを一元管理
- Slack・SNS・EventBridge 連携でアラート自動化
課題 3: 開発環境・本番環境の一貫性
状況:ローカル Airflow で DAG テスト後、本番に deploy すると scheduler timeout や plugin エラーが発生。バージョン・dependency が環境ごとに異なる。
MWAA による解決:
- Apache Airflow バージョンを 2.0 / 2.4 / 2.8 等で明示的に指定
- requirements.txt を S3 に置いて pip install 自動管理
- plugins.zip をアップロードして全ワーカーで共有
- 本番前に mwaa test コマンドで DAG syntax check
主な特徴
| 特徴 | 説明 |
|---|---|
| フルマネージド | インフラ・OS パッチ・Airflow アップグレードを AWS 管理 |
| サーバーレススケーリング | mw1.micro~mw1.2xlarge の Instance Class で固定スペック |
| Auto Scaling | min-workers / max-workers 設定で自動スケール |
| S3 統合 | DAG・plugins・requirements を S3 bucket から自動同期 |
| AWS サービス統合 | Glue・Redshift・SageMaker・ECS 等を Operator で直接連携 |
| Secrets Manager 統合 | Airflow Connection を Secrets Manager で一元管理 |
| CloudWatch Logs | スケジューラー・ワーカー・Web サーバーログを自動収集 |
| Private / Public Webserver | VPC 内or インターネット経由でのアクセス制御 |
| IAM Authentication | IAM Roles for Service Accounts(IRSA)で細粒度認証 |
アーキテクチャ
┌─────────────────────────────────────────────────────────────┐
│ Amazon MWAA Environment │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Apache Airflow Control Plane │ │
│ │ ├─ Web Server(Public/Private) │ │
│ │ ├─ Scheduler(マルチスケジューラー対応) │ │
│ │ └─ Metadata DB(Aurora PostgreSQL) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Worker Fleet(Fargate) │ │
│ │ ├─ min_workers ~ max_workers 自動スケール │ │
│ │ ├─ Task Execution │ │
│ │ └─ CloudWatch Logs emit │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ストレージ層: │
│ ├─ S3 Bucket(DAGs / plugins.zip / requirements.txt) │
│ ├─ Aurora Metadata DB │
│ └─ CloudWatch Logs / X-Ray Traces │
└─────────────────────────────────────────────────────────────┘
↕
┌─ AWS VPC (Private Subnets)
│ ├─ NAT Gateway → Internet
│ └─ VPC Endpoint (S3, Secrets Manager)
│
└─ IAM Role(Execution Role for Workers)
├─ S3 Read(DAGs / plugins)
├─ S3 Write(Logs)
├─ Secrets Manager Read(Connections)
└─ AWS Service Permissions(Glue, Redshift, etc.)
コアコンポーネント
1. Environment(環境)
MWAA 環境は Airflow インスタンスの最小単位。1 つのアカウント・リージョンで複数の Environment を作成できます。
# Environment 作成
aws mwaa create-environment \
--name prod-etl-pipeline \
--airflow-version 2.8.1 \
--environment-class mw1.large
2. Instance Class
| Class | vCPU | Memory | 用途 |
|---|---|---|---|
| mw1.micro | 0.5 | 1 GB | 開発・テスト(非本番) |
| mw1.small | 1 | 2 GB | 小規模本番(<100 DAG) |
| mw1.medium | 2 | 4 GB | 標準本番(推奨) |
| mw1.large | 4 | 8 GB | 大規模(100-500 DAG) |
| mw1.xlarge | 8 | 16 GB | 超大規模・複雑 DAG |
| mw1.2xlarge | 16 | 32 GB | エンタープライズ |
3. Scheduler
- Airflow DAG のスケジュール・トリガー・task 依存関係を管理
- Multi-Scheduler 構成(Airflow 2.x 以降)で高可用性
- DAG Parse Interval:デフォルト 30 秒(DAG 変更を検知・更新)
- Catchup:デフォルト False(バックフィル機能の有効化)
4. Workers
- Fargate コンテナで実行される Task Executor
- min_workers / max_workers で Auto Scaling
- CloudWatch Logs に task stdout/stderr を自動出力
5. DAG(Directed Acyclic Graph)
Python で定義されたワークフロー。Task の依存関係・並列実行・条件分岐を記述。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG(
dag_id='simple_pipeline',
start_date=datetime(2025, 1, 1),
schedule_interval='0 2 * * *', # 毎日 2 AM
catchup=False,
) as dag:
def extract():
print("Extracting data...")
def load():
print("Loading data...")
t1 = PythonOperator(task_id='extract_task', python_callable=extract)
t2 = PythonOperator(task_id='load_task', python_callable=load)
t1 >> t2 # 依存関係
Environment・Instance Class・ワーカースケーリング
Environment の作成(詳細)
aws mwaa create-environment \
--name production-etl \
--airflow-version 2.8.1 \
--environment-class mw1.medium \
--max-workers 20 \
--min-workers 2 \
--dag-s3-path s3://my-mwaa-bucket/dags/ \
--plugins-s3-path s3://my-mwaa-bucket/plugins.zip \
--requirements-s3-path s3://my-mwaa-bucket/requirements.txt \
--execution-role-arn arn:aws:iam::123456789012:role/MWAAExecutionRole \
--network-configuration \
SubnetIds=subnet-private-a,subnet-private-b \
SecurityGroupIds=sg-mwaa \
--source-bucket-arn arn:aws:s3:::my-mwaa-bucket \
--webserver-access-mode PRIVATE_ONLY \
--weekly-maintenance-window-start MON:03:00 \
--airflow-configuration-options \
core.default_task_retries=3 \
core.default_view=tree \
celery.worker_autoscale=10,1 \
logging.logging_level=INFO
Auto Scaling 戦略
# DAG 内で resource requirements を指定
from airflow.models import Variable
# min_workers = 2(常時実行)
# max_workers = 20(ピーク時)
#
# スケーリング判断:
# - Active Task 数が閾値を超えたら workers 追加
# - 1 分以上 task がない状態が続いたら workers 削減
Instance Class の選択基準
# 環境クラス選択フロー
if num_dag < 50 and daily_tasks < 100:
instance_class = "mw1.small" # 開発・小規模本番
elif num_dag < 200 and daily_tasks < 500:
instance_class = "mw1.medium" # 標準本番(推奨)
elif num_dag < 500 and daily_tasks < 1500:
instance_class = "mw1.large" # 大規模
else:
instance_class = "mw1.xlarge" # エンタープライズ
DAG の開発・デプロイメント
DAG 開発フロー
# 1. ローカルで DAG を開発・テスト
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta
with DAG(
dag_id='ecommerce_etl',
start_date=datetime(2025, 1, 1),
schedule_interval='0 1 * * *', # 毎日 1 AM UTC
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
},
tags=['etl', 'production', 'ecommerce'],
) as dag:
# タスク 1: S3 入力ファイルを監視
wait_input = S3KeySensor(
task_id='wait_for_input_file',
bucket_name='ecommerce-data',
bucket_key='raw/{{ ds }}/orders.csv',
aws_conn_id='aws_default',
timeout=3600,
poke_interval=60,
)
# タスク 2: Glue ETL を実行
run_glue_etl = GlueJobOperator(
task_id='run_glue_etl_job',
job_name='ecommerce-etl-process',
script_args={
'--input_path': 's3://ecommerce-data/raw/{{ ds }}/',
'--output_path': 's3://ecommerce-data/processed/{{ ds }}/',
},
aws_conn_id='aws_default',
)
# タスク 3: Redshift にロード
load_redshift = RedshiftSQLOperator(
task_id='load_to_redshift',
sql="""
COPY analytics.orders
FROM 's3://ecommerce-data/processed/{{ ds }}/orders/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
FORMAT AS PARQUET
MANIFEST;
""",
redshift_conn_id='redshift_default',
)
# 依存関係を定義
wait_input >> run_glue_etl >> load_redshift
mwaa test コマンド(DAG Validation)
# ローカルで DAG をテスト(syntax & import 確認)
python -m py_compile dags/ecommerce_etl.py
# または、Airflow CLI でテスト
airflow dags test ecommerce_etl 2025-01-01
# MWAA 環境と同じバージョンで テスト実行
docker run -it -v $(pwd):/mwaa \
amazon/mwaa:2.8.1 \
airflow dags test ecommerce_etl 2025-01-01
S3 統合によるコード管理
S3 バケット構造
my-mwaa-bucket/
├── dags/
│ ├── ecommerce_etl.py
│ ├── data_quality_check.py
│ ├── common/
│ │ ├── __init__.py
│ │ └── utils.py
│ └── config.py
│
├── plugins.zip
│ └── (custom_operators, hooks, sensors)
│
└── requirements.txt
├── apache-airflow-providers-amazon==6.2.0
├── pydantic>=2.0
└── requests==2.31.0
S3 Auto-sync メカニズム
# Environment 作成時に DAG S3 path を指定
--dag-s3-path s3://my-mwaa-bucket/dags/
# MWAA Scheduler が定期的に polling(デフォルト 30 秒)
# → S3 内の DAG を自動検出・reload
# → plugins.zip を自動展開
# → requirements.txt を pip install
# デプロイフロー:
# git commit → CodePipeline → S3 PUT → MWAA 自動反映
DAG Deploy ベストプラクティス
# CI/CD パイプラインから S3 に sync
# 1. DAG を git にcommit
git add dags/new_pipeline.py
git commit -m "feat: add new pipeline"
# 2. CodePipeline で自動テスト
pytest tests/test_dags.py
# 3. S3 に sync(テスト成功時のみ)
aws s3 sync ./dags s3://my-mwaa-bucket/dags/ \
--delete \
--exclude "*.pyc" \
--exclude "__pycache__/*"
# 4. requirements.txt アップロード(pip install 自動トリガー)
aws s3 cp requirements.txt s3://my-mwaa-bucket/requirements.txt
# 5. MWAA が自動反映(30 秒以内)
# → Scheduler logs で確認
aws logs tail /aws/mwaa/prod-etl-pipeline/scheduler --follow
AWS サービス統合
Glue ETL Operator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
glue_task = GlueJobOperator(
task_id='run_glue_etl',
job_name='my-glue-job',
script_args={
'--input_bucket': 's3://raw-data',
'--output_bucket': 's3://processed-data',
'--date': '{{ ds }}',
},
aws_conn_id='aws_default',
wait_for_completion=True,
)
Redshift SQL Operator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
redshift_task = RedshiftSQLOperator(
task_id='redshift_analysis',
sql="""
INSERT INTO analytics.daily_summary
SELECT
date,
COUNT(*) as order_count,
SUM(amount) as total_revenue
FROM staging.orders
WHERE date = '{{ ds }}'
GROUP BY date;
""",
redshift_conn_id='redshift_default',
)
SageMaker Training Operator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator
sagemaker_task = SageMakerTrainingOperator(
task_id='train_ml_model',
config={
'TrainingJobName': 'ml-model-{{ ds }}',
'RoleArn': 'arn:aws:iam::123456789012:role/SageMakerRole',
'AlgorithmSpecification': {
'TrainingImage': '382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
'TrainingInputMode': 'File',
},
'InputDataConfig': [
{
'ChannelName': 'training',
'DataSource': {
'S3DataSource': {
'S3Uri': 's3://ml-data/train/{{ ds }}/',
'S3DataType': 'S3Prefix',
'S3DataDistributionType': 'FullyReplicated',
}
},
}
],
'OutputDataConfig': {
'S3OutputPath': 's3://ml-models/output/',
},
},
wait_for_completion=True,
)
Lambda Operator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
lambda_task = LambdaInvokeFunctionOperator(
task_id='invoke_data_validation',
function_name='data_quality_check',
payload='{{"bucket": "processed-data", "key": "{{ ds }}/data.parquet"}}',
aws_conn_id='aws_default',
)
ECS Task Operator
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
ecs_task = EcsRunTaskOperator(
task_id='run_ecs_container',
cluster='ecs-prod-cluster',
task_definition='custom_data_processing',
launch_type='FARGATE',
network_configuration={
'awsvpcConfiguration': {
'subnets': ['subnet-private-a', 'subnet-private-b'],
'securityGroups': ['sg-ecs-tasks'],
'assignPublicIp': 'DISABLED',
}
},
overrides={
'containerOverrides': [
{
'name': 'data_processor',
'environment': [
{'name': 'DATE', 'value': '{{ ds }}'},
],
}
]
},
awslogs_group='/ecs/data-processing',
awslogs_stream_prefix='mwaa-',
)
Airflow Connections・Secrets Manager 統合
Secrets Manager での Connection 管理
# AWS Secrets Manager にシークレットを保存
import json
import boto3
secret_dict = {
"conn_type": "postgres",
"host": "prod-redshift.xxx.redshift.amazonaws.com",
"login": "redshift_user",
"password": "SecurePassword123!",
"port": 5439,
"schema": "analytics",
}
boto3.client('secretsmanager').create_secret(
Name='airflow/connections/redshift_default',
SecretString=json.dumps(secret_dict)
)
Secrets Manager バックエンド有効化
# MWAA Environment を更新(Secrets Manager バックエンドを有効化)
aws mwaa update-environment \
--name prod-etl-pipeline \
--airflow-configuration-options \
'secrets.backend=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' \
'secrets.backend_kwargs={"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'
# これ以降、Airflow は Connections を Secrets Manager から自動取得
Variable の Secrets Manager 管理
from airflow.models import Variable
# Secrets Manager に保存されたシークレットを取得
database_host = Variable.get('db_host') # airflow/variables/db_host から取得
api_key = Variable.get('api_key') # airflow/variables/api_key から取得
主要ユースケース
1. ETL パイプラインの自動化
S3 Data Lake
↓
Glue Data Catalog(テーブル登録)
↓
MWAA DAG
├→ Glue ETL(Spark 変換)
├→ Redshift COPY(DW ロード)
└→ Athena Query(検証)
↓
CloudWatch Logs(監視・アラート)
2. 機械学習パイプラインのオーケストレーション
Data Preparation (Glue)
↓
Feature Engineering (SageMaker Processing)
↓
Model Training (SageMaker Training)
↓
Model Evaluation (Lambda)
↓
Model Registry (SageMaker Model Registry)
↓
Batch Inference (SageMaker Batch Transform)
3. データクオリティーチェック
Glue ETL → Output
↓
MWAA Data Quality DAG
├→ Great Expectations
├→ Custom Validation Rules
└→ SNS Notification(失敗時)
4. マルチテナント SaaS データパイプライン
Customer Data Ingestion
↓
MWAA (Tenant-specific DAGs)
├→ Tenant A: Extract → Transform → Load
├→ Tenant B: Extract → Transform → Load
└→ Tenant C: Extract → Transform → Load
↓
Isolated DynamoDB Instances
5. ストリーミングデータ + Batch 処理
Kinesis → Lambda (buffering)
↓
S3 (parquetized)
↓
MWAA DAG (hourly batch)
├→ Athena query
└→ Redshift update
6. リアルタイムレコメンデーション更新
User Behavior Events
↓
Kinesis Firehose → S3
↓
MWAA (nightly DAG)
├→ Glue ETL (features preparation)
├→ SageMaker Training
└→ ElastiCache (recommendations cache)
7. ログ集約・監視
Application Logs (CloudWatch Logs)
↓
MWAA Log Processing DAG
├→ Athena (ad-hoc queries)
├→ OpenSearch (indexing)
└→ S3 Data Lake (archive)
8. 依存関係管理された複雑 DAG
External API Call
↓ (success → proceed / fail → retry)
↓
Parallel Processing (Map)
├→ Process A
├→ Process B
└→ Process C
↓
Aggregation
↓
Report Generation (SES)
9. 条件分岐・動的 DAG
Data Check Task
↓
BranchOperator
├→ データ品質 OK → Analysis
└→ データ品質 NG → Alert + Rollback
10. クロスアカウント DAG 実行
Account A (MWAA Environment)
↓ AssumeRole
Account B (Cross-Account Resource)
├→ Redshift Cluster
└→ S3 Bucket
11. スケジュール変動への対応
MWAA Backfill (catchup=True)
↓
月末決算処理(特殊スケジュール)
↓
年度末監査(1 回限りの大規模処理)
12. ガバナンス・監査ログ
MWAA DAG Execution
↓
CloudTrail(IAM 操作)
↓
CloudWatch Logs(Scheduler/Worker logs)
↓
S3(ログアーカイブ)
↓
QuickSight(ダッシュボード)
CLI・SDK・IaC による操作例
AWS CLI での操作
# 1. Environment 作成
aws mwaa create-environment \
--name my-airflow-env \
--airflow-version 2.8.1 \
--environment-class mw1.medium \
--max-workers 10 \
--min-workers 1 \
--dag-s3-path s3://bucket/dags/ \
--execution-role-arn arn:aws:iam::123456789012:role/MWAARole \
--network-configuration SubnetIds=subnet-xxx,subnet-yyy \
--source-bucket-arn arn:aws:s3:::bucket
# 2. DAG 実行をトリガー
aws mwaa create-cli-token --name my-airflow-env
# → token 取得 → curl で Airflow API 呼び出し
# 3. Environment 情報確認
aws mwaa get-environment --name my-airflow-env
# 4. Logs を確認
aws logs tail /aws/mwaa/my-airflow-env/scheduler --follow
# 5. Environment 削除
aws mwaa delete-environment --name my-airflow-env
boto3 SDK での操作
import boto3
import json
mwaa = boto3.client('mwaa', region_name='us-east-1')
# Environment 作成
response = mwaa.create_environment(
Name='prod-etl',
AirflowVersion='2.8.1',
EnvironmentClass='mw1.medium',
MaxWorkers=20,
MinWorkers=2,
DagS3Path='s3://my-bucket/dags/',
ExecutionRoleArn='arn:aws:iam::123456789012:role/MWAARole',
NetworkConfiguration={
'SubnetIds': ['subnet-a', 'subnet-b'],
'SecurityGroupIds': ['sg-mwaa'],
},
AirflowConfigurationOptions={
'core.default_task_retries': '3',
'core.max_active_runs_per_dag': '1',
'logging.logging_level': 'INFO',
}
)
print(json.dumps(response, indent=2, default=str))
Terraform / CDK による IaC
# Terraform
resource "aws_mwaa_environment" "main" {
name = "prod-etl"
airflow_version = "2.8.1"
environment_class = "mw1.medium"
max_workers = 20
min_workers = 2
dag_s3_path = "s3://${aws_s3_bucket.mwaa.id}/dags/"
plugins_s3_path = "s3://${aws_s3_bucket.mwaa.id}/plugins.zip"
requirements_s3_path = "s3://${aws_s3_bucket.mwaa.id}/requirements.txt"
execution_role_arn = aws_iam_role.mwaa_role.arn
vpc_config {
subnet_ids = aws_subnet.private[*].id
security_group_ids = [aws_security_group.mwaa.id]
}
airflow_configuration_options = {
"core.default_task_retries" = "3"
"logging.logging_level" = "INFO"
}
}
# CDK
from aws_cdk import (
aws_mwaa as mwaa,
aws_iam as iam,
aws_s3 as s3,
core,
)
class MWAAStack(core.Stack):
def __init__(self, scope: core.Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# S3 Bucket
bucket = s3.Bucket(
self, "MWAABucket",
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)
# IAM Role
role = iam.Role(
self, "MWAARole",
assumed_by=iam.ServicePrincipal("mwaa.amazonaws.com"),
)
role.add_to_policy(iam.PolicyStatement(
actions=["s3:*"],
resources=[bucket.arn_for_objects("*")],
))
# MWAA Environment
env = mwaa.CfnEnvironment(
self, "MWAAEnvironment",
name="prod-etl",
airflow_version="2.8.1",
environment_class="mw1.medium",
max_workers=20,
min_workers=2,
dag_s3_path=f"s3://{bucket.bucket_name}/dags/",
execution_role_arn=role.role_arn,
network_configuration=mwaa.CfnEnvironment.NetworkConfigurationProperty(
subnet_ids=["subnet-a", "subnet-b"],
security_group_ids=["sg-mwaa"],
),
)
CloudWatch Logs・X-Ray・メトリクス
CloudWatch Logs 構成
/aws/mwaa/{EnvironmentName}/
├── scheduler/
│ └── 2025-01-01T10:00:00Z_*.log (DAG parse / task schedule)
├── worker/
│ └── 2025-01-01T10:00:00Z_*.log (Task execution)
└── webserver/
└── 2025-01-01T10:00:00Z_*.log (Web UI access)
ログクエリ例
# Scheduler エラーを確認
aws logs filter-log-events \
--log-group-name /aws/mwaa/prod-etl/scheduler \
--filter-pattern "[ERROR]" \
--start-time $(date -d '1 hour ago' +%s)000
# Worker 内で失敗したタスクを検索
aws logs filter-log-events \
--log-group-name /aws/mwaa/prod-etl/worker \
--filter-pattern "\"[FAILED]\"" \
--start-time $(date -d '1 hour ago' +%s)000
CloudWatch メトリクス
| メトリクス | 説明 |
|---|---|
NumDagProcessFailures |
DAG Parse エラー数 |
NumTasksFailed |
失敗したタスク数 |
NumTasksSuccess |
成功したタスク数 |
DagDuration |
DAG 実行時間 |
TaskDuration |
タスク実行時間 |
NumSchedulerHeartbeats |
Scheduler ハートビート |
X-Ray トレーシング
# MWAA Environment で X-Ray トレーシングを有効化
aws mwaa update-environment \
--name prod-etl \
--airflow-configuration-options \
'logging.remote_logging=True' \
'logging.remote_base_log_folder=s3://bucket/logs/'
Private Webserver・IAM 認証・セキュリティ
Private Webserver モード
# Private Webserver 設定(VPC 内からのみアクセス可能)
aws mwaa create-environment \
--name prod-etl \
--webserver-access-mode PRIVATE_ONLY \
--network-configuration \
SubnetIds=subnet-private-a,subnet-private-b
IAM 認証によるアクセス制御
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:user/data-engineer"
},
"Action": [
"mwaa:GetEnvironment",
"airflow:DescribeDAG",
"airflow:ListDAGs"
],
"Resource": "arn:aws:mwaa:us-east-1:123456789012:environment/prod-etl"
}
]
}
セキュリティベストプラクティス
| 対策 | 説明 |
|---|---|
| Private Webserver | VPC 内からのみアクセス可能に設定 |
| IAM Authentication | IAM Roles for Service Accounts で credentials 不要 |
| Secrets Manager | Connection passwords を暗号化管理 |
| VPC Endpoints | S3・Secrets Manager への通信を VPC 内に閉じ込める |
| Encryption at Rest | KMS で DAG・Log を暗号化 |
| Execution Role | 最小権限の IAM Role を割り当てる |
VPC 構成・Network Security
VPC エンドポイント構成
# S3 Gateway Endpoint(NAT Gateway 不要)
aws ec2 create-vpc-endpoint \
--vpc-id vpc-xxx \
--service-name com.amazonaws.us-east-1.s3 \
--route-table-ids rtb-xxx
# Secrets Manager Interface Endpoint
aws ec2 create-vpc-endpoint \
--vpc-id vpc-xxx \
--vpc-endpoint-type Interface \
--service-name com.amazonaws.us-east-1.secretsmanager \
--subnet-ids subnet-private-a subnet-private-b \
--security-group-ids sg-vpc-endpoint
Security Group 設定
# MWAA Security Group
aws ec2 create-security-group \
--group-name mwaa-sg \
--description "MWAA Workers & Scheduler" \
--vpc-id vpc-xxx
# 許可ルール(内部通信)
aws ec2 authorize-security-group-ingress \
--group-id sg-mwaa \
--protocol tcp \
--port 5432 \
--source-group sg-mwaa # PostgreSQL metadata DB
# RDS への通信
aws ec2 authorize-security-group-ingress \
--group-id sg-rds \
--protocol tcp \
--port 5432 \
--source-group sg-mwaa
類似サービス比較
| 観点 | MWAA | Step Functions | Astronomer | Google Cloud Composer | Prefect | Dagster |
|---|---|---|---|---|---|---|
| インフラ管理 | フルマネージド | フルマネージド | 手動(VM/K8s) | フルマネージド | 手動 / Cloud | 手動 / Cloud |
| 言語 | Python DAG | JSON/YAML ASL | Python DAG | Python DAG | Python | Python |
| スケーリング | Auto(Worker) | 自動(State Machine) | 手動 | Auto | 手動 | 手動 |
| AWS統合 | ネイティブ | ネイティブ | Provider | Limited | Hook | Resource |
| 学習曲線 | 中(Airflow知識) | 低(JSON宣言的) | 中 | 中 | 中 | 高 |
| コスト | 中(Environment料金) | 低(実行従量課金) | 高(SaaS+管理) | 中 | 低(オンプレ) | 低(オンプレ) |
| 複雑DAG対応 | 優秀 | 不向き | 優秀 | 優秀 | 優秀 | 優秀 |
| 動的Task | DynamicTaskMap | Map State | Limited | Limited | 優秀 | 優秀 |
ベストプラクティス
推奨事項(✅)
- ✅ DAG ファイルをバージョン管理(git)で管理
- ✅ requirements.txt で 明示的に依存パッケージを管理
- ✅ S3 ベースのコード deploy で CI/CD 自動化
- ✅ Secrets Manager で credentials 一元管理
- ✅ CloudWatch Logs で スケジューラー・ワーカー・アプリログを監視
- ✅ Task Retries・Timeout を適切に設定
- ✅ Monitoring alerts(SNS・Slack)で障害通知
- ✅ Private Webserver + IAM 認証で セキュリティ確保
- ✅ Instance Class を 負荷パターンに応じて選定
- ✅ max_active_runs_per_dag で リソース枯渇を防止
アンチパターン(❌)
- ❌ DAG をコンソール直接編集(バージョン管理外)
- ❌ Hard-coded credentials を DAG に埋め込む
- ❌ Public Webserver を本番運用(認証なし)
- ❌ Task リトライなし(デフォルト)で本番運用
- ❌ 監視アラートなし(障害検知遅延)
- ❌ mw1.micro で本番運用(リソース不足)
- ❌ catchup=True で全期間バックフィル(リソース枯渇)
- ❌ 複雑な依存関係を task 内ロジックで実装(保守困難)
- ❌ Logging level=DEBUG で本番運用(ログ出力過多)
トラブルシューティング表
| 症状 | 原因 | 解決方法 |
|---|---|---|
| DAG が表示されない | S3 パスが違う / DAG syntax エラー | aws s3 ls s3://bucket/dags/ で確認 / airflow dags test で syntax チェック |
| Task が stuck(実行されない) | Worker スケーリング遅延 / Resource 枯渇 | max_workers 上限確認 / Task duration 短縮 |
| Task timeout | Timeout 設定が短すぎる / 実行時間増加 | execution_timeout 延長 / Task 分割 |
| Redshift COPY 失敗 | IAM Role 権限不足 / S3 path 誤り | aws s3 ls s3://bucket/path/ で S3 確認 / IAM Role policy 確認 |
| Scheduler restart ループ | DAG Parse エラー / Memory 枯渇 | CloudWatch Logs で scheduler error 確認 / Instance Class 上げる |
| Connection 取得失敗 | Secrets Manager path 誤り / 暗号化キー権限なし | aws secretsmanager get-secret-value でシークレット確認 |
| Worker ログが出力されない | Logging level 設定 / CloudWatch Logs 権限なし | logging.logging_level=INFO で明示的に設定 |
2025-2026 最新動向
- Apache Airflow 3.0 対応予定:Deferrable task・Dynamic Task Mapping の標準化
- Kubernetes Executor 強化:EKS との統合強化で大規模 DAG サポート
- Bedrock 統合:生成 AI をワークフローに組み込む能力
- DataOps 統合:dbt・Great Expectations との Native 統合
- Multi-Region MWAA:クロスリージョンフェイルオーバー対応予定
- Cost Optimization:Spot Instances for Worker Fleet でコスト 40% 削減
学習リソース
公式リソース
OSS / コミュニティ
AWS ベンダーリソース
実装例・チェックリスト
MWAA 導入チェックリスト
- [ ] Airflow DAG 開発経験がある
- [ ] S3 バケットを作成・IAM Policy を設定
- [ ] Execution Role に必要な IAM permissions を追加
- [ ] VPC・Subnet・Security Group を構成
- [ ] requirements.txt で依存パッケージを指定
- [ ] DAG を
airflow dags testでローカル検証 - [ ] S3 に DAG / plugins.zip / requirements.txt をアップロード
- [ ] MWAA Environment を CLI / CDK で作成
- [ ] Airflow Web UI にアクセス(Public / Private Webserver 確認)
- [ ] CloudWatch Logs で Scheduler ログを確認
- [ ] DAG をトリガーして Task 実行確認
- [ ] Slack / SNS アラートを設定
- [ ] 本番 Environment に昇格
まとめ
Amazon MWAA は 「Apache Airflow のフルマネージドサービス」 で、複雑なデータパイプライン・ETL ワークフロー・機械学習オーケストレーションを Python DAG で自動化できます。S3 ベースのコード管理・AWS サービス統合・CloudWatch Logs 監視を提供し、Glue・Redshift・SageMaker・Lambda との連携に優れています。Instance Class を負荷に応じて選定し、Auto Scaling・Secrets Manager 統合・IAM 認証でセキュリティを確保することで、エンタープライズグレードのデータエンジニアリングプラットフォームを実現できます。
最終更新:2026-04-27 バージョン:v2.0