目次

AWS Data Pipeline 完全ガイド v2.0

レガシー ETL オーケストレーション・Glue/MWAA/Step Functions への移行推奨


概要

AWS Data Pipeline は 2011 年から提供される レガシー ETL(抽出・変換・ロード)オーケストレーションサービス です。S3・RDS・DynamoDB・Redshift・EMR 間のデータ移動・変換をスケジュール実行し、複雑な依存関係を管理するために設計されました。2024 年 7 月 25 日に新規顧客のアクセスが停止され、既存顧客のみメンテナンスモード(新機能なし)で利用継続可能です。現在の ETL パイプライン構築には AWS Glue ETL(コード ETL)・Amazon MWAA(Airflow)・AWS Step Functions(ワークフロー)が推奨されています。

初心者向けメモ

Data Pipeline は「昔のジョブスケジューラー」です。今なら Glue / MWAA / Step Functions を選びます。既存のパイプラインがある場合のみ参考にしてください。


サービス現状:レガシーステータス

AWS Data Pipeline 重要ステータス(2026年4月現在)
├─ 新規顧客:アクセス不可(2024-07-25 クローズ)
├─ 既存顧客:継続利用可、メンテナンスモードのみ
├─ 新機能:開発停止(最後の大規模更新: 2015年)
├─ サポート:セキュリティパッチ・可用性改善は継続
├─ 推奨移行先:AWS Glue / MWAA / Step Functions
└─ 官報:https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/

Data Pipeline が直面する制限:

制限項目 詳細
スケジューラーの限定性 cron 形式のみ対応・複雑な条件分岐が難しい
EC2 インスタンス依存 Task Runner を実行する EC2 が必須・サーバーレス非対応
リトライロジックの貧弱さ 再試行回数・間隔が固定・指数バックオフなし
監視・ロギング CloudWatch 統合が限定的・ダッシュボードなし
並行実行の管理 複数ジョブの同時実行制御が困難
エラーハンドリング アラート機能が基本的・カスタム例外処理難しい
スケーラビリティ 大規模ワークロード対応が限定的
コミュニティ・エコシステム Apache Airflow 等に比べコミュニティが小さい

AWS Data Pipeline の本質と構成要素

パイプライン定義(Pipeline Definition)

Data Pipeline のコア概念は JSON ベースの宣言的なパイプライン定義 です。各要素が依存関係を持ち、スケジュール管理される:

{
  "objects": [
    {
      "id": "Default",
      "type": "Default",
      "failureAndRerunMode": "CASCADE",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultRole"
    },
    {
      "id": "ExportDynamoDBTask",
      "type": "ExportDynamoDBToS3Activity",
      "input": {
        "ref": "DynamoDBTable"
      },
      "output": {
        "ref": "S3OutputLocation"
      },
      "dependsOn": "ExportSchedule",
      "runsOn": {
        "ref": "EC2Instance"
      }
    },
    {
      "id": "ExportSchedule",
      "type": "Schedule",
      "period": "1 day",
      "startDateTime": "2026-01-01T00:00:00",
      "occurrences": "100"
    },
    {
      "id": "DynamoDBTable",
      "type": "DynamoDBDataNode",
      "tableName": "MyProductTable",
      "region": "us-east-1"
    },
    {
      "id": "S3OutputLocation",
      "type": "S3DataNode",
      "directoryPath": "s3://my-bucket/exports/#{YYYY}-#{MM}-#{DD}"
    },
    {
      "id": "EC2Instance",
      "type": "Ec2Resource",
      "instanceType": "t2.medium",
      "imageId": "ami-12345678",
      "keyPair": "MyKeyPair",
      "subnetId": "subnet-12345678"
    }
  ]
}

主要コンポーネント

コンポーネント 役割
Data ノード パイプラインが処理するデータの場所 S3Datanode / DynamoDBDataNode / RdsDataNode
Activity 実行される処理・変換 CopyActivity / ShellCommandActivity / HiveActivity
Schedule 実行スケジュール 毎日・毎週・毎月などの頻度指定
Resource 処理を実行するコンピュートリソース EC2Instance / EmrCluster
Precondition Activity 実行前の条件チェック DynamoDBExistsPrecondition / S3KeyExistsPrecondition

Activity の種類

Activity 説明
CopyActivity S3 → Redshift, RDS → S3, DynamoDB → S3 等のデータコピー
ExportDynamoDBToS3Activity DynamoDB テーブルを S3 に定期エクスポート
ImportDynamoDBFromS3Activity S3 から DynamoDB へのインポート
HiveActivity EMR 上で Hive クエリ実行
ShellCommandActivity 任意のシェルコマンド実行(EC2 上で実行)
RedshiftCopyActivity Redshift への COPY コマンド実行
SqlActivity RDS 上で SQL 実行

