目次

Amazon SWF(Simple Workflow Service)v2.0 完全ガイド

公式ドキュメント


1. 本質と位置付け

Amazon SWF(Simple Workflow Service)は 「分散アプリケーションの長期実行ワークフロー・タスク調整を管理するレガシーサービス」 である。Decider(ワークフロー制御ロジック)と Activity Worker(タスク実行)を完全に分離し、ポーリングベースのタスクキューイング・状態管理を提供する。

重要な文脈:レガシーサービス化

  • 2024-2026 現状:AWS は新規プロジェクトに AWS Step Functions を推奨
  • SWF の位置付け:既存ワークフローの継続運用・レガシーシステム統合用
  • 廃止予定:公式には発表されていないが、イノベーションは Step Functions に集約
  • 支援状況:AWS サポート継続、但しサービス強化はない

このサービスを選ぶ理由

なぜ Amazon SWF でないといけないのか?

  1. 既存レガシーシステムの継続運用

    • SWF は 2012 年より Amazon の内部システム(注文管理・在庫処理・出荷ワークフロー)で稼働
    • 数千万行の SWF ベースのワークフローを Step Functions に移行するコスト > 継続利用コスト
    • 金融機関・保険会社等の堅牢性重視の組織ではまだ採用継続中
  2. カスタム Decider ロジックの複雑性対応

    • Step Functions のペイロード制限(最大 32KB JSON)を超える大規模データ処理
    • 条件分岐数 1000+ のような極めて複雑なワークフロー(プログラミング言語での実装が効率的)
    • 事前定義されない動的な意思決定が必要なユースケース
  3. ワーカーとの疎結合・言語非依存性

    • Activity Worker は任意言語(Java / Python / Node.js / Go)・任意場所(EC2 / Lambda / オンプレ)で動作
    • レガシーシステム(メインフレーム・COBOL)との統合が容易
    • 既存インフラと AWS の混在運用
  4. タスクリストの柔軟な管理

    • SQS 的なタスクキューイング + 状態管理の統合
    • 分散ワーカーのスケール・フェイルオーバー管理が単純
    • 通知ベースでなくポーリングベース(予測可能な処理能力)

このサービスを選ばない理由

  • ✅ 新規プロジェクト → AWS Step Functions を利用
  • ✅ 短期プロジェクト(運用期間 < 3年)→ Step Functions が低コスト
  • ✅ サーバーレス前提の組織 → Step Functions が標準
  • ✅ ビジュアルワークフロー設計が必須 → Step Functions のデザイナーを利用

2. SWF vs Step Functions vs Apache Airflow vs Temporal vs Cadence

┌────────────────┬─────────────────┬────────────────┬─────────────────┬──────────────┐
│ 比較軸         │ AWS SWF         │ Step Functions  │ Apache Airflow   │ Temporal     │
├────────────────┼─────────────────┼────────────────┼─────────────────┼──────────────┤
│ 実行モデル     │ ポーリングベース │ イベント駆動   │ DAG スケジューラ │ Event sourcing │
│ 管理形態       │ 完全マネージド  │ 完全マネージド  │ Self-managed    │ Self/Managed │
│ コード開発     │ SDK(複雑)    │ JSON(簡単)     │ Python(中)      │ SDK(複雑)    │
│ 学習曲線       │ 急(陡)          │ 緩(緩い)       │ 中程度           │ 陡い         │
│ ペイロード制限 │ 無制限          │ 32KB JSON      │ 無制限          │ 無制限       │
│ 実行時間       │ 1年            │ 1年             │ 無制限          │ 無制限       │
│ 視覚化         │ なし            │ ビジュアル設計  │ Airflow UI      │ なし        │
│ AWS 統合       │ 限定的          │ 60+ サービス    │ なし             │ なし        │
│ コスト感       │ 低(古い価格)    │ 低~中          │ インフラ依存     │ 自己責任    │
│ 推奨対象       │ レガシー保守    │ 新規 AWS ネイティブ │ 大規模オンプレ │ クリティカル │
└────────────────┴─────────────────┴────────────────┴─────────────────┴──────────────┘

