目次

Amazon MWAA(Managed Workflows for Apache Airflow)v2.0 完全ガイド

初心者から実務者向けの包括的解説

Amazon Managed Workflows for Apache Airflow(MWAA) は、Apache Airflow をフルマネージドのサーバーレスサービスとして提供し、データパイプライン・ETL ジョブ・機械学習ワークフローを Python ベースの DAG で自動化・オーケストレーションできます。インフラ管理・スケーリング・パッチ適用を AWS が完全に担当し、データエンジニアリングチームは DAG 開発に専念できます。本ガイドは、MWAA の概念・アーキテクチャ・実装・運用・最新動向を体系的に解説します。

ドキュメントの目的

本ガイドは以下を対象としています。

  • 初心者向け:Airflow とは何か、MWAA が解決する課題を学びたい方
  • データエンジニア向け:DAG 開発・S3 デプロイ・AWS サービス統合を実装したい方
  • DevOps/SRE 向け:Environment 構成・Auto Scaling・監視・ロギングを運用したい方
  • アーキテクト向け:Step Functions・Glue・EventBridge との使い分け・ベストプラクティス
  • 意思決定者向け:Astronomer・セルフホスト Airflow との投資判断

2025-2026 年の MWAA エコシステム

  • Apache Airflow 2.8 / 2.9 サポート:最新エンジンで Deferrable Task・Dynamic Task Mapping 対応
  • Private Webserver モード:VPC 内からのみアクセス可能な Private Webserver
  • IAM Authentication:IAM Roles for Service Accounts(IRSA)による細粒度認証
  • CloudWatch Logs 統合強化:スケジューラー・ワーカー・ウェブサーバーのログを一元管理
  • 環境クラス拡張:mw1.xlarge・mw1.2xlarge で大規模 DAG 対応
  • プラグイン・依存関係の自動検出:requirements.txt 変更時の自動再デプロイ
  • Cross-Account DAG デプロイ:AWS CodePipeline との統合で CI/CD 自動化

目次

  1. 概要
  2. MWAA が解決する課題
  3. 主な特徴
  4. アーキテクチャ
  5. コアコンポーネント
  6. Environment・Instance Class・ワーカースケーリング
  7. DAG の開発・デプロイメント
  8. S3 統合によるコード管理
  9. AWS サービス統合(Glue・Redshift・SageMaker・Lambda)
  10. Airflow Connections・Secrets Manager 統合
  11. 主要ユースケース(12+)
  12. CLI・SDK・IaC による操作例
  13. CloudWatch Logs・X-Ray・メトリクス
  14. Private Webserver・IAM 認証・セキュリティ
  15. VPC 構成・Network Security
  16. 類似サービス比較(Step Functions・Astronomer・Google Cloud Composer・Prefect・Dagster)
  17. ベストプラクティス
  18. トラブルシューティング表
  19. 2025-2026 最新動向
  20. 学習リソース
  21. 実装例・チェックリスト
  22. まとめ
  23. 参考文献

概要

初心者向けメモ:MWAA は「Apache Airflow をマネージドサービス化したもの」です。Airflow は複数のステップを持つワークフロー(DAG = Directed Acyclic Graph)を定義・スケジュール・監視するオープンソースツール。MWAA を使えば、Airflow の EC2 クラスター・データベース・スケーリング管理が不要になり、Python コードで DAG を書いて S3 に置くだけで実行できます。Glue ETL・Redshift・SageMaker などの AWS サービスと統合し、データパイプラインの自動化に特化しています。

MWAA の位置づけ:

graph LR
    DataSource["データソース<br/>S3/RDS/API"]
    MWAA["Amazon MWAA<br/>DAG Orchestration"]
    Tasks["並列タスク処理<br/>Glue/Redshift/ECS"]
    Output["出力・分析<br/>S3/Redshift/Athena"]
    
    DataSource -->|監視・トリガー| MWAA
    MWAA -->|実行| Tasks
    Tasks -->|結果格納| Output
    
    Monitor["CloudWatch Logs<br/>Metrics/Alarms"]
    MWAA -.->|ログ・メトリクス| Monitor

MWAA が解決する課題

課題 1: Airflow インフラ管理の複雑性

状況:セルフホスト Airflow でメタデータ DB(PostgreSQL)・スケジューラー・ワーカー・Web サーバーを EC2/Kubernetes で運用。スケーリング・パッチ適用・HA 設定が手動。