典型的な Data Pipeline ワークフロー

パターン 1: 日次 DynamoDB → S3 エクスポート

Schedule (Daily 2:00 AM)
    ↓
ExportDynamoDBToS3Activity (EC2 Instance)
    ↓
S3 に JSON/CSV 出力
    ↓
Success Notification (SNS)

パイプライン定義:

{
  "objects": [
    {
      "id": "DailyExportSchedule",
      "type": "Schedule",
      "period": "1 day",
      "startDateTime": "2026-01-01T02:00:00"
    },
    {
      "id": "ExportTask",
      "type": "ExportDynamoDBToS3Activity",
      "input": { "ref": "SourceTable" },
      "output": { "ref": "S3Output" },
      "runsOn": { "ref": "TaskRunner" },
      "dependsOn": "DailyExportSchedule"
    }
  ]
}

パターン 2: RDS → S3 → Redshift ETL パイプライン

Schedule (Weekly Sunday 3:00 AM)
    ↓
CopyActivity (RDS → S3, EC2)
    ↓
ShellCommandActivity (データ変換, EC2)
    ↓
CopyActivity (S3 → Redshift)
    ↓
Success / Failure Notification

パターン 3: EMR での複雑な処理

Schedule
    ↓
CopyActivity (S3 → S3, ステージング)
    ↓
HiveActivity (EMR クラスター上で集計)
    ↓
CopyActivity (処理結果 → Redshift)
    ↓
Cleanup (EMR クラスター削除)

Data Pipeline の実装例

Python による パイプライン定義生成

import json
import boto3
from datetime import datetime

client = boto3.client('datapipeline')

pipeline_def = {
    "objects": [
        {
            "id": "Default",
            "type": "Default",
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultRole"
        },
        {
            "id": "DailySchedule",
            "type": "Schedule",
            "period": "1 day",
            "startDateTime": "2026-01-01T22:00:00",
            "occurrences": "365"
        },
        {
            "id": "S3Source",
            "type": "S3DataNode",
            "directoryPath": "s3://my-source-bucket/logs/#{YYYY}/#{MM}/#{DD}"
        },
        {
            "id": "S3Destination",
            "type": "S3DataNode",
            "directoryPath": "s3://my-dest-bucket/processed/#{YYYY}-#{MM}-#{DD}"
        },
        {
            "id": "ProcessingInstance",
            "type": "Ec2Resource",
            "instanceType": "t2.large",
            "imageId": "ami-12345678",
            "keyPair": "default-key"
        },
        {
            "id": "TransformActivity",
            "type": "ShellCommandActivity",
            "command": "python /home/ec2-user/transform.py --input #{input1} --output #{output1}",
            "input": { "ref": "S3Source" },
            "output": { "ref": "S3Destination" },
            "runsOn": { "ref": "ProcessingInstance" },
            "dependsOn": "DailySchedule"
        }
    ]
}

# パイプライン作成
response = client.create_pipeline(
    name='my-data-pipeline',
    uniqueId='pipeline-001'
)

pipeline_id = response['pipelineId']

# パイプライン定義を設定
client.put_pipeline_definition(
    pipelineId=pipeline_id,
    pipelineObjects=pipeline_def['objects']
)

# パイプライン有効化
client.activate_pipeline(pipelineId=pipeline_id)

print(f"Pipeline {pipeline_id} activated")

Task Runner(カスタム実装の例)

Data Pipeline は Task Runner プロセスがスケジュールを監視して Activity を実行:

#!/usr/bin/env python
import time
import subprocess
import json
from datetime import datetime

class DataPipelineTaskRunner:
    def __init__(self, pipeline_id, region='us-east-1'):
        self.pipeline_id = pipeline_id
        self.region = region
        self.client = boto3.client('datapipeline', region_name=region)
    
    def poll_for_tasks(self, poll_interval=60):
        """スケジュール済みタスクをポーリング"""
        while True:
            try:
                response = self.client.poll_for_task(
                    workerGroup='default',
                    hostname='instance-id'
                )
                
                if 'taskObject' in response:
                    task = response['taskObject']
                    task_id = response['taskId']
                    
                    # Activity タイプに応じて実行
                    result = self.execute_task(task)
                    
                    # 結果をパイプラインに報告
                    if result['success']:
                        self.client.set_task_status(
                            taskId=task_id,
                            taskStatus='FINISHED'
                        )
                    else:
                        self.client.set_task_status(
                            taskId=task_id,
                            taskStatus='FAILED',
                            errorId='TaskExecutionFailed',
                            errorMessage=result['error']
                        )
                
                time.sleep(poll_interval)
            except Exception as e:
                print(f"Error polling tasks: {e}")
                time.sleep(poll_interval)
    
    def execute_task(self, task):
        """Activity を実行"""
        task_type = task['type']
        
        try:
            if task_type == 'ShellCommandActivity':
                return self.run_shell_command(task)
            elif task_type == 'CopyActivity':
                return self.run_copy_activity(task)
            else:
                return {'success': False, 'error': f'Unknown task type: {task_type}'}
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def run_shell_command(self, task):
        """シェルコマンド実行"""
        command = task.get('command', '')
        try:
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                timeout=3600
            )
            if result.returncode == 0:
                return {'success': True}
            else:
                return {'success': False, 'error': result.stderr.decode()}
        except subprocess.TimeoutExpired:
            return {'success': False, 'error': 'Task timeout'}
    
    def run_copy_activity(self, task):
        """S3/RDS/DynamoDB 間のコピー実行"""
        # AWS SDK を使用してデータコピー実装
        pass