詳細比較

比較項目 SWF Step Functions Apache Airflow Temporal Cadence
マネージド度 フル(インフラ不要) フル 自管理 部分(自管理ランタイム) 部分(自管理ランタイム)
複雑な分岐対応 ✅ 優秀(プログラミング言語) △ 限定的 △ 限定的 ✅ 優秀 ✅ 優秀
長期実行 ✅ 最大 1年 ✅ 最大 1年 ✅ 無制限 ✅ 無制限 ✅ 無制限
ペイロード大 ✅ 無制限(S3 参照) △ 32KB ✅ 無制限 ✅ 無制限 ✅ 無制限
オンプレ統合 ✅ 容易 △ 困難 ✅ 容易 △ 困難 △ 困難
DevOps ガバナンス △ CloudTrail 監査 ✅ 完全対応 △ 限定的 △ 限定的 △ 限定的
コミュニティ 小(衰退中) 大(急成長) 非常に大(活発) 中(急成長)
デバッグ/トレース △ CloudWatch ✅ ビジュアルデバッガ ✅ Airflow UI ✅ WebUI ✅ WebUI

3. SWF アーキテクチャ・コアコンセプト

3.1 ドメイン(Domain)

ワークフロー・アクティビティの論理的なネームスペース。AWS アカウント・リージョン単位で隔離。

# ドメインの作成と管理
aws swf register-domain \
  --name production-workflow \
  --workflow-execution-retention-period-in-days 90 \
  --region ap-northeast-1

# ドメイン一覧確認
aws swf list-domains --registration-status REGISTERED

3.2 ワークフロータイプ・バージョン

ワークフローの定義(メタデータ)。変更不可で、新規バージョンでデプロイ。

aws swf register-workflow-type \
  --domain production-workflow \
  --name order-processing-workflow \
  --workflow-version "2.0" \
  --default-task-list '{"name": "DecisionTaskList"}' \
  --default-execution-start-to-close-timeout "3600" \
  --default-task-start-to-close-timeout "60"

3.3 アクティビティタイプ・バージョン

実際の処理(タスク)の定義。Decider が必要に応じてスケジュール。

aws swf register-activity-type \
  --domain production-workflow \
  --name process-payment \
  --activity-version "1.0" \
  --default-task-start-to-close-timeout "600" \
  --default-task-heartbeat-timeout "120" \
  --default-task-schedule-to-start-timeout "60" \
  --default-task-schedule-to-close-timeout "660"

3.4 Decider(ディサイダー)

ワークフロー実行ロジック。長時間ポーリングして Decision Task を受け取り、次のアクション(Activity Task スケジュール・分岐判定・終了)を決定。

import boto3
import json

swf_client = boto3.client('swf', region_name='ap-northeast-1')

def run_decider():
    while True:
        # Decision Task をポーリング(タイムアウト 60秒)
        response = swf_client.poll_for_decision_task(
            domain='production-workflow',
            taskList={'name': 'DecisionTaskList'},
            identity='Decider-001'
        )
        
        if 'taskToken' not in response:
            continue  # タイムアウト、再ポーリング
        
        task_token = response['taskToken']
        events = response['events']
        
        # イベント履歴から現在の状態を判定
        decisions = make_decisions(events)
        
        # Decision を送信
        swf_client.respond_decision_task_completed(
            taskToken=task_token,
            decisions=decisions
        )