MWAA による解決

  • AWS が Environment・Instance Class・Worker Scaling を自動管理
  • Aurora PostgreSQL メタデータ DB が MWAA 専用(マネージド)
  • マルチ AZ HA が標準装備
  • 運用工数を 80% 削減

課題 2: Airflow DAG の依存関係・リトライ・監視

状況:複数の Glue ETL → Redshift COPY → SageMaker 学習 → データ検証というパイプラインで、任意のステップが失敗時に全体が停止。リトライ・スキップ・アラートを手動管理。

MWAA による解決

  • DAG で依存関係を宣言的に定義
  • 自動リトライ・タイムアウト・リトライ間隔を設定可能
  • CloudWatch Logs でスケジューラー・ワーカー・タスクログを一元管理
  • Slack・SNS・EventBridge 連携でアラート自動化

課題 3: 開発環境・本番環境の一貫性

状況:ローカル Airflow で DAG テスト後、本番に deploy すると scheduler timeout や plugin エラーが発生。バージョン・dependency が環境ごとに異なる。

MWAA による解決

  • Apache Airflow バージョンを 2.0 / 2.4 / 2.8 等で明示的に指定
  • requirements.txt を S3 に置いて pip install 自動管理
  • plugins.zip をアップロードして全ワーカーで共有
  • 本番前に mwaa test コマンドで DAG syntax check

主な特徴

特徴 説明
フルマネージド インフラ・OS パッチ・Airflow アップグレードを AWS 管理
サーバーレススケーリング mw1.micro~mw1.2xlarge の Instance Class で固定スペック
Auto Scaling min-workers / max-workers 設定で自動スケール
S3 統合 DAG・plugins・requirements を S3 bucket から自動同期
AWS サービス統合 Glue・Redshift・SageMaker・ECS 等を Operator で直接連携
Secrets Manager 統合 Airflow Connection を Secrets Manager で一元管理
CloudWatch Logs スケジューラー・ワーカー・Web サーバーログを自動収集
Private / Public Webserver VPC 内or インターネット経由でのアクセス制御
IAM Authentication IAM Roles for Service Accounts(IRSA)で細粒度認証

アーキテクチャ

┌─────────────────────────────────────────────────────────────┐
│ Amazon MWAA Environment                                     │
│                                                             │
│  ┌────────────────────────────────────────────────────┐   │
│  │ Apache Airflow Control Plane                       │   │
│  │  ├─ Web Server(Public/Private)                   │   │
│  │  ├─ Scheduler(マルチスケジューラー対応)           │   │
│  │  └─ Metadata DB(Aurora PostgreSQL)               │   │
│  └────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌────────────────────────────────────────────────────┐   │
│  │ Worker Fleet(Fargate)                            │   │
│  │  ├─ min_workers ~ max_workers 自動スケール        │   │
│  │  ├─ Task Execution                                 │   │
│  │  └─ CloudWatch Logs emit                           │   │
│  └────────────────────────────────────────────────────┘   │
│                                                             │
│  ストレージ層:                                             │
│  ├─ S3 Bucket(DAGs / plugins.zip / requirements.txt)    │
│  ├─ Aurora Metadata DB                                    │
│  └─ CloudWatch Logs / X-Ray Traces                        │
└─────────────────────────────────────────────────────────────┘
       ↕
   ┌─ AWS VPC (Private Subnets)
   │  ├─ NAT Gateway → Internet
   │  └─ VPC Endpoint (S3, Secrets Manager)
   │
   └─ IAM Role(Execution Role for Workers)
      ├─ S3 Read(DAGs / plugins)
      ├─ S3 Write(Logs)
      ├─ Secrets Manager Read(Connections)
      └─ AWS Service Permissions(Glue, Redshift, etc.)

コアコンポーネント

1. Environment(環境)

MWAA 環境は Airflow インスタンスの最小単位。1 つのアカウント・リージョンで複数の Environment を作成できます。

# Environment 作成
aws mwaa create-environment \
  --name prod-etl-pipeline \
  --airflow-version 2.8.1 \
  --environment-class mw1.large

2. Instance Class

Class vCPU Memory 用途
mw1.micro 0.5 1 GB 開発・テスト(非本番)
mw1.small 1 2 GB 小規模本番(<100 DAG)
mw1.medium 2 4 GB 標準本番(推奨)
mw1.large 4 8 GB 大規模(100-500 DAG)
mw1.xlarge 8 16 GB 超大規模・複雑 DAG
mw1.2xlarge 16 32 GB エンタープライズ