# Task Runner 起動
if __name__ == '__main__':
    runner = DataPipelineTaskRunner('df-XXXXX')
    runner.poll_for_tasks(poll_interval=30)

Data Pipeline から後継サービスへの移行比較

サービス 得意分野 移行難易度 学習曲線
AWS Glue ETL コード ETL・スケーラブル 低~中 Apache Spark 理解必須
Amazon MWAA ワークフロー・DAG 管理・柔軟性 Apache Airflow 理解必須
AWS Step Functions サーバーレス・AWS サービス統合 低~中 State Machine JSON 理解必須
EventBridge Scheduler 単純スケジュール実行 cron 相当の理解で OK
AWS Lambda + CloudWatch Events イベント駆動・リアルタイム Lambda 開発スキル必須

Data Pipeline → 後継サービスへの移行マップ

移行パターン 1: シンプルな日次スケジュール実行

Data Pipeline:

{
  "type": "Schedule",
  "period": "1 day",
  "startDateTime": "2026-01-01T22:00:00"
}

移行先: EventBridge Scheduler

# EventBridge Scheduler Rule
ScheduleExpression: cron(0 22 * * ? *)  # 毎日 22:00 (UTC)
Target: Lambda Function / SNS Topic / SQS Queue

移行パターン 2: 複雑なワークフロー(依存関係多数)

Data Pipeline: Activity 群の複雑な依存関係

移行先: AWS Step Functions / Amazon MWAA

Step Functions State Machine:

{
  "Comment": "Data Pipeline Migration Example",
  "StartAt": "ExportDynamoDB",
  "States": {
    "ExportDynamoDB": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
      "Parameters": {
        "TableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/MyTable",
        "S3Bucket": "my-bucket",
        "S3Prefix": "exports"
      },
      "Next": "ProcessData"
    },
    "ProcessData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessDataFunction",
      "Next": "CopyToRedshift"
    },
    "CopyToRedshift": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:redshift:executeStatement",
      "Parameters": {
        "ClusterIdentifier": "my-cluster",
        "Database": "mydb",
        "Sql": "COPY my_table FROM 's3://my-bucket/exports' IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'"
      },
      "End": true
    }
  }
}

Amazon MWAA DAG:

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'data-pipeline-migration',
    default_args=default_args,
    schedule_interval='0 22 * * *',  # 毎日 22:00
    catchup=False
) as dag:
    
    export_task = AwsGlueJobOperator(
        task_id='export_dynamodb',
        job_name='ExportDynamoDBJob',
        script_location='s3://my-bucket/glue-scripts/export.py'
    )
    
    process_task = AwsGlueJobOperator(
        task_id='process_data',
        job_name='ProcessDataJob'
    )
    
    copy_task = RedshiftSQLOperator(
        task_id='copy_to_redshift',
        sql='COPY my_table FROM ...',
        redshift_conn_id='redshift_default'
    )
    
    export_task >> process_task >> copy_task

移行パターン 3: EC2 上でのシェルスクリプト実行

Data Pipeline:

{
  "type": "ShellCommandActivity",
  "command": "bash /home/ec2-user/process.sh --input #{input1}",
  "runsOn": { "ref": "EC2Instance" }
}

移行先: AWS Glue Python Shell Job / Lambda / ECS Task

AWS Glue PySpark Job:

# Glue Job として実行
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)

# ETL ロジック
df = glueContext.create_dynamic_frame.from_options(
    format_options={'paths': ['s3://my-bucket/input/']},
    connection_type='s3',
    format='json'
)

# 変換処理
transformed_df = df.select_fields(['id', 'name', 'value'])

# 出力
glueContext.write_dynamic_frame.from_options(
    frame=transformed_df,
    connection_type='s3',
    connection_options={'path': 's3://my-bucket/output/'},
    format='json'
)

Data Pipeline のコスト比較