def make_decisions(events):
    """イベント履歴から次の Decision を生成"""
    decisions = []
    
    for event in events:
        if event['eventType'] == 'WorkflowExecutionStarted':
            # ワークフロー開始 → 最初のアクティビティスケジュール
            decisions.append({
                'decisionType': 'ScheduleActivityTask',
                'scheduleActivityTaskDecisionAttributes': {
                    'activityType': {'name': 'validate-order', 'version': '1.0'},
                    'activityId': f'activity-{time.time()}',
                    'taskList': {'name': 'ActivityTaskList'},
                    'scheduleToCloseTimeout': '600',
                    'startToCloseTimeout': '300'
                }
            })
        
        elif event['eventType'] == 'ActivityTaskCompleted':
            result = json.loads(event['activityTaskCompletedEventAttributes']['result'])
            if result['status'] == 'valid':
                # 次のアクティビティ
                decisions.append({
                    'decisionType': 'ScheduleActivityTask',
                    'scheduleActivityTaskDecisionAttributes': {
                        'activityType': {'name': 'process-payment', 'version': '1.0'},
                        'activityId': f'payment-{time.time()}',
                        'taskList': {'name': 'ActivityTaskList'},
                        'scheduleToCloseTimeout': '600'
                    }
                })
            else:
                # キャンセル
                decisions.append({
                    'decisionType': 'CancelWorkflowExecution',
                    'cancelWorkflowExecutionDecisionAttributes': {
                        'details': 'Order validation failed'
                    }
                })
        
        elif event['eventType'] == 'ActivityTaskCompleted' and \
             event['activityTaskCompletedEventAttributes']['activityType']['name'] == 'process-payment':
            # ワークフロー完了
            decisions.append({
                'decisionType': 'CompleteWorkflowExecution',
                'completeWorkflowExecutionDecisionAttributes': {
                    'result': 'Order processed successfully'
                }
            })
    
    return decisions

3.5 Activity Worker(アクティビティワーカー)

実際の処理を実行するワーカープロセス。Task List をポーリングして Activity Task を取得、処理実行、結果報告。

def run_activity_worker():
    while True:
        # Activity Task をポーリング
        response = swf_client.poll_for_activity_task(
            domain='production-workflow',
            taskList={'name': 'ActivityTaskList'},
            identity='Worker-001'
        )
        
        if 'taskToken' not in response:
            continue
        
        task_token = response['taskToken']
        activity_type = response['activityType']['name']
        input_data = json.loads(response.get('input', '{}'))
        
        try:
            if activity_type == 'validate-order':
                result = validate_order(input_data)
            elif activity_type == 'process-payment':
                result = process_payment(input_data)
            else:
                raise ValueError(f'Unknown activity: {activity_type}')
            
            # 成功を報告
            swf_client.respond_activity_task_completed(
                taskToken=task_token,
                result=json.dumps(result)
            )
        
        except Exception as e:
            # 失敗を報告
            swf_client.respond_activity_task_failed(
                taskToken=task_token,
                reason=str(e),
                details=traceback.format_exc()
            )
        
        # オプション:Heartbeat(長時間処理の場合)
        def long_running_task():
            for step in range(100):
                try:
                    swf_client.record_activity_task_heartbeat(
                        taskToken=task_token,
                        details=f'Processing step {step}/100'
                    )
                except swf_client.exceptions.TaskTimedOutFault:
                    # Heartbeat タイムアウト、タスク中止
                    return
                time.sleep(1)

3.6 Task List(タスクリスト)

Decider / Worker が購読する仮想キュー。SQS と異なり、FIFO 順序保証・複数ワーカーでの負荷分散を実現。

Decider ポーリング
    ↓
Decision Task List(DecisionTaskList)
    ↑
SWF Scheduler
    ↓
Activity Task List(ActivityTaskList)
    ↑
Worker ポーリング

4. ワークフロー実行・ライフサイクル

4.1 ワークフロー実行の開始

# ワークフロー実行の起動
aws swf start-workflow-execution \
  --domain production-workflow \
  --workflow-id "order-12345-$(date +%s)" \
  --workflow-type '{"name": "order-processing-workflow", "version": "2.0"}' \
  --task-list '{"name": "DecisionTaskList"}' \
  --input '{"orderId": "ORDER-12345", "customerId": "CUST-789", "amount": 15000, "items": ["SKU-001", "SKU-002"]}' \
  --execution-start-to-close-timeout "3600" \
  --tag-list "production" "order-service" \
  --region ap-northeast-1