3. Scheduler

  • Airflow DAG のスケジュール・トリガー・task 依存関係を管理
  • Multi-Scheduler 構成(Airflow 2.x 以降)で高可用性
  • DAG Parse Interval:デフォルト 30 秒(DAG 変更を検知・更新)
  • Catchup:デフォルト False(バックフィル機能の有効化)

4. Workers

  • Fargate コンテナで実行される Task Executor
  • min_workers / max_workers で Auto Scaling
  • CloudWatch Logs に task stdout/stderr を自動出力

5. DAG(Directed Acyclic Graph)

Python で定義されたワークフロー。Task の依存関係・並列実行・条件分岐を記述。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id='simple_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule_interval='0 2 * * *',  # 毎日 2 AM
    catchup=False,
) as dag:
    
    def extract():
        print("Extracting data...")
    
    def load():
        print("Loading data...")
    
    t1 = PythonOperator(task_id='extract_task', python_callable=extract)
    t2 = PythonOperator(task_id='load_task', python_callable=load)
    
    t1 >> t2  # 依存関係

Environment・Instance Class・ワーカースケーリング

Environment の作成(詳細)

aws mwaa create-environment \
  --name production-etl \
  --airflow-version 2.8.1 \
  --environment-class mw1.medium \
  --max-workers 20 \
  --min-workers 2 \
  --dag-s3-path s3://my-mwaa-bucket/dags/ \
  --plugins-s3-path s3://my-mwaa-bucket/plugins.zip \
  --requirements-s3-path s3://my-mwaa-bucket/requirements.txt \
  --execution-role-arn arn:aws:iam::123456789012:role/MWAAExecutionRole \
  --network-configuration \
    SubnetIds=subnet-private-a,subnet-private-b \
    SecurityGroupIds=sg-mwaa \
  --source-bucket-arn arn:aws:s3:::my-mwaa-bucket \
  --webserver-access-mode PRIVATE_ONLY \
  --weekly-maintenance-window-start MON:03:00 \
  --airflow-configuration-options \
    core.default_task_retries=3 \
    core.default_view=tree \
    celery.worker_autoscale=10,1 \
    logging.logging_level=INFO

Auto Scaling 戦略

# DAG 内で resource requirements を指定
from airflow.models import Variable

# min_workers = 2(常時実行)
# max_workers = 20(ピーク時)
# 
# スケーリング判断:
# - Active Task 数が閾値を超えたら workers 追加
# - 1 分以上 task がない状態が続いたら workers 削減

Instance Class の選択基準

# 環境クラス選択フロー

if num_dag < 50 and daily_tasks < 100:
    instance_class = "mw1.small"  # 開発・小規模本番
elif num_dag < 200 and daily_tasks < 500:
    instance_class = "mw1.medium"  # 標準本番(推奨)
elif num_dag < 500 and daily_tasks < 1500:
    instance_class = "mw1.large"   # 大規模
else:
    instance_class = "mw1.xlarge"  # エンタープライズ

DAG の開発・デプロイメント

DAG 開発フロー

# 1. ローカルで DAG を開発・テスト
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta

with DAG(
    dag_id='ecommerce_etl',
    start_date=datetime(2025, 1, 1),
    schedule_interval='0 1 * * *',  # 毎日 1 AM UTC
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2),
    },
    tags=['etl', 'production', 'ecommerce'],
) as dag:
    
    # タスク 1: S3 入力ファイルを監視
    wait_input = S3KeySensor(
        task_id='wait_for_input_file',
        bucket_name='ecommerce-data',
        bucket_key='raw/{{ ds }}/orders.csv',
        aws_conn_id='aws_default',
        timeout=3600,
        poke_interval=60,
    )
    
    # タスク 2: Glue ETL を実行
    run_glue_etl = GlueJobOperator(
        task_id='run_glue_etl_job',
        job_name='ecommerce-etl-process',
        script_args={
            '--input_path': 's3://ecommerce-data/raw/{{ ds }}/',
            '--output_path': 's3://ecommerce-data/processed/{{ ds }}/',
        },
        aws_conn_id='aws_default',
    )
    
    # タスク 3: Redshift にロード
    load_redshift = RedshiftSQLOperator(
        task_id='load_to_redshift',
        sql="""
            COPY analytics.orders
            FROM 's3://ecommerce-data/processed/{{ ds }}/orders/'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
            FORMAT AS PARQUET
            MANIFEST;
        """,
        redshift_conn_id='redshift_default',
    )
    
    # 依存関係を定義
    wait_input >> run_glue_etl >> load_redshift

mwaa test コマンド(DAG Validation)

