目次
AWS Data Pipeline 完全ガイド v2.0
レガシー ETL オーケストレーション・Glue/MWAA/Step Functions への移行推奨
概要
AWS Data Pipeline は 2011 年から提供される レガシー ETL(抽出・変換・ロード)オーケストレーションサービス です。S3・RDS・DynamoDB・Redshift・EMR 間のデータ移動・変換をスケジュール実行し、複雑な依存関係を管理するために設計されました。2024 年 7 月 25 日に新規顧客のアクセスが停止され、既存顧客のみメンテナンスモード(新機能なし)で利用継続可能です。現在の ETL パイプライン構築には AWS Glue ETL(コード ETL)・Amazon MWAA(Airflow)・AWS Step Functions(ワークフロー)が推奨されています。
初心者向けメモ
Data Pipeline は「昔のジョブスケジューラー」です。今なら Glue / MWAA / Step Functions を選びます。既存のパイプラインがある場合のみ参考にしてください。
サービス現状:レガシーステータス
AWS Data Pipeline 重要ステータス(2026年4月現在)
├─ 新規顧客:アクセス不可(2024-07-25 クローズ)
├─ 既存顧客:継続利用可、メンテナンスモードのみ
├─ 新機能:開発停止(最後の大規模更新: 2015年)
├─ サポート:セキュリティパッチ・可用性改善は継続
├─ 推奨移行先:AWS Glue / MWAA / Step Functions
└─ 官報:https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/
Data Pipeline が直面する制限:
| 制限項目 | 詳細 |
|---|---|
| スケジューラーの限定性 | cron 形式のみ対応・複雑な条件分岐が難しい |
| EC2 インスタンス依存 | Task Runner を実行する EC2 が必須・サーバーレス非対応 |
| リトライロジックの貧弱さ | 再試行回数・間隔が固定・指数バックオフなし |
| 監視・ロギング | CloudWatch 統合が限定的・ダッシュボードなし |
| 並行実行の管理 | 複数ジョブの同時実行制御が困難 |
| エラーハンドリング | アラート機能が基本的・カスタム例外処理難しい |
| スケーラビリティ | 大規模ワークロード対応が限定的 |
| コミュニティ・エコシステム | Apache Airflow 等に比べコミュニティが小さい |
AWS Data Pipeline の本質と構成要素
パイプライン定義(Pipeline Definition)
Data Pipeline のコア概念は JSON ベースの宣言的なパイプライン定義 です。各要素が依存関係を持ち、スケジュール管理される:
{
"objects": [
{
"id": "Default",
"type": "Default",
"failureAndRerunMode": "CASCADE",
"role": "DataPipelineDefaultRole",
"resourceRole": "DataPipelineDefaultRole"
},
{
"id": "ExportDynamoDBTask",
"type": "ExportDynamoDBToS3Activity",
"input": {
"ref": "DynamoDBTable"
},
"output": {
"ref": "S3OutputLocation"
},
"dependsOn": "ExportSchedule",
"runsOn": {
"ref": "EC2Instance"
}
},
{
"id": "ExportSchedule",
"type": "Schedule",
"period": "1 day",
"startDateTime": "2026-01-01T00:00:00",
"occurrences": "100"
},
{
"id": "DynamoDBTable",
"type": "DynamoDBDataNode",
"tableName": "MyProductTable",
"region": "us-east-1"
},
{
"id": "S3OutputLocation",
"type": "S3DataNode",
"directoryPath": "s3://my-bucket/exports/#{YYYY}-#{MM}-#{DD}"
},
{
"id": "EC2Instance",
"type": "Ec2Resource",
"instanceType": "t2.medium",
"imageId": "ami-12345678",
"keyPair": "MyKeyPair",
"subnetId": "subnet-12345678"
}
]
}
主要コンポーネント
| コンポーネント | 役割 | 例 |
|---|---|---|
| Data ノード | パイプラインが処理するデータの場所 | S3Datanode / DynamoDBDataNode / RdsDataNode |
| Activity | 実行される処理・変換 | CopyActivity / ShellCommandActivity / HiveActivity |
| Schedule | 実行スケジュール | 毎日・毎週・毎月などの頻度指定 |
| Resource | 処理を実行するコンピュートリソース | EC2Instance / EmrCluster |
| Precondition | Activity 実行前の条件チェック | DynamoDBExistsPrecondition / S3KeyExistsPrecondition |
Activity の種類
| Activity | 説明 |
|---|---|
| CopyActivity | S3 → Redshift, RDS → S3, DynamoDB → S3 等のデータコピー |
| ExportDynamoDBToS3Activity | DynamoDB テーブルを S3 に定期エクスポート |
| ImportDynamoDBFromS3Activity | S3 から DynamoDB へのインポート |
| HiveActivity | EMR 上で Hive クエリ実行 |
| ShellCommandActivity | 任意のシェルコマンド実行(EC2 上で実行) |
| RedshiftCopyActivity | Redshift への COPY コマンド実行 |
| SqlActivity | RDS 上で SQL 実行 |
典型的な Data Pipeline ワークフロー
パターン 1: 日次 DynamoDB → S3 エクスポート
Schedule (Daily 2:00 AM)
↓
ExportDynamoDBToS3Activity (EC2 Instance)
↓
S3 に JSON/CSV 出力
↓
Success Notification (SNS)
パイプライン定義:
{
"objects": [
{
"id": "DailyExportSchedule",
"type": "Schedule",
"period": "1 day",
"startDateTime": "2026-01-01T02:00:00"
},
{
"id": "ExportTask",
"type": "ExportDynamoDBToS3Activity",
"input": { "ref": "SourceTable" },
"output": { "ref": "S3Output" },
"runsOn": { "ref": "TaskRunner" },
"dependsOn": "DailyExportSchedule"
}
]
}
パターン 2: RDS → S3 → Redshift ETL パイプライン
Schedule (Weekly Sunday 3:00 AM)
↓
CopyActivity (RDS → S3, EC2)
↓
ShellCommandActivity (データ変換, EC2)
↓
CopyActivity (S3 → Redshift)
↓
Success / Failure Notification
パターン 3: EMR での複雑な処理
Schedule
↓
CopyActivity (S3 → S3, ステージング)
↓
HiveActivity (EMR クラスター上で集計)
↓
CopyActivity (処理結果 → Redshift)
↓
Cleanup (EMR クラスター削除)
Data Pipeline の実装例
Python による パイプライン定義生成
import json
import boto3
from datetime import datetime
client = boto3.client('datapipeline')
pipeline_def = {
"objects": [
{
"id": "Default",
"type": "Default",
"failureAndRerunMode": "CASCADE",
"role": "DataPipelineDefaultRole",
"resourceRole": "DataPipelineDefaultRole"
},
{
"id": "DailySchedule",
"type": "Schedule",
"period": "1 day",
"startDateTime": "2026-01-01T22:00:00",
"occurrences": "365"
},
{
"id": "S3Source",
"type": "S3DataNode",
"directoryPath": "s3://my-source-bucket/logs/#{YYYY}/#{MM}/#{DD}"
},
{
"id": "S3Destination",
"type": "S3DataNode",
"directoryPath": "s3://my-dest-bucket/processed/#{YYYY}-#{MM}-#{DD}"
},
{
"id": "ProcessingInstance",
"type": "Ec2Resource",
"instanceType": "t2.large",
"imageId": "ami-12345678",
"keyPair": "default-key"
},
{
"id": "TransformActivity",
"type": "ShellCommandActivity",
"command": "python /home/ec2-user/transform.py --input #{input1} --output #{output1}",
"input": { "ref": "S3Source" },
"output": { "ref": "S3Destination" },
"runsOn": { "ref": "ProcessingInstance" },
"dependsOn": "DailySchedule"
}
]
}
# パイプライン作成
response = client.create_pipeline(
name='my-data-pipeline',
uniqueId='pipeline-001'
)
pipeline_id = response['pipelineId']
# パイプライン定義を設定
client.put_pipeline_definition(
pipelineId=pipeline_id,
pipelineObjects=pipeline_def['objects']
)
# パイプライン有効化
client.activate_pipeline(pipelineId=pipeline_id)
print(f"Pipeline {pipeline_id} activated")
Task Runner(カスタム実装の例)
Data Pipeline は Task Runner プロセスがスケジュールを監視して Activity を実行:
#!/usr/bin/env python
import time
import subprocess
import json
from datetime import datetime
class DataPipelineTaskRunner:
def __init__(self, pipeline_id, region='us-east-1'):
self.pipeline_id = pipeline_id
self.region = region
self.client = boto3.client('datapipeline', region_name=region)
def poll_for_tasks(self, poll_interval=60):
"""スケジュール済みタスクをポーリング"""
while True:
try:
response = self.client.poll_for_task(
workerGroup='default',
hostname='instance-id'
)
if 'taskObject' in response:
task = response['taskObject']
task_id = response['taskId']
# Activity タイプに応じて実行
result = self.execute_task(task)
# 結果をパイプラインに報告
if result['success']:
self.client.set_task_status(
taskId=task_id,
taskStatus='FINISHED'
)
else:
self.client.set_task_status(
taskId=task_id,
taskStatus='FAILED',
errorId='TaskExecutionFailed',
errorMessage=result['error']
)
time.sleep(poll_interval)
except Exception as e:
print(f"Error polling tasks: {e}")
time.sleep(poll_interval)
def execute_task(self, task):
"""Activity を実行"""
task_type = task['type']
try:
if task_type == 'ShellCommandActivity':
return self.run_shell_command(task)
elif task_type == 'CopyActivity':
return self.run_copy_activity(task)
else:
return {'success': False, 'error': f'Unknown task type: {task_type}'}
except Exception as e:
return {'success': False, 'error': str(e)}
def run_shell_command(self, task):
"""シェルコマンド実行"""
command = task.get('command', '')
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
timeout=3600
)
if result.returncode == 0:
return {'success': True}
else:
return {'success': False, 'error': result.stderr.decode()}
except subprocess.TimeoutExpired:
return {'success': False, 'error': 'Task timeout'}
def run_copy_activity(self, task):
"""S3/RDS/DynamoDB 間のコピー実行"""
# AWS SDK を使用してデータコピー実装
pass
# Task Runner 起動
if __name__ == '__main__':
runner = DataPipelineTaskRunner('df-XXXXX')
runner.poll_for_tasks(poll_interval=30)
Data Pipeline から後継サービスへの移行比較
| サービス | 得意分野 | 移行難易度 | 学習曲線 |
|---|---|---|---|
| AWS Glue ETL | コード ETL・スケーラブル | 低~中 | Apache Spark 理解必須 |
| Amazon MWAA | ワークフロー・DAG 管理・柔軟性 | 低 | Apache Airflow 理解必須 |
| AWS Step Functions | サーバーレス・AWS サービス統合 | 低~中 | State Machine JSON 理解必須 |
| EventBridge Scheduler | 単純スケジュール実行 | 低 | cron 相当の理解で OK |
| AWS Lambda + CloudWatch Events | イベント駆動・リアルタイム | 低 | Lambda 開発スキル必須 |
Data Pipeline → 後継サービスへの移行マップ
移行パターン 1: シンプルな日次スケジュール実行
Data Pipeline:
{
"type": "Schedule",
"period": "1 day",
"startDateTime": "2026-01-01T22:00:00"
}
移行先: EventBridge Scheduler
# EventBridge Scheduler Rule
ScheduleExpression: cron(0 22 * * ? *) # 毎日 22:00 (UTC)
Target: Lambda Function / SNS Topic / SQS Queue
移行パターン 2: 複雑なワークフロー(依存関係多数)
Data Pipeline: Activity 群の複雑な依存関係
移行先: AWS Step Functions / Amazon MWAA
Step Functions State Machine:
{
"Comment": "Data Pipeline Migration Example",
"StartAt": "ExportDynamoDB",
"States": {
"ExportDynamoDB": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
"Parameters": {
"TableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/MyTable",
"S3Bucket": "my-bucket",
"S3Prefix": "exports"
},
"Next": "ProcessData"
},
"ProcessData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessDataFunction",
"Next": "CopyToRedshift"
},
"CopyToRedshift": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:redshift:executeStatement",
"Parameters": {
"ClusterIdentifier": "my-cluster",
"Database": "mydb",
"Sql": "COPY my_table FROM 's3://my-bucket/exports' IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'"
},
"End": true
}
}
}
Amazon MWAA DAG:
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
with DAG(
'data-pipeline-migration',
default_args=default_args,
schedule_interval='0 22 * * *', # 毎日 22:00
catchup=False
) as dag:
export_task = AwsGlueJobOperator(
task_id='export_dynamodb',
job_name='ExportDynamoDBJob',
script_location='s3://my-bucket/glue-scripts/export.py'
)
process_task = AwsGlueJobOperator(
task_id='process_data',
job_name='ProcessDataJob'
)
copy_task = RedshiftSQLOperator(
task_id='copy_to_redshift',
sql='COPY my_table FROM ...',
redshift_conn_id='redshift_default'
)
export_task >> process_task >> copy_task
移行パターン 3: EC2 上でのシェルスクリプト実行
Data Pipeline:
{
"type": "ShellCommandActivity",
"command": "bash /home/ec2-user/process.sh --input #{input1}",
"runsOn": { "ref": "EC2Instance" }
}
移行先: AWS Glue Python Shell Job / Lambda / ECS Task
AWS Glue PySpark Job:
# Glue Job として実行
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
# ETL ロジック
df = glueContext.create_dynamic_frame.from_options(
format_options={'paths': ['s3://my-bucket/input/']},
connection_type='s3',
format='json'
)
# 変換処理
transformed_df = df.select_fields(['id', 'name', 'value'])
# 出力
glueContext.write_dynamic_frame.from_options(
frame=transformed_df,
connection_type='s3',
connection_options={'path': 's3://my-bucket/output/'},
format='json'
)
Data Pipeline のコスト比較
| サービス | 課金方法 | 月額コスト例(100 回/月実行) |
|---|---|---|
| Data Pipeline | アクティビティ数 × 実行頻度 | $60/月(低頻度) |
| AWS Glue ETL | DPU-時間 | $1.5〜3/時間 × 実行時間 |
| Amazon MWAA | 管理コスト + 実行コスト | $300/月(最小) + DPU |
| AWS Step Functions | ステップ実行回数 | 100 万ステップまで無料 |
| Lambda + EventBridge | Lambda 実行 + イベント | 100 万イベント + 実行時間秒単位 |
推奨判断:
- 小規模・低頻度 → Lambda + EventBridge
- 大規模 ETL・複雑変換 → AWS Glue
- 複雑ワークフロー → Step Functions / MWAA
- 単純スケジュール → EventBridge Scheduler
Data Pipeline 移行チェックリスト
Step 1: 現状分析
├─ 既存パイプライン数・実行頻度・ドキュメント作成
├─ 実行時間・エラー率・依存関係の把握
└─ EC2 インスタンス設定・IAM ロール確認
Step 2: 移行先選定
├─ パイプラインの複雑度に応じて後継サービス決定
└─ 新旧サービスの並行運用期間設定(3〜6 ヶ月推奨)
Step 3: 移行実装
├─ 後継サービス(Glue / MWAA / Step Functions)の開発
├─ テストデータでの機能検証
└─ パフォーマンステスト・ロードテスト
Step 4: ステージング検証
├─ ステージング環境で 1 ヶ月の並行実行
├─ 結果の相互比較・差分検査
└─ 本番と同じスケールでの検証
Step 5: 本番切り替え
├─ データバックアップ・ロールバック計画確認
├─ トラフィック段階的転送(10% → 50% → 100%)
└─ 監視・アラート設定
Step 6: レガシー削除
├─ Data Pipeline パイプライン停止・削除
└─ EC2 インスタンス・IAM ロール削除
Data Pipeline の代替サービス詳細比較
AWS Glue ETL
特徴:
- PySpark / Scala での ETL コーディング
- 自動スケーリング DPU モデル
- Glue Catalog でメタデータ一元管理
- 関連サービス(Athena / Redshift / RDS)との統合
向く用途:
- 大規模データの抽出・変換・ロード
- Spark SQL での複雑な集計
- データフォーマット変換(Parquet / Orc / CSV)
学習:
# Glue Job 例
import sys
from awsglue.transforms import *
# S3 からデータ読み込み
dyf = glueContext.create_dynamic_frame_from_options(
's3', {'paths': ['s3://bucket/input/']}
)
# Spark DataFrame に変換
df = dyf.toDF()
# SQL クエリ実行
result = df.groupby('category').agg({'sales': 'sum'})
# S3 に出力
result.write.parquet('s3://bucket/output/')
Amazon MWAA(Managed Workflows for Apache Airflow)
特徴:
- Apache Airflow による DAG ベースのワークフロー
- Python コーディングで柔軟性
- 豊富なオペレーター(Glue / Lambda / EMR / Redshift)
- ダッシュボード・ログ可視化
向く用途:
- 複雑なタスク依存関係管理
- 複数 AWS サービスの統合
- エラーハンドリング・リトライ戦略
学習:
# MWAA DAG 例
from airflow import DAG
from datetime import datetime
with DAG(
'complex_etl',
start_date=datetime(2026, 1, 1),
schedule_interval='0 2 * * *'
) as dag:
# タスク定義
task1 = ...
task2 = ...
# 依存関係
task1 >> task2
AWS Step Functions
特徴:
- サーバーレス・ワークフロー
- State Machine JSON で定義
- AWS サービスネイティブ統合
- ビジュアルワークフローエディター
向く用途:
- Lambda / ECS / Batch の調整
- 分岐・並列処理
- AWS API シーケンス実行
まとめ
| 項目 | 詳細 |
|---|---|
| Data Pipeline の立場 | AWS のレガシー ETL スケジューラー。2024 年 7 月に新規顧客のアクセスがクローズされ、既存顧客のみメンテナンスモードで利用可能。新機能開発は 2015 年以来停止。 |
| 推奨判断 | 既存パイプライン以外は後継サービスを選択。シンプル → EventBridge/Lambda、複雑 → Glue/MWAA/Step Functions を選定。 |
| 移行タイムライン | 複雑度に応じて 3〜12 ヶ月。小規模パイプラインなら 3 ヶ月、エンタープライズスケールなら 12 ヶ月を推奨。 |
| コスト | Glue / MWAA は使用量ベース課金で、遊休コストなし。Step Functions は実行回数ベースで低コスト。 |
| サポート | AWS はセキュリティパッチ継続するが新機能なし。後継サービスは活発な開発・機能追加が継続中。 |