# 出力例:
# {
#   "runId": "f2e46fb0-1e7e-11ed-be35-0242ac130002"
# }

4.2 ワークフロー実行の状態確認

# 実行状態の取得
aws swf describe-workflow-execution \
  --domain production-workflow \
  --execution '{"workflowId": "order-12345-1694000000", "runId": "f2e46fb0-..."}' \
  --region ap-northeast-1

# 出力例:
# {
#   "executionInfo": {
#     "execution": { "workflowId": "order-12345-1694000000", "runId": "..." },
#     "executionStatus": "CLOSED",
#     "closeStatus": "COMPLETED",
#     "startTimestamp": 1694000000.0,
#     "closeTimestamp": 1694000120.0,
#     "executionArn": "arn:aws:swf:ap-northeast-1:..."
#   }
# }

4.3 実行イベント履歴の確認

aws swf get-workflow-execution-history \
  --domain production-workflow \
  --execution '{"workflowId": "order-12345-1694000000", "runId": "..."}' \
  --region ap-northeast-1 \
  | jq '.events[] | {eventType: .eventType, timestamp: .eventTimestamp}'

4.4 信号・外部イベント

実行中のワークフローに外部から信号を送信(コンテキスト:注文のキャンセル・支払い確認等)。

# ワークフローに信号を送信
aws swf signal-workflow-execution \
  --domain production-workflow \
  --workflow-id "order-12345-1694000000" \
  --signal-name "OrderCancelled" \
  --input '{"reason": "Customer request", "timestamp": "2024-01-15T10:30:00Z"}' \
  --region ap-northeast-1

5. 料金と運用コスト

5.1 SWF 料金体系

課金項目 価格
ワークフロー実行 $0.000025/実行(月間 100 万実行で $25)
アクティビティタスク $0.000025/タスク
追加メタデータ操作 $0.0000001/操作

5.2 運用コストの実例

シナリオ:注文処理ワークフロー(月間 100万件の注文)

① 毎月の実行
   ワークフロー実行: 100万 × $0.000025 = $25
   Activity Task: 100万 × 3 (validate / payment / shipment)× $0.000025 = $75
   合計月額: $100

② メインフレームと比較
   メインフレーム CICS: 月額 $500,000~
   AWS SWF: 月額 $100
   削減率: 99.98%

③ Step Functions との比較
   Step Functions 実行: 100万 × $0.000025 = $25
   → SWF と同等の料金
   → ただし Step Functions は新規推奨、SWF はレガシー保守のみ

6. 設計パターンとベストプラクティス

6.1 マルチワーカー・スケールアウト設計

# Worker クラスタの実装
class ActivityWorkerCluster:
    def __init__(self, domain, task_list, worker_count=10):
        self.domain = domain
        self.task_list = task_list
        self.worker_count = worker_count
        self.workers = []
    
    def start(self):
        for i in range(self.worker_count):
            worker = ActivityWorker(
                domain=self.domain,
                task_list=self.task_list,
                identity=f'Worker-{i:03d}'
            )
            worker.start()  # スレッド起動
            self.workers.append(worker)
    
    def stop(self):
        for worker in self.workers:
            worker.stop()
    
    def scale(self, new_count):
        """ワーカー数をスケール"""
        if new_count > len(self.workers):
            for i in range(len(self.workers), new_count):
                worker = ActivityWorker(...)
                worker.start()
        else:
            for i in range(new_count, len(self.workers)):
                self.workers[i].stop()

6.2 エラーハンドリング・リトライロジック