# ローカルで DAG をテスト(syntax & import 確認)
python -m py_compile dags/ecommerce_etl.py

# または、Airflow CLI でテスト
airflow dags test ecommerce_etl 2025-01-01

# MWAA 環境と同じバージョンで テスト実行
docker run -it -v $(pwd):/mwaa \
  amazon/mwaa:2.8.1 \
  airflow dags test ecommerce_etl 2025-01-01

S3 統合によるコード管理

S3 バケット構造

my-mwaa-bucket/
├── dags/
│   ├── ecommerce_etl.py
│   ├── data_quality_check.py
│   ├── common/
│   │   ├── __init__.py
│   │   └── utils.py
│   └── config.py
│
├── plugins.zip
│   └── (custom_operators, hooks, sensors)
│
└── requirements.txt
    ├── apache-airflow-providers-amazon==6.2.0
    ├── pydantic>=2.0
    └── requests==2.31.0

S3 Auto-sync メカニズム

# Environment 作成時に DAG S3 path を指定
--dag-s3-path s3://my-mwaa-bucket/dags/

# MWAA Scheduler が定期的に polling(デフォルト 30 秒)
# → S3 内の DAG を自動検出・reload
# → plugins.zip を自動展開
# → requirements.txt を pip install

# デプロイフロー:
# git commit → CodePipeline → S3 PUT → MWAA 自動反映

DAG Deploy ベストプラクティス

# CI/CD パイプラインから S3 に sync

# 1. DAG を git にcommit
git add dags/new_pipeline.py
git commit -m "feat: add new pipeline"

# 2. CodePipeline で自動テスト
pytest tests/test_dags.py

# 3. S3 に sync(テスト成功時のみ)
aws s3 sync ./dags s3://my-mwaa-bucket/dags/ \
  --delete \
  --exclude "*.pyc" \
  --exclude "__pycache__/*"

# 4. requirements.txt アップロード(pip install 自動トリガー)
aws s3 cp requirements.txt s3://my-mwaa-bucket/requirements.txt

# 5. MWAA が自動反映(30 秒以内)
# → Scheduler logs で確認
aws logs tail /aws/mwaa/prod-etl-pipeline/scheduler --follow

AWS サービス統合

Glue ETL Operator

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

glue_task = GlueJobOperator(
    task_id='run_glue_etl',
    job_name='my-glue-job',
    script_args={
        '--input_bucket': 's3://raw-data',
        '--output_bucket': 's3://processed-data',
        '--date': '{{ ds }}',
    },
    aws_conn_id='aws_default',
    wait_for_completion=True,
)

Redshift SQL Operator

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

redshift_task = RedshiftSQLOperator(
    task_id='redshift_analysis',
    sql="""
        INSERT INTO analytics.daily_summary
        SELECT 
            date,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue
        FROM staging.orders
        WHERE date = '{{ ds }}'
        GROUP BY date;
    """,
    redshift_conn_id='redshift_default',
)

SageMaker Training Operator

from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator

sagemaker_task = SageMakerTrainingOperator(
    task_id='train_ml_model',
    config={
        'TrainingJobName': 'ml-model-{{ ds }}',
        'RoleArn': 'arn:aws:iam::123456789012:role/SageMakerRole',
        'AlgorithmSpecification': {
            'TrainingImage': '382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
            'TrainingInputMode': 'File',
        },
        'InputDataConfig': [
            {
                'ChannelName': 'training',
                'DataSource': {
                    'S3DataSource': {
                        'S3Uri': 's3://ml-data/train/{{ ds }}/',
                        'S3DataType': 'S3Prefix',
                        'S3DataDistributionType': 'FullyReplicated',
                    }
                },
            }
        ],
        'OutputDataConfig': {
            'S3OutputPath': 's3://ml-models/output/',
        },
    },
    wait_for_completion=True,
)

Lambda Operator

from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

lambda_task = LambdaInvokeFunctionOperator(
    task_id='invoke_data_validation',
    function_name='data_quality_check',
    payload='{{"bucket": "processed-data", "key": "{{ ds }}/data.parquet"}}',
    aws_conn_id='aws_default',
)

ECS Task Operator

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

ecs_task = EcsRunTaskOperator(
    task_id='run_ecs_container',
    cluster='ecs-prod-cluster',
    task_definition='custom_data_processing',
    launch_type='FARGATE',
    network_configuration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-private-a', 'subnet-private-b'],
            'securityGroups': ['sg-ecs-tasks'],
            'assignPublicIp': 'DISABLED',
        }
    },
    overrides={
        'containerOverrides': [
            {
                'name': 'data_processor',
                'environment': [
                    {'name': 'DATE', 'value': '{{ ds }}'},
                ],
            }
        ]
    },
    awslogs_group='/ecs/data-processing',
    awslogs_stream_prefix='mwaa-',
)