サービス 課金方法 月額コスト例(100 回/月実行)
Data Pipeline アクティビティ数 × 実行頻度 $60/月(低頻度)
AWS Glue ETL DPU-時間 $1.5〜3/時間 × 実行時間
Amazon MWAA 管理コスト + 実行コスト $300/月(最小) + DPU
AWS Step Functions ステップ実行回数 100 万ステップまで無料
Lambda + EventBridge Lambda 実行 + イベント 100 万イベント + 実行時間秒単位

推奨判断:

  • 小規模・低頻度 → Lambda + EventBridge
  • 大規模 ETL・複雑変換 → AWS Glue
  • 複雑ワークフロー → Step Functions / MWAA
  • 単純スケジュール → EventBridge Scheduler

Data Pipeline 移行チェックリスト

Step 1: 現状分析
├─ 既存パイプライン数・実行頻度・ドキュメント作成
├─ 実行時間・エラー率・依存関係の把握
└─ EC2 インスタンス設定・IAM ロール確認

Step 2: 移行先選定
├─ パイプラインの複雑度に応じて後継サービス決定
└─ 新旧サービスの並行運用期間設定(3〜6 ヶ月推奨)

Step 3: 移行実装
├─ 後継サービス(Glue / MWAA / Step Functions)の開発
├─ テストデータでの機能検証
└─ パフォーマンステスト・ロードテスト

Step 4: ステージング検証
├─ ステージング環境で 1 ヶ月の並行実行
├─ 結果の相互比較・差分検査
└─ 本番と同じスケールでの検証

Step 5: 本番切り替え
├─ データバックアップ・ロールバック計画確認
├─ トラフィック段階的転送(10% → 50% → 100%)
└─ 監視・アラート設定

Step 6: レガシー削除
├─ Data Pipeline パイプライン停止・削除
└─ EC2 インスタンス・IAM ロール削除

Data Pipeline の代替サービス詳細比較

AWS Glue ETL

特徴:

  • PySpark / Scala での ETL コーディング
  • 自動スケーリング DPU モデル
  • Glue Catalog でメタデータ一元管理
  • 関連サービス(Athena / Redshift / RDS)との統合

向く用途:

  • 大規模データの抽出・変換・ロード
  • Spark SQL での複雑な集計
  • データフォーマット変換(Parquet / Orc / CSV)

学習:

# Glue Job 例
import sys
from awsglue.transforms import *

# S3 からデータ読み込み
dyf = glueContext.create_dynamic_frame_from_options(
    's3', {'paths': ['s3://bucket/input/']}
)

# Spark DataFrame に変換
df = dyf.toDF()

# SQL クエリ実行
result = df.groupby('category').agg({'sales': 'sum'})

# S3 に出力
result.write.parquet('s3://bucket/output/')

Amazon MWAA(Managed Workflows for Apache Airflow)

特徴:

  • Apache Airflow による DAG ベースのワークフロー
  • Python コーディングで柔軟性
  • 豊富なオペレーター(Glue / Lambda / EMR / Redshift)
  • ダッシュボード・ログ可視化

向く用途:

  • 複雑なタスク依存関係管理
  • 複数 AWS サービスの統合
  • エラーハンドリング・リトライ戦略

学習:

# MWAA DAG 例
from airflow import DAG
from datetime import datetime

with DAG(
    'complex_etl',
    start_date=datetime(2026, 1, 1),
    schedule_interval='0 2 * * *'
) as dag:
    # タスク定義
    task1 = ...
    task2 = ...
    # 依存関係
    task1 >> task2

AWS Step Functions

特徴:

  • サーバーレス・ワークフロー
  • State Machine JSON で定義
  • AWS サービスネイティブ統合
  • ビジュアルワークフローエディター

向く用途:

  • Lambda / ECS / Batch の調整
  • 分岐・並列処理
  • AWS API シーケンス実行

まとめ

項目 詳細
Data Pipeline の立場 AWS のレガシー ETL スケジューラー。2024 年 7 月に新規顧客のアクセスがクローズされ、既存顧客のみメンテナンスモードで利用可能。新機能開発は 2015 年以来停止。
推奨判断 既存パイプライン以外は後継サービスを選択。シンプル → EventBridge/Lambda、複雑 → Glue/MWAA/Step Functions を選定。
移行タイムライン 複雑度に応じて 3〜12 ヶ月。小規模パイプラインなら 3 ヶ月、エンタープライズスケールなら 12 ヶ月を推奨。
コスト Glue / MWAA は使用量ベース課金で、遊休コストなし。Step Functions は実行回数ベースで低コスト。
サポート AWS はセキュリティパッチ継続するが新機能なし。後継サービスは活発な開発・機能追加が継続中。

参考リンク