def handle_activity_failure(event):
    """Activity Task 失敗時の Decider ロジック"""
    
    activity_attrs = event['activityTaskFailedEventAttributes']
    reason = activity_attrs.get('reason', 'Unknown')
    details = activity_attrs.get('details', '')
    
    # リトライ回数を管理(context に格納)
    retry_count = get_retry_count_from_context()
    max_retries = 3
    
    if retry_count < max_retries:
        # リトライ:指数バックオフ
        schedule_activity_with_backoff(
            activity_type=activity_attrs['activityType'],
            retry_count=retry_count + 1
        )
    else:
        # リトライ回数超過 → ワークフロー失敗
        return {
            'decisionType': 'FailWorkflowExecution',
            'failWorkflowExecutionDecisionAttributes': {
                'reason': f'Activity failed after {max_retries} retries',
                'details': f'{reason}: {details}'
            }
        }

6.3 ワークフロー並列化

def parallel_activity_scheduling():
    """複数 Activity を並列実行"""
    
    decisions = []
    
    # 支払い検証と在庫確認を並列実行
    decisions.append({
        'decisionType': 'ScheduleActivityTask',
        'scheduleActivityTaskDecisionAttributes': {
            'activityType': {'name': 'validate-payment', 'version': '1.0'},
            'activityId': 'payment-task',
            'taskList': {'name': 'ActivityTaskList'},
            'startToCloseTimeout': '300'
        }
    })
    
    decisions.append({
        'decisionType': 'ScheduleActivityTask',
        'scheduleActivityTaskDecisionAttributes': {
            'activityType': {'name': 'check-inventory', 'version': '1.0'},
            'activityId': 'inventory-task',
            'taskList': {'name': 'ActivityTaskList'},
            'startToCloseTimeout': '600'
        }
    })
    
    # 両方の完了を待つ(Decider の次の判定で join)
    return decisions

7. トラブルシューティング

症状 原因 対応
“Task not found” エラー Decider / Worker がタイムアウト中に別の実行が進行 タイムアウト値(task-start-to-close-timeout)を増加
ワークフロー実行が進まない Decider プロセスがクラッシュ・停止中 Decider プロセスログ確認、再起動
Activity Task が実行されない Worker タスクリストが異なる・Worker なし Task List 名の一致確認、Worker 起動確認
Heartbeat タイムアウト Worker が heartbeat を送信しない activity-heartbeat-timeout を増加、またはワーカーコード修正
イベント履歴が見えない ワークフロー実行が古すぎて削除 retention-period 設定確認(デフォルト 90日)

8. SWF から Step Functions への移行ガイド

移行対象の判定基準

判定基準 SWF 継続 Step Functions 移行
プロジェクト年齢 > 5年(既存安定) < 3年(新規・リニューアル)
複雑度 極めて複雑(1000+ 分岐) 中~高程度(100~500 分岐)
ペイロード > 1MB(大規模) < 32KB(JSON)
運用体制 専任チーム(SWF 経験者) 小規模チーム
クラウドネイティブ度 低(オンプレ統合) 高(AWS 完全依存)

段階的移行パターン

  • Phase 1: 新規ワークフロー → Step Functions で開発
  • Phase 2: 既存小規模ワークフロー(< 10万実行/月)→ 順次 Step Functions に置換
  • Phase 3: 既存大規模ワークフロー → SWF で保守継続、新規機能は Step Functions
  • Phase 4: 3~5年後に大規模ワークフロー → Step Functions 移行検討

9. まとめ

Amazon SWF は 「レガシーな分散ワークフロー管理サービス」 である。

使うべき場合

  • メインフレーム・COBOL との統合が必須
  • 既存 SWF ワークフロー(> 100万実行実績)の継続運用
  • 極めて複雑な意思決定ロジック(プログラミング言語での実装が効率的)
  • ペイロード制限なしの大規模データ処理

使うべきでない場合(→ Step Functions を推奨)

  • 新規ワークフロープロジェクト
  • AWS ネイティブサービス主体の組織
  • ビジュアルワークフロー設計・デバッグが必須
  • 小~中規模チーム(学習コスト削減)

参考資料

SWF と Step Functions の比較

Apache Airflow / Temporal との比較

SWF は AWS が提供する重要なレガシーサービスであり、既存システムの保守に不可欠だが、新規開発は Step Functions が標準となっている。