Airflow Connections・Secrets Manager 統合

Secrets Manager での Connection 管理

# AWS Secrets Manager にシークレットを保存
import json
import boto3

secret_dict = {
    "conn_type": "postgres",
    "host": "prod-redshift.xxx.redshift.amazonaws.com",
    "login": "redshift_user",
    "password": "SecurePassword123!",
    "port": 5439,
    "schema": "analytics",
}

boto3.client('secretsmanager').create_secret(
    Name='airflow/connections/redshift_default',
    SecretString=json.dumps(secret_dict)
)

Secrets Manager バックエンド有効化

# MWAA Environment を更新(Secrets Manager バックエンドを有効化)
aws mwaa update-environment \
  --name prod-etl-pipeline \
  --airflow-configuration-options \
    'secrets.backend=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' \
    'secrets.backend_kwargs={"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'

# これ以降、Airflow は Connections を Secrets Manager から自動取得

Variable の Secrets Manager 管理

from airflow.models import Variable

# Secrets Manager に保存されたシークレットを取得
database_host = Variable.get('db_host')  # airflow/variables/db_host から取得
api_key = Variable.get('api_key')        # airflow/variables/api_key から取得

主要ユースケース

1. ETL パイプラインの自動化

S3 Data Lake
    ↓
Glue Data Catalog(テーブル登録)
    ↓
MWAA DAG
    ├→ Glue ETL(Spark 変換)
    ├→ Redshift COPY(DW ロード)
    └→ Athena Query(検証)
    ↓
CloudWatch Logs(監視・アラート)

2. 機械学習パイプラインのオーケストレーション

Data Preparation (Glue)
    ↓
Feature Engineering (SageMaker Processing)
    ↓
Model Training (SageMaker Training)
    ↓
Model Evaluation (Lambda)
    ↓
Model Registry (SageMaker Model Registry)
    ↓
Batch Inference (SageMaker Batch Transform)

3. データクオリティーチェック

Glue ETL → Output
    ↓
MWAA Data Quality DAG
    ├→ Great Expectations
    ├→ Custom Validation Rules
    └→ SNS Notification(失敗時)

4. マルチテナント SaaS データパイプライン

Customer Data Ingestion
    ↓
MWAA (Tenant-specific DAGs)
    ├→ Tenant A: Extract → Transform → Load
    ├→ Tenant B: Extract → Transform → Load
    └→ Tenant C: Extract → Transform → Load
    ↓
Isolated DynamoDB Instances

5. ストリーミングデータ + Batch 処理

Kinesis → Lambda (buffering)
    ↓
S3 (parquetized)
    ↓
MWAA DAG (hourly batch)
    ├→ Athena query
    └→ Redshift update

6. リアルタイムレコメンデーション更新

User Behavior Events
    ↓
Kinesis Firehose → S3
    ↓
MWAA (nightly DAG)
    ├→ Glue ETL (features preparation)
    ├→ SageMaker Training
    └→ ElastiCache (recommendations cache)

7. ログ集約・監視

Application Logs (CloudWatch Logs)
    ↓
MWAA Log Processing DAG
    ├→ Athena (ad-hoc queries)
    ├→ OpenSearch (indexing)
    └→ S3 Data Lake (archive)

8. 依存関係管理された複雑 DAG

External API Call
    ↓ (success → proceed / fail → retry)
    ↓
Parallel Processing (Map)
    ├→ Process A
    ├→ Process B
    └→ Process C
    ↓
Aggregation
    ↓
Report Generation (SES)

9. 条件分岐・動的 DAG

Data Check Task
    ↓
BranchOperator
    ├→ データ品質 OK → Analysis
    └→ データ品質 NG → Alert + Rollback

10. クロスアカウント DAG 実行

Account A (MWAA Environment)
    ↓ AssumeRole
Account B (Cross-Account Resource)
    ├→ Redshift Cluster
    └→ S3 Bucket

11. スケジュール変動への対応

MWAA Backfill (catchup=True)
    ↓
月末決算処理(特殊スケジュール)
    ↓
年度末監査(1 回限りの大規模処理)

12. ガバナンス・監査ログ

MWAA DAG Execution
    ↓
CloudTrail(IAM 操作)
    ↓
CloudWatch Logs(Scheduler/Worker logs)
    ↓
S3(ログアーカイブ)
    ↓
QuickSight(ダッシュボード)

CLI・SDK・IaC による操作例

AWS CLI での操作

# 1. Environment 作成
aws mwaa create-environment \
  --name my-airflow-env \
  --airflow-version 2.8.1 \
  --environment-class mw1.medium \
  --max-workers 10 \
  --min-workers 1 \
  --dag-s3-path s3://bucket/dags/ \
  --execution-role-arn arn:aws:iam::123456789012:role/MWAARole \
  --network-configuration SubnetIds=subnet-xxx,subnet-yyy \
  --source-bucket-arn arn:aws:s3:::bucket

# 2. DAG 実行をトリガー
aws mwaa create-cli-token --name my-airflow-env
# → token 取得 → curl で Airflow API 呼び出し

# 3. Environment 情報確認
aws mwaa get-environment --name my-airflow-env

# 4. Logs を確認
aws logs tail /aws/mwaa/my-airflow-env/scheduler --follow

# 5. Environment 削除
aws mwaa delete-environment --name my-airflow-env

boto3 SDK での操作

import boto3
import json

mwaa = boto3.client('mwaa', region_name='us-east-1')

# Environment 作成
response = mwaa.create_environment(
    Name='prod-etl',
    AirflowVersion='2.8.1',
    EnvironmentClass='mw1.medium',
    MaxWorkers=20,
    MinWorkers=2,
    DagS3Path='s3://my-bucket/dags/',
    ExecutionRoleArn='arn:aws:iam::123456789012:role/MWAARole',
    NetworkConfiguration={
        'SubnetIds': ['subnet-a', 'subnet-b'],
        'SecurityGroupIds': ['sg-mwaa'],
    },
    AirflowConfigurationOptions={
        'core.default_task_retries': '3',
        'core.max_active_runs_per_dag': '1',
        'logging.logging_level': 'INFO',
    }
)

print(json.dumps(response, indent=2, default=str))

Terraform / CDK による IaC

# Terraform
resource "aws_mwaa_environment" "main" {
  name              = "prod-etl"
  airflow_version   = "2.8.1"
  environment_class = "mw1.medium"
  max_workers       = 20
  min_workers       = 2
  
  dag_s3_path              = "s3://${aws_s3_bucket.mwaa.id}/dags/"
  plugins_s3_path          = "s3://${aws_s3_bucket.mwaa.id}/plugins.zip"
  requirements_s3_path     = "s3://${aws_s3_bucket.mwaa.id}/requirements.txt"
  execution_role_arn       = aws_iam_role.mwaa_role.arn
  
  vpc_config {
    subnet_ids             = aws_subnet.private[*].id
    security_group_ids     = [aws_security_group.mwaa.id]
  }
  
  airflow_configuration_options = {
    "core.default_task_retries" = "3"
    "logging.logging_level"      = "INFO"
  }
}
# CDK
from aws_cdk import (
    aws_mwaa as mwaa,
    aws_iam as iam,
    aws_s3 as s3,
    core,
)

class MWAAStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)
        
        # S3 Bucket
        bucket = s3.Bucket(
            self, "MWAABucket",
            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
        )
        
        # IAM Role
        role = iam.Role(
            self, "MWAARole",
            assumed_by=iam.ServicePrincipal("mwaa.amazonaws.com"),
        )
        role.add_to_policy(iam.PolicyStatement(
            actions=["s3:*"],
            resources=[bucket.arn_for_objects("*")],
        ))
        
        # MWAA Environment
        env = mwaa.CfnEnvironment(
            self, "MWAAEnvironment",
            name="prod-etl",
            airflow_version="2.8.1",
            environment_class="mw1.medium",
            max_workers=20,
            min_workers=2,
            dag_s3_path=f"s3://{bucket.bucket_name}/dags/",
            execution_role_arn=role.role_arn,
            network_configuration=mwaa.CfnEnvironment.NetworkConfigurationProperty(
                subnet_ids=["subnet-a", "subnet-b"],
                security_group_ids=["sg-mwaa"],
            ),
        )

CloudWatch Logs・X-Ray・メトリクス

CloudWatch Logs 構成

/aws/mwaa/{EnvironmentName}/
├── scheduler/
│   └── 2025-01-01T10:00:00Z_*.log  (DAG parse / task schedule)
├── worker/
│   └── 2025-01-01T10:00:00Z_*.log  (Task execution)
└── webserver/
    └── 2025-01-01T10:00:00Z_*.log  (Web UI access)

ログクエリ例

# Scheduler エラーを確認
aws logs filter-log-events \
  --log-group-name /aws/mwaa/prod-etl/scheduler \
  --filter-pattern "[ERROR]" \
  --start-time $(date -d '1 hour ago' +%s)000

# Worker 内で失敗したタスクを検索
aws logs filter-log-events \
  --log-group-name /aws/mwaa/prod-etl/worker \
  --filter-pattern "\"[FAILED]\"" \
  --start-time $(date -d '1 hour ago' +%s)000

CloudWatch メトリクス

メトリクス 説明
NumDagProcessFailures DAG Parse エラー数
NumTasksFailed 失敗したタスク数
NumTasksSuccess 成功したタスク数
DagDuration DAG 実行時間
TaskDuration タスク実行時間
NumSchedulerHeartbeats Scheduler ハートビート

X-Ray トレーシング

# MWAA Environment で X-Ray トレーシングを有効化
aws mwaa update-environment \
  --name prod-etl \
  --airflow-configuration-options \
    'logging.remote_logging=True' \
    'logging.remote_base_log_folder=s3://bucket/logs/'

Private Webserver・IAM 認証・セキュリティ

Private Webserver モード

# Private Webserver 設定(VPC 内からのみアクセス可能)
aws mwaa create-environment \
  --name prod-etl \
  --webserver-access-mode PRIVATE_ONLY \
  --network-configuration \
    SubnetIds=subnet-private-a,subnet-private-b

IAM 認証によるアクセス制御

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:user/data-engineer"
      },
      "Action": [
        "mwaa:GetEnvironment",
        "airflow:DescribeDAG",
        "airflow:ListDAGs"
      ],
      "Resource": "arn:aws:mwaa:us-east-1:123456789012:environment/prod-etl"
    }
  ]
}

セキュリティベストプラクティス

対策 説明
Private Webserver VPC 内からのみアクセス可能に設定
IAM Authentication IAM Roles for Service Accounts で credentials 不要
Secrets Manager Connection passwords を暗号化管理
VPC Endpoints S3・Secrets Manager への通信を VPC 内に閉じ込める
Encryption at Rest KMS で DAG・Log を暗号化
Execution Role 最小権限の IAM Role を割り当てる

VPC 構成・Network Security

VPC エンドポイント構成

# S3 Gateway Endpoint(NAT Gateway 不要)
aws ec2 create-vpc-endpoint \
  --vpc-id vpc-xxx \
  --service-name com.amazonaws.us-east-1.s3 \
  --route-table-ids rtb-xxx

# Secrets Manager Interface Endpoint
aws ec2 create-vpc-endpoint \
  --vpc-id vpc-xxx \
  --vpc-endpoint-type Interface \
  --service-name com.amazonaws.us-east-1.secretsmanager \
  --subnet-ids subnet-private-a subnet-private-b \
  --security-group-ids sg-vpc-endpoint

Security Group 設定

# MWAA Security Group
aws ec2 create-security-group \
  --group-name mwaa-sg \
  --description "MWAA Workers & Scheduler" \
  --vpc-id vpc-xxx

# 許可ルール(内部通信)
aws ec2 authorize-security-group-ingress \
  --group-id sg-mwaa \
  --protocol tcp \
  --port 5432 \
  --source-group sg-mwaa  # PostgreSQL metadata DB

# RDS への通信
aws ec2 authorize-security-group-ingress \
  --group-id sg-rds \
  --protocol tcp \
  --port 5432 \
  --source-group sg-mwaa

類似サービス比較

観点 MWAA Step Functions Astronomer Google Cloud Composer Prefect Dagster
インフラ管理 フルマネージド フルマネージド 手動(VM/K8s) フルマネージド 手動 / Cloud 手動 / Cloud
言語 Python DAG JSON/YAML ASL Python DAG Python DAG Python Python
スケーリング Auto(Worker) 自動(State Machine) 手動 Auto 手動 手動
AWS統合 ネイティブ ネイティブ Provider Limited Hook Resource
学習曲線 中(Airflow知識) 低(JSON宣言的)
コスト 中(Environment料金) 低(実行従量課金) 高(SaaS+管理) 低(オンプレ) 低(オンプレ)
複雑DAG対応 優秀 不向き 優秀 優秀 優秀 優秀
動的Task DynamicTaskMap Map State Limited Limited 優秀 優秀

ベストプラクティス

推奨事項(✅)

  • ✅ DAG ファイルをバージョン管理(git)で管理
  • ✅ requirements.txt で 明示的に依存パッケージを管理
  • ✅ S3 ベースのコード deploy で CI/CD 自動化
  • ✅ Secrets Manager で credentials 一元管理
  • ✅ CloudWatch Logs で スケジューラー・ワーカー・アプリログを監視
  • ✅ Task Retries・Timeout を適切に設定
  • ✅ Monitoring alerts(SNS・Slack)で障害通知
  • ✅ Private Webserver + IAM 認証で セキュリティ確保
  • ✅ Instance Class を 負荷パターンに応じて選定
  • ✅ max_active_runs_per_dag で リソース枯渇を防止

アンチパターン(❌)

  • ❌ DAG をコンソール直接編集(バージョン管理外)
  • ❌ Hard-coded credentials を DAG に埋め込む
  • ❌ Public Webserver を本番運用(認証なし)
  • ❌ Task リトライなし(デフォルト)で本番運用
  • ❌ 監視アラートなし(障害検知遅延)
  • ❌ mw1.micro で本番運用(リソース不足)
  • ❌ catchup=True で全期間バックフィル(リソース枯渇)
  • ❌ 複雑な依存関係を task 内ロジックで実装(保守困難)
  • ❌ Logging level=DEBUG で本番運用(ログ出力過多)

トラブルシューティング表

症状 原因 解決方法
DAG が表示されない S3 パスが違う / DAG syntax エラー aws s3 ls s3://bucket/dags/ で確認 / airflow dags test で syntax チェック
Task が stuck(実行されない) Worker スケーリング遅延 / Resource 枯渇 max_workers 上限確認 / Task duration 短縮
Task timeout Timeout 設定が短すぎる / 実行時間増加 execution_timeout 延長 / Task 分割
Redshift COPY 失敗 IAM Role 権限不足 / S3 path 誤り aws s3 ls s3://bucket/path/ で S3 確認 / IAM Role policy 確認
Scheduler restart ループ DAG Parse エラー / Memory 枯渇 CloudWatch Logs で scheduler error 確認 / Instance Class 上げる
Connection 取得失敗 Secrets Manager path 誤り / 暗号化キー権限なし aws secretsmanager get-secret-value でシークレット確認
Worker ログが出力されない Logging level 設定 / CloudWatch Logs 権限なし logging.logging_level=INFO で明示的に設定

2025-2026 最新動向

  • Apache Airflow 3.0 対応予定:Deferrable task・Dynamic Task Mapping の標準化
  • Kubernetes Executor 強化:EKS との統合強化で大規模 DAG サポート
  • Bedrock 統合:生成 AI をワークフローに組み込む能力
  • DataOps 統合:dbt・Great Expectations との Native 統合
  • Multi-Region MWAA:クロスリージョンフェイルオーバー対応予定
  • Cost Optimization:Spot Instances for Worker Fleet でコスト 40% 削減

学習リソース

公式リソース

OSS / コミュニティ

AWS ベンダーリソース


実装例・チェックリスト

MWAA 導入チェックリスト

  • [ ] Airflow DAG 開発経験がある
  • [ ] S3 バケットを作成・IAM Policy を設定
  • [ ] Execution Role に必要な IAM permissions を追加
  • [ ] VPC・Subnet・Security Group を構成
  • [ ] requirements.txt で依存パッケージを指定
  • [ ] DAG を airflow dags test でローカル検証
  • [ ] S3 に DAG / plugins.zip / requirements.txt をアップロード
  • [ ] MWAA Environment を CLI / CDK で作成
  • [ ] Airflow Web UI にアクセス(Public / Private Webserver 確認)
  • [ ] CloudWatch Logs で Scheduler ログを確認
  • [ ] DAG をトリガーして Task 実行確認
  • [ ] Slack / SNS アラートを設定
  • [ ] 本番 Environment に昇格

まとめ

Amazon MWAA は 「Apache Airflow のフルマネージドサービス」 で、複雑なデータパイプライン・ETL ワークフロー・機械学習オーケストレーションを Python DAG で自動化できます。S3 ベースのコード管理・AWS サービス統合・CloudWatch Logs 監視を提供し、Glue・Redshift・SageMaker・Lambda との連携に優れています。Instance Class を負荷に応じて選定し、Auto Scaling・Secrets Manager 統合・IAM 認証でセキュリティを確保することで、エンタープライズグレードのデータエンジニアリングプラットフォームを実現できます。

最終更新:2026-04-27 バージョン:v2.0