目次

AWS Certified Data Engineer - Associate (DEA-C01)

完全学習ガイド


試験概要

項目 詳細
試験コード DEA-C01
正式名称 AWS Certified Data Engineer - Associate
レベル Associate
難易度 ★★★☆☆
試験時間 170分
問題数 85問(65問採点対象 + 20問採点外)
合格スコア 720/1000
受験料 $150 USD
有効期限 3年
前提推奨 データエンジニアリング実務経験2〜3年 + AWS実務1〜2年

対象者

  • データパイプライン・ETL処理を設計・構築するデータエンジニア
  • データレイク・データウェアハウスを運用するインフラエンジニア
  • データ品質・セキュリティ・ガバナンスを担当するデータアーキテクト

ドメイン別出題割合

ドメイン 出題割合
Domain 1: データ取り込みと変換 34%
Domain 2: データストア管理 26%
Domain 3: データオペレーションとサポート 22%
Domain 4: データセキュリティとガバナンス 18%
┌──────────────────────────────────────────────────────────────────────┐
│  Domain 1: データ取り込みと変換         34%  █████████████████       │
│  Domain 2: データストア管理             26%  █████████████           │
│  Domain 3: データオペレーションと支援    22%  ███████████             │
│  Domain 4: データセキュリティとガバナンス 18%  █████████              │
└──────────────────────────────────────────────────────────────────────┘

最重要は Domain 1(34%)。データ取り込み・変換・パイプラインが中核。


Domain 1: データ取り込みと変換(34%)

1.1 データ取り込みパターン

バッチ取り込み:
RDS / DynamoDB / S3 → AWS Glue ETL → S3 (Parquet/ORC) → Redshift/Athena

ストリーミング取り込み:
IoT/アプリ → Kinesis Data Streams → Lambda / KDA → S3 / Redshift

変更データキャプチャ(CDC):
RDS → AWS DMS → S3 / Redshift(増分レプリケーション)

1.2 AWS Glue

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3からデータ読み込み(Glueカタログ経由)
datasource = glueContext.create_dynamic_frame.from_catalog(
    database='raw_db',
    table_name='sales_data'
)

# 変換処理
# 1. 型変換
transformed = ApplyMapping.apply(frame=datasource, mappings=[
    ('sale_date', 'string', 'sale_date', 'date'),
    ('amount', 'string', 'amount', 'double'),
    ('customer_id', 'string', 'customer_id', 'long')
])

# 2. フィルタリング(amountが0以上のみ)
filtered = Filter.apply(frame=transformed,
                         f=lambda x: x['amount'] > 0)

# 3. S3に書き込み(Parquet形式・パーティション分割)
glueContext.write_dynamic_frame.from_options(
    frame=filtered,
    connection_type='s3',
    connection_options={
        'path': 's3://processed-bucket/sales/',
        'partitionKeys': ['year', 'month']
    },
    format='parquet'
)
job.commit()

Glue の主要コンポーネント

コンポーネント 説明
Glue Data Catalog メタデータリポジトリ(テーブル定義・スキーマ)
Glue Crawler S3等のデータを自動スキャンしてカタログに登録
Glue ETL Job PySpark/Pythonシェルによるデータ変換
Glue DataBrew ノーコードのデータ前処理・可視化ツール
Glue Streaming Apache Spark Structed Streamingによるリアルタイム処理

1.3 Amazon Kinesis

import boto3, json

kinesis = boto3.client('kinesis')

# データ送信(プロデューサー)
kinesis.put_record(
    StreamName='clickstream',
    Data=json.dumps({'user_id': 'u123', 'event': 'click', 'ts': '2025-01-01T12:00:00Z'}),
    PartitionKey='u123'  # 同じパーティションキー → 同じシャードで順序保証
)

# Kinesis Data Firehose: S3への自動配信(バッファリング)
firehose = boto3.client('firehose')
firehose.put_record(
    DeliveryStreamName='clickstream-to-s3',
    Record={'Data': json.dumps({'event': 'purchase', 'amount': 1200})}
)
サービス 特徴 ユースケース
Kinesis Data Streams 低レイテンシ・カスタム処理・複数コンシューマー リアルタイム分析・ML推論
Kinesis Data Firehose フルマネージド・S3/Redshift/OpenSearch自動配信 ログ収集・分析パイプライン
Kinesis Data Analytics Flink/SQLによるストリーム処理 リアルタイム集計・異常検知
MSK (Managed Kafka) Apache Kafka互換・高スループット 大規模イベントストリーミング

1.4 AWS Database Migration Service(DMS)

dms = boto3.client('dms')

# レプリケーションタスク作成(CDC: 変更データキャプチャ)
dms.create_replication_task(
    ReplicationTaskIdentifier='rds-to-s3-cdc',
    SourceEndpointArn='arn:aws:dms:...:endpoint:SOURCE',
    TargetEndpointArn='arn:aws:dms:...:endpoint:TARGET',
    ReplicationInstanceArn='arn:aws:dms:...:rep:INSTANCE',
    MigrationType='cdc',  # full-load | cdc | full-load-and-cdc
    TableMappings=json.dumps({
        "rules": [{"rule-type": "selection", "rule-id": "1",
                   "rule-name": "include-all", "object-locator": {
                       "schema-name": "public", "table-name": "%"},
                   "rule-action": "include"}]
    })
)

1.5 データ変換・品質

# AWS Glue Data Quality
# データ品質ルールをDQDL(Data Quality Definition Language)で定義
dqdl_rules = """
Rules = [
    IsComplete "customer_id",
    IsUnique "order_id",
    ColumnValues "amount" between 0 and 1000000,
    ColumnValues "status" in ["pending", "completed", "cancelled"],
    RowCount > 100
]
"""

Domain 2: データストア管理(26%)

2.1 Amazon S3 データレイク設計

  • データレイク層構成:
  • Raw Layer (S3/raw/) → 生データそのまま保存(変更不可)
  • Processed Layer (S3/processed/) → クレンジング・正規化済み(Parquet)
  • Curated Layer (S3/curated/) → 分析用・集計済み・ドメイン別
# S3 Intelligent-Tiering: アクセスパターンに基づく自動コスト最適化
s3 = boto3.client('s3')

s3.put_bucket_intelligent_tiering_configuration(
    Bucket='my-datalake',
    Id='EntireBucket',
    IntelligentTieringConfiguration={
        'Id': 'EntireBucket',
        'Status': 'Enabled',
        'Tierings': [
            {'Days': 90, 'AccessTier': 'ARCHIVE_ACCESS'},
            {'Days': 180, 'AccessTier': 'DEEP_ARCHIVE_ACCESS'}
        ]
    }
)

2.2 Amazon Redshift

-- Redshift: 列指向DWH(大量データの高速集計)
-- DISTKEY: データを均等分散するキー
-- SORTKEY: よく使う絞り込み条件に設定

CREATE TABLE sales (
    order_id    BIGINT NOT NULL,
    customer_id BIGINT NOT NULL,
    sale_date   DATE NOT NULL,
    amount      DECIMAL(10,2) NOT NULL,
    region      VARCHAR(50)
)
DISTSTYLE KEY
DISTKEY (customer_id)           -- 顧客IDで分散(JOINが多い場合)
COMPOUND SORTKEY (sale_date, region);  -- 日付・地域でよく絞り込む場合

-- Redshift Spectrum: S3上のデータを直接クエリ
CREATE EXTERNAL SCHEMA s3_data
FROM DATA CATALOG
DATABASE 'raw_db'
IAM_ROLE 'arn:aws:iam::123456789:role/redshift-role';

SELECT region, SUM(amount) FROM s3_data.sales
WHERE sale_date >= '2025-01-01' GROUP BY region;
機能 説明
Redshift Spectrum S3上のデータをRedshiftから直接クエリ
Redshift Serverless キャパシティ管理不要・自動スケール
AQUA 分散ハードウェア加速クエリ処理
Data Sharing 異なるRedshiftクラスター間でのデータ共有

2.3 Amazon Athena

-- Athena: S3上のデータをSQLでクエリ(サーバーレス)
-- Parquet + パーティション分割でコスト・速度を最適化

-- パーティション射影(自動パーティション管理)
CREATE TABLE cloudfront_logs (
    date        DATE,
    time        STRING,
    location    STRING,
    bytes       BIGINT,
    status      INT
)
PARTITIONED BY (year STRING, month STRING, day STRING)
STORED AS PARQUET
LOCATION 's3://my-logs/cloudfront/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.year.type' = 'integer',
    'projection.year.range' = '2020,2030',
    'projection.month.type' = 'integer',
    'projection.month.range' = '1,12',
    'projection.month.digits' = '2',
    'storage.location.template' = 's3://my-logs/cloudfront/${year}/${month}/${day}/'
);

2.4 データストア選択ガイド

要件 推奨サービス
大規模バッチ分析(PB級) Amazon Redshift
S3上のアドホッククエリ Amazon Athena
NoSQLリアルタイム読み書き Amazon DynamoDB
全文検索・ログ分析 Amazon OpenSearch Service
グラフデータ Amazon Neptune
時系列データ Amazon Timestream
インメモリキャッシュ Amazon ElastiCache

Domain 3: データオペレーションとサポート(22%)

3.1 データパイプラインのオーケストレーション

# Amazon MWAA(Managed Workflows for Apache Airflow)
# Airflow DAGでデータパイプラインを定義
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='daily_etl_pipeline',
    schedule_interval='0 2 * * *',   # 毎日02:00
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    # Step 1: Glue ETL実行
    run_glue = GlueJobOperator(
        task_id='run_glue_etl',
        job_name='sales-etl-job',
        script_args={'--date': '{{ ds }}'}
    )

    # Step 2: Redshiftにロード
    load_redshift = RedshiftSQLOperator(
        task_id='load_to_redshift',
        sql="COPY sales FROM 's3://processed/{{ ds }}/sales/' IAM_ROLE '...'"
    )

    run_glue >> load_redshift
オーケストレーションツール 特徴
Amazon MWAA マネージドAirflow・複雑な依存関係
AWS Step Functions サーバーレス・AWSサービス連携
AWS Glue Workflows Glueジョブ専用・シンプルなパイプライン
Amazon EventBridge Scheduler スケジュール起動・イベントドリブン

3.2 パフォーマンスチューニング

-- Athena コストと速度の最適化
-- 1. Parquet/ORC形式を使用(CSV比で10〜100倍高速、コスト削減)
-- 2. パーティション分割で不要なデータスキャンを回避
-- 3. 列の絞り込み(SELECT * は避ける)
-- 4. 結果のキャッシュ(同一クエリは再実行なし)

-- Redshift EXPLAIN でクエリプラン確認
EXPLAIN SELECT customer_id, SUM(amount)
FROM sales
WHERE sale_date >= '2025-01-01'
GROUP BY customer_id;
-- → DS_DIST_NONE(分散なし)が理想、DS_BCAST_INNER(ブロードキャスト)は要注意

3.3 障害対応とデバッグ

# Glue Job のエラーログ確認
cloudwatch_logs = boto3.client('logs')

response = cloudwatch_logs.filter_log_events(
    logGroupName='/aws-glue/jobs/error',
    logStreamNamePrefix='my-glue-job',
    filterPattern='ERROR',
    startTime=int((datetime.now() - timedelta(hours=1)).timestamp() * 1000)
)

Domain 4: データセキュリティとガバナンス(18%)

4.1 AWS Lake Formation

lakeformation = boto3.client('lakeformation')

# テーブルレベルのアクセス制御
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789:role/data-analyst'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'sales_db',
            'Name': 'orders',
            'ColumnWildcard': {
                'ExcludedColumnNames': ['credit_card_number', 'ssn']  # PII列を除外
            }
        }
    },
    Permissions=['SELECT'],
    PermissionConditions={
        'rowFilter': {
            'filterExpression': "region = 'JP'"  # 行レベルフィルタ(日本データのみ)
        }
    }
)
Lake Formation機能 説明
データカタログ管理 Glue Data Catalogとの統合
テーブルレベルアクセス制御 IAMより細粒度の権限管理
列レベルセキュリティ 特定列へのアクセス制限(PII保護)
行レベルセキュリティ フィルタ条件でアクセス可能な行を制限
データリネージ データの出所・変換履歴の追跡

4.2 データ暗号化

# S3のサーバーサイド暗号化(SSE)
s3.put_object(
    Bucket='my-datalake',
    Key='sensitive/data.parquet',
    Body=data,
    ServerSideEncryption='aws:kms',           # SSE-KMS
    SSEKMSKeyId='arn:aws:kms:...:key/...'    # カスタムKMSキー
)

# Redshiftの暗号化(クラスター作成時に設定)
# 既存クラスターの暗号化: スナップショット→暗号化クラスターにリストア
暗号化タイプ 鍵管理 用途
SSE-S3 (SSE-S3) S3が管理 デフォルト・手軽
SSE-KMS KMSが管理(監査ログあり) コンプライアンス要件あり
SSE-C ユーザーが管理 独自鍵管理が必要な場合
クライアントサイド暗号化 ユーザーが管理 AWS外でも保護が必要

4.3 データカタログとリネージ

# Amazon DataZone: データメッシュ・データ製品管理
# AWS Glue Data Catalog: テーブル定義・スキーマ管理

glue = boto3.client('glue')

# データリネージの記録(タグ付けで追跡)
glue.tag_resource(
    ResourceArn='arn:aws:glue:ap-northeast-1:123:table/sales_db/orders',
    TagsToAdd={
        'data-owner': 'sales-team',
        'pii-level': 'high',
        'source-system': 'crm-database',
        'last-updated': '2025-01-01'
    }
)

試験頻出サービス早見表

サービス カテゴリ 一言説明
AWS Glue ETL フルマネージドETL・データカタログ
Amazon Redshift DWH 列指向・大規模バッチ分析
Amazon Athena クエリ S3上データのサーバーレスSQL
Kinesis Data Streams ストリーミング リアルタイムデータ取り込み
Kinesis Data Firehose ストリーミング S3/Redshiftへの自動デリバリー
Amazon MSK ストリーミング マネージドApache Kafka
AWS DMS 移行 データベース移行・CDC
Amazon EMR 大規模処理 マネージドHadoop/Spark
AWS Lake Formation ガバナンス データレイクの権限・セキュリティ管理
Amazon MWAA オーケストレーション マネージドAirflow
AWS Step Functions オーケストレーション サーバーレスワークフロー
Amazon DataZone データメッシュ データ製品・カタログ管理

試験重要数値

項目
合格スコア 720/1000
試験時間 170分
Kinesis Data Streams デフォルト保持期間 24時間
Kinesis Data Streams 最大保持期間 365日
Kinesis シャード 最大書き込み 1MB/秒 または 1000レコード/秒
Kinesis シャード 最大読み取り 2MB/秒
Firehose バッファリング(サイズ) 1〜128 MB
Firehose バッファリング(時間) 60〜900秒
Redshift 最大ノード数(ra3.16xlarge) 128ノード
S3 マルチパートアップロード推奨サイズ 100MB以上

8週間学習プラン

Week 1-2: データ取り込みと変換

  • [ ] AWS Glue ETL ハンズオン(PySpark変換・DynamicFrame)
  • [ ] Glue Crawler + Data Catalog 設定
  • [ ] Kinesis Data Streams + Firehose 実践
  • [ ] AWS DMS CDC レプリケーション設定

Week 3-4: データストア管理

  • [ ] S3 データレイク設計(Raw/Processed/Curated)
  • [ ] Amazon Athena パーティション・Parquet最適化
  • [ ] Amazon Redshift DISTKEY/SORTKEY 設計
  • [ ] Redshift Spectrum でS3データクエリ

Week 5-6: データオペレーション

  • [ ] MWAA(Airflow)DAG作成・スケジュール設定
  • [ ] Step Functions データパイプライン
  • [ ] Glue Data Quality ルール設定
  • [ ] CloudWatch でパイプライン監視・アラーム

Week 7-8: セキュリティ・ガバナンス + 試験対策

  • [ ] Lake Formation 列/行レベルセキュリティ設定
  • [ ] KMS + S3/Redshift 暗号化実践
  • [ ] AWS公式模擬試験
  • [ ] 苦手ドメイン集中復習

DEA-C01 追加詳解セクション

データエンジニアリング基礎

データパイプラインの全体像

データエンジニアリングパイプライン:

ソースシステム      → データ取り込み   → ストレージ     → 処理・変換     → 分析・可視化
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RDB, SaaS,         Kinesis,          S3 (Data Lake), Glue ETL,        Athena,
IoT, Logs,         DMS, Glue,        Redshift,       EMR Spark,       QuickSight,
API, Streams       AppFlow,          DynamoDB        Lambda           SageMaker
                   Kafka (MSK)

バッチ処理 vs ストリーム処理

比較項目 バッチ処理 ストリーム処理
データ処理タイミング 定期的(時間/日次) リアルタイム
レイテンシ 高い(分〜時間) 低い(ミリ秒〜秒)
スループット 高い 中程度
複雑さ 低い 高い
コスト 低い 高め
AWSサービス Glue ETL, EMR, Lambda Kinesis, MSK, Flink
ユースケース 日次レポート、ML訓練 不正検知、IoT監視、リアルタイム推薦

Amazon Kinesis 詳細(試験最頻出

Kinesis Data Streams(KDS):
  → リアルタイムストリームデータの収集・保存
  → シャード(Shard)単位でスケール
    1シャード = 1MB/秒書き込み、2MB/秒読み取り
  → データ保持期間: デフォルト24時間(最大365日)
  → コンシューマー: Lambda, KDA, KDF, カスタムアプリ

Kinesis Data Firehose(KDF):
  → ストリームデータをS3/Redshift/OpenSearch等に自動配信
  → サーバーレス・フルマネージド(シャード管理不要)
  → バッファリング(最低60秒 or 1MB)してからバッチ書き込み
  → Lambda変換が組み込み可能
  → 近リアルタイム(ほぼリアルタイムだが数十秒の遅延あり)

Kinesis Data Analytics:
  → Flink(Java/Python/SQL)でストリームデータを分析
  → リアルタイム集計・異常検知・ウィンドウ集計

MSK(Managed Streaming for Apache Kafka):
  → Apache Kafkaのフルマネージド版
  → KDSより高い互換性(既存Kafkaアプリの移行)
  → より柔軟なコンシューマーグループ管理

試験での問い方: 「管理不要でストリームデータをS3に継続的に書き込みたい」→ Kinesis Data Firehose


AWS Glue 詳細

Glue ETL:
  → サーバーレスのETL(抽出・変換・ロード)サービス
  → Python/Scala (PySpark) のETLスクリプトを実行
  → ビジュアルETLエディタ(コード不要のUI操作)
  → Glue DynamicFrame: DataFrameの拡張(半構造化データ対応)

Glue Data Catalog:
  → S3/RDS/Redshift等のメタデータリポジトリ
  → テーブルスキーマ・パーティション情報を一元管理
  → Athena・EMR・Redshift Spectrum から参照

Glue Crawler:
  → S3等のデータソースをスキャンしてスキーマを自動検出
  → Data Catalogにテーブル定義を自動作成
  → パーティションの自動検出

Glue DataBrew:
  → ノーコードのデータクレンジング・変換UI
  → 250以上の組み込み変換(欠損値補完・外れ値除去等)
  → ML エンジニアやデータアナリスト向け(コード不要)

Amazon Redshift 詳細

Redshiftの特徴:
  → ペタバイト級のデータウェアハウス
  → 列指向(Columnar)ストレージ → 分析クエリに最適
  → MPP(大規模並列処理)アーキテクチャ
  → PostgreSQL互換(JDBC/ODBC)

Redshift Spectrum:
  → Redshiftクラスターから直接S3のデータをクエリ
  → データをRedshiftにロードせずに分析可能
  → Glue Data Catalogのテーブル定義を使用

Redshift Serverless:
  → キャパシティの事前設定不要
  → 使用量ベースの課金
  → 開発・テスト・散発的なクエリに最適

Amazon Athena 詳細

特徴:
  → S3に保存されたデータに対してSQLクエリを実行
  → サーバーレス(インフラ管理不要)
  → スキャンしたデータ量で課金($5/TB)
  → Glue Data Catalogのスキーマを使用
  → ORC/Parquet形式でコスト削減(圧縮・列指向)

コスト最適化:
  → Parquet/ORC列指向フォーマットを使用(スキャン量削減)
  → データをS3でパーティショニング(WHERE条件でパーティションプルーニング)
  → 大きなファイルに圧縮(多数の小さなファイルを避ける)

Athena Federated Query:
  → RDS/DynamoDB/他データソースにもSQLクエリを実行
  → Lambda Data Source Connectorを使用

Lake Formation

データレイクのセキュリティ・ガバナンスを一元管理

主要機能:
  1. データカタログの管理(Glue Data Catalogと統合)
  2. 行レベル・列レベルのセキュリティ
  3. データアクセス権限の一元管理(IAMより細かい粒度)
  4. Blueprint(ブループリント): よくあるデータ取り込みパターン

Lake Formation と S3 バケットポリシーの違い:
  → Lake Formation: テーブル・列・行レベルの細かいアクセス制御
  → S3 バケットポリシー: S3オブジェクトレベルのアクセス制御
  → Lake FormationはGlue・Athena・Redshift Spectrum経由のアクセスを制御

AWS DMS(Database Migration Service)

主な用途:
  → オンプレミスDBからAWSへの移行
  → 異種DB間の移行(Oracle → Aurora等)
  → 継続的なデータレプリケーション

Full Load(全データ移行):
  → テーブル全体を一度に移行
  → 移行中もソースDBは稼働(最小ダウンタイム)

CDC(Change Data Capture):
  → 増分変更のみをリアルタイムにレプリケート
  → 本番移行完了後の継続レプリケーションに使用

Schema Conversion Tool(SCT):
  → 異種DBのDDL(スキーマ)を変換
  → Oracle SQLをAurora PostgreSQL用に自動変換

DEA-C01 模擬問題(本番形式 65問相当)


問題 DEA-01

リアルタイムのIoTセンサーデータを収集し、S3に継続的に書き込みたい。管理コストを最小化したい。最も適切なサービスはどれですか?

  • A. Amazon Kinesis Data Streams
  • B. Amazon Kinesis Data Firehose
  • C. Amazon MSK(Managed Kafka)
  • D. Amazon SQS
正解と解説

正解: B

Kinesis Data Firehoseはサーバーレス・フルマネージドで、ストリームデータをS3に自動的にバッファリングして書き込む。シャード管理が不要で運用コストが低い。KDSは管理が必要、MSKはKafka専門知識が必要。


問題 DEA-02

S3に保存されたParquetファイルに対してSQLクエリを実行したい。コストを最小化したい。最も適切なサービスはどれですか?

  • A. Amazon Redshift(データをロード後)
  • B. Amazon Athena
  • C. Amazon EMR(Spark SQL)
  • D. AWS Glue ETL
正解と解説

正解: B

Amazon AthenaはS3に対してサーバーレスでSQLクエリを実行でき、スキャンデータ量での従量課金。Parquetは列指向圧縮形式でAthenaのコストを大幅削減できる。Redshiftはデータロードが必要でコストが高い。


問題 DEA-03

AWS GlueのCrawlerの主な役割はどれですか?

  • A. データをS3からRedshiftにロードする
  • B. S3等のデータソースをスキャンしてスキーマを自動検出しData Catalogに登録する
  • C. ETLジョブを定期的にスケジュール実行する
  • D. データの暗号化と圧縮を自動化する
正解と解説

正解: B

Glue Crawlerはデータソースを自動スキャンし、テーブルスキーマ・パーティション情報をGlue Data Catalogに自動登録する。これにより手動でスキーマ定義なしにAthenaやRedshift Spectrumから参照できる。


問題 DEA-04

「ストリーミングデータをリアルタイムに集計・変換してダッシュボードに表示したい。管理不要が望ましい」最も適切なサービスはどれですか?

  • A. Amazon EMR(Hadoop)
  • B. Amazon Kinesis Data Analytics(Flink)
  • C. AWS Glue ETL
  • D. AWS Batch
正解と解説

正解: B

Kinesis Data Analyticsはストリームデータに対してFlinkを使ったリアルタイム分析(ウィンドウ集計・異常検知等)をサーバーレスで実行できる。EMRはバッチ処理向き。Glue ETLはバッチETL。


問題 DEA-05

Lake Formationを使ったデータレイクで実現できる、IAMでは実現が困難なアクセス制御はどれですか?

  • A. S3バケットへのクロスアカウントアクセス制御
  • B. Athenaクエリ結果のGlueテーブル内の特定列へのアクセスを禁止する列レベルセキュリティ
  • C. VPCエンドポイント経由でのS3アクセス制限
  • D. KMSキーによるデータ暗号化
正解と解説

正解: B

Lake Formationは列レベル・行レベルのセキュリティを提供する。IAMポリシーはS3オブジェクト全体への制御のみで、GlueテーブルのCOLUMN(列)レベルのアクセス制御はIAMだけでは実現できない。


問題 DEA-06

AWS DMSで「本番Oracleデータベースを無停止でAmazon Auroraに移行したい」場合のアプローチはどれですか?

  • A. Full Load のみ実行してデータを一括移行する
  • B. Full Load + CDC(変更データキャプチャ)で継続的に同期する
  • C. Schema Conversion Tool のみを使用する
  • D. 手動でSQLダンプを取得してAuroraに適用する
正解と解説

正解: B

Full LoadでまずOracleの全データをAuroraにコピーし、その後CDCで増分変更をリアルタイムにレプリケートする。完全同期が確認できたらアプリケーションを新DBに切り替える(最小ダウンタイム移行)。


問題 DEA-07

Athenaのクエリコストを削減する方法として最も効果的なものはどれですか?(2つ選択)

  • A. データをParquetやORCの列指向フォーマットに変換する
  • B. S3バケットを別リージョンに移動する
  • C. S3のデータをパーティション分割してWHEREでフィルタリングする
  • D. クエリ結果をDynamoDBにキャッシュする
  • E. Athenaの使用量を CloudWatch で監視する
正解と解説

正解: A, C

Athenaはスキャンデータ量で課金。A: Parquet/ORC列指向フォーマットは必要な列のみスキャンし、圧縮率も高い(コスト87%削減の実績も)。C: パーティション(年/月/日等)で絞り込むと不要なパーティションをスキャンしない。


問題 DEA-08

Redshift Spectrumの説明として正しいものはどれですか?

  • A. Redshiftクラスターの外部でSQLを実行するサーバーレスサービス
  • B. Redshiftクラスターから直接S3のデータにSQLクエリを実行する機能
  • C. S3からRedshiftへのデータを自動ロードするサービス
  • D. RedshiftのデータをS3にエクスポートする機能
正解と解説

正解: B

Redshift Spectrumは、Redshiftクラスターから直接S3(外部テーブル)に対してSQLクエリを実行できる機能。S3のデータをRedshiftにロードせずに分析でき、ホットデータ(Redshift内)とコールドデータ(S3)を統合クエリできる。


問題 DEA-09

バッチ処理とストリーム処理の選択基準として正しいものはどれですか?

  • A. 「不正取引をリアルタイムに検知したい」→ バッチ処理
  • B. 「毎日夜中に前日の売上集計レポートを生成したい」→ ストリーム処理
  • C. 「ユーザー行動ログをリアルタイムに分析してレコメンドを更新したい」→ ストリーム処理
  • D. 「IoTセンサーの全履歴データを月1回分析したい」→ ストリーム処理
正解と解説

正解: C

リアルタイム処理が必要(不正検知・IoT監視・ライブダッシュボード)→ ストリーム処理。定期的な集計・ML訓練・夜間バッチ → バッチ処理。


問題 DEA-10

AWS Glue DataBrewの主な対象ユーザーはどれですか?

  • A. Sparkコードを書くデータエンジニア
  • B. コードを書かずにGUIでデータクレンジング・変換を行うデータアナリスト
  • C. MLモデルを訓練するデータサイエンティスト
  • D. インフラを管理するシステム管理者
正解と解説

正解: B

Glue DataBrewはノーコードのビジュアルデータ変換ツール。250以上の組み込み変換をクリック操作で適用できる。コーディング不要でデータクレンジングを行うデータアナリスト向け。Glue ETLはPySparkコードが必要。


DEA-C01 試験直前チェックリスト

  • [ ] Kinesis Data Streams vs Firehose vs Analytics の違い
  • [ ] Glue Crawler・ETL・DataBrew・Data Catalog の役割
  • [ ] Athena のコスト最適化(Parquet/パーティション)
  • [ ] Redshift vs Athena の使い分け
  • [ ] Lake Formation の列・行レベルセキュリティ
  • [ ] DMS Full Load + CDC の移行パターン
  • [ ] バッチ vs ストリーム処理の使い分け

付録: DEA-C01 頻出サービス一覧

サービス DEA試験での重点
Amazon Kinesis Data Streams シャード・保持期間・コンシューマー
Amazon Kinesis Data Firehose S3/Redshiftへの自動配信・変換
Amazon Kinesis Data Analytics リアルタイムFlinkアプリ
Amazon MSK(Managed Kafka) Kafkaの移行・互換性
AWS Glue ETL・Crawler・Data Catalog・DataBrew
Amazon Athena S3へのサーバーレスSQL・コスト最適化
Amazon Redshift データウェアハウス・Spectrum
Amazon S3 データレイク・ストレージクラス
AWS Lake Formation データレイクセキュリティ・ガバナンス
AWS DMS DB移行・CDC
Amazon EMR Hadoop/Spark マネージドクラスター
AWS Glue DataBrew ノーコードデータ変換
Amazon QuickSight BIダッシュボード・可視化
Amazon OpenSearch Service ログ分析・全文検索
Amazon DynamoDB NoSQL・Streams

データパイプライン設計パターン

AWS Data Pipeline アーキテクチャ

Lambda アーキテクチャ(バッチ + リアルタイム):

バッチレイヤー:
  S3(生データ) → EMR/Glue(バッチ処理) → S3/Redshift(集計結果)
  
スピードレイヤー:
  Kinesis Streams → Lambda/KDA(リアルタイム処理) → DynamoDB/ElastiCache
  
サービングレイヤー:
  S3/Redshift → Athena/QuickSight(分析)
  DynamoDB/ElastiCache → API(リアルタイム配信)

カッパアーキテクチャ(ストリーミングのみ):
  全データをストリーミングとして処理
  バッチ処理をストリーミングで置き換え
  Kinesis/MSK → Flink/KDA → 集計/保存

ELT vs ETL

ETL(Extract-Transform-Load):
  変換を外部ツールで実施してからロード
  Glue ETL: S3 から変換して S3/Redshift にロード
  
  利点: シンプル、変換の制御が容易
  欠点: ソースデータの保持がない(元データと変換済みが別)

ELT(Extract-Load-Transform):
  まずロードしてからデータウェアハウス内で変換
  S3(Raw) → Redshift にロード → Redshift 内で SQL 変換
  
  利点: 生データを保持(再処理が容易)
  欠点: データウェアハウスのリソースを変換に使用
  
現代のデータレイク:
  S3(Raw Zone) → Glue(変換) → S3(Processed Zone) → Redshift/Athena
  ELT に近いアプローチが主流

Kinesis 詳細

Kinesis Data Streams シャード設計

シャード容量計算:
  書き込み: 1 MB/秒 または 1,000 レコード/秒(低い方が上限)
  読み取り: 2 MB/秒(標準コンシューマー:複数コンシューマーで共有)
          2 MB/秒(拡張ファンアウト:コンシューマーごとに独立)

必要シャード数の計算:
  書き込み量(MB/秒) / 1 = 必要シャード数(書き込み)
  読み取り量(MB/秒) / 2 = 必要シャード数(読み取り)
  大きい方を選択
  
例:
  入力: 5 MB/秒, 5,000 レコード/秒
  コンシューマー: 3つ, 各 3 MB/秒
  
  書き込みシャード: max(5/1, 5000/1000) = 5
  読み取りシャード: 3コンシューマー × 3MB / 2MB = 4.5 → 5
  → 5シャード以上が必要

シャードの増減:
  UpdateShardCount で変更(1日2回まで)
  2倍または0.5倍ずつ変更(一度に)
サービス名の変遷:
  Kinesis Data Analytics → Managed Flink → Amazon Managed Service for Apache Flink
  
ウィンドウ処理の種類:

タンブリングウィンドウ(Tumbling Window):
  重複しない固定サイズの時間窓
  例: 5分ごとに集計(0-5分, 5-10分, ...)
  
スライディングウィンドウ(Sliding Window):
  重複する移動する時間窓
  例: 過去5分間の平均を1分ごとに計算
  
セッションウィンドウ(Session Window):
  活動の「セッション」でグループ化
  一定時間(ギャップ)活動がなければセッション終了
  Eコマースのユーザーセッション分析

使用例:
  IoTセンサーの異常検知:
    SELECT STREAM 
      device_id, 
      AVG(temperature) OVER (
        PARTITION BY device_id 
        RANGE INTERVAL '1' MINUTE PRECEDING
      ) as avg_temp
    FROM sensor_stream

AWS Glue 詳細

Glue ETL ジョブの最適化

Glue DPU(Data Processing Unit):
  1 DPU = 4 vCPU + 16 GB RAM
  Spark ジョブ: 最小2 DPU(デフォルト10 DPU)
  
Auto Scaling:
  G.025X(1/4 DPU): 最小ワーカー
  G.1X(1 DPU): 標準
  G.2X(2 DPU): メモリ集約型処理
  
コスト最適化:
  ジョブブックマーク: 増分処理(全データを毎回処理しない)
  Glue Flex: 非緊急バッチジョブに最大34%安価
  Streaming Glue: ミニバッチでストリーミング処理

Glue Studio:
  ビジュアルなETLエディタ
  ドラッグ&ドロップでデータ変換フローを構築
  自動コード生成(Python/Scala)

Glue カタログの管理

カタログの構造:
  データベース > テーブル > パーティション
  
データベース命名規則:
  prod_orders, dev_customers 等(環境プレフィックスを推奨)

テーブルのパーティション:
  S3 パスを使った Hive スタイルのパーティション
  s3://bucket/table/year=2024/month=01/day=15/
  
  パーティションプルーニング:
  WHERE year='2024' AND month='01' → そのパーティションのみスキャン
  → Athena コスト削減
  
Lake Formation との統合:
  カラムレベルのアクセス制御
  行レベルセキュリティ(Row-Level Security)
  データマスキング

Amazon Redshift 詳細

Redshift 最適化

テーブル設計の最適化:

分散スタイル(Distribution Style):
  EVEN: 全スライスに均等分散(デフォルト、結合頻度が低いテーブル)
  KEY: 指定した列の値でスライスを決定(結合で使用する列に指定)
  ALL: 全スライスにコピー(小さいディメンションテーブル)
  AUTO: Redshift が自動選択

ソートキー(Sort Key):
  Compound Sort Key: 指定した列の順序でソート
  Interleaved Sort Key: 各列を均等に重み付け(複数列でフィルタリング)
  
VACUUM:
  削除マークされたレコードを物理削除
  ソートキー順序を再整理
  Auto VACUUM: Redshift が自動実行(大規模テーブルは手動)
  
ANALYZE:
  統計情報を更新(クエリオプティマイザーの精度向上)
  Auto ANALYZE: 自動実行

Redshift クラスター管理

クラスタータイプ:
  RA3 ノード(推奨):
    コンピュートとストレージを分離
    マネージドストレージ(S3 ベース)
    最大 128 ノード
    
  DC2 ノード(レガシー):
    SSD ストレージ(固定容量)
    最大 32 ノード

Redshift Serverless:
  コンピューティングリソースを自動管理
  RPU(Redshift Processing Unit)単位で課金
  最大 RPU 数を設定して上限をコントロール
  0 スケールに対応(使用しない時は0に)
  
スナップショット:
  自動スナップショット(1〜35日保持)
  手動スナップショット(明示的に削除まで保持)
  クロスリージョン/クロスアカウントコピー

Amazon Athena 詳細

Athena コスト最適化

スキャン量でコスト最適化:
  Athena: $5/スキャンした 1TB

最適化方法:

1. Parquet/ORC 形式(列指向):
   列指向フォーマット → 必要な列のみ読み込み
   圧縮あり → データサイズ削減
   CSV に比べて 60-90% のコスト削減

2. パーティショニング:
   WHERE 条件にパーティション列を使用
   → スキャン対象データを絞り込み
   例: WHERE year=2024 AND month=01

3. 列の絞り込み:
   SELECT * より SELECT 必要な列 のみ
   → スキャン量削減

4. データサイズ最適化:
   ファイルを大きくまとめる(多数の小ファイルより少数の大ファイル)
   128MB 以上が推奨(理想: 256MB〜1GB)
   
Athena の制限事項:
  クエリタイムアウト: 最大 30 分
  同時クエリ数: アカウントごとに制限あり(Service Quotas で確認)
  DDL タイムアウト: 600 秒

Athena Federated Query

連合クエリの仕組み:
  Athena Data Source Connector(Lambda 関数)
  → 外部データソースへのクエリを変換
  → 結果を Athena に返す

対応データソース(コネクタ):
  CloudWatch Logs
  DynamoDB
  DocumentDB
  RDS (MySQL/PostgreSQL)
  Redshift
  OpenSearch
  HBase(EMR)
  
クロスソースクエリ:
  S3 の Parquet データ + DynamoDB を JOIN
  
  SELECT s.product_name, d.inventory_count
  FROM s3data.products s
  JOIN dynamodb.inventory d ON s.product_id = d.product_id
  WHERE s.category = 'Electronics'

AWS Lake Formation

データカタログとアクセス制御

Lake Formation のアーキテクチャ:
  S3(データ)+ Glue カタログ(メタデータ)+ Lake Formation(権限管理)

列レベルセキュリティ:
  テーブルの特定の列へのアクセスを制限
  例: 社員データの「給与」列は HR のみアクセス可

行フィルター:
  WHERE 条件に相当するアクセス制御
  例: region='JP' のデータのみ日本チームに公開

セルレベルセキュリティ:
  行フィルター + 列フィルターの組み合わせ

データ共有(Cross-Account):
  Lake Formation からリソースリンクを作成
  他アカウントが自分のカタログのようにデータにアクセス
  AWS RAM(Resource Access Manager)と統合

データ品質とガバナンス

AWS Glue Data Quality

品質ルールの定義(DQDL: Data Quality Definition Language):
  ColumnValues "age" between 0 and 120
  ColumnValues "email" matches "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}{{CONTENT}}quot;
  ColumnCount = 10
  RowCount > 1000
  IsUnique "user_id"
  
評価タイプ:
  Threshold(しきい値): 数値が範囲内か
  Regex: パターンマッチング
  Set: 特定の値のセットに含まれるか
  
Glue パイプラインとの統合:
  ETL ジョブの前後にデータ品質チェックを実施
  品質スコアが閾値を下回る場合にジョブを失敗させる
  品質結果を CloudWatch メトリクスに送信

AWS DataZone

DataZone のコンセプト:
  データの生産者(Producer)と消費者(Consumer)を橋渡し
  データカタログ + アクセス管理 + コラボレーション
  
主な機能:
  データポータル: データセットの検索・発見
  承認フロー: データアクセスの申請と承認
  ビジネスデータカタログ: ビジネス用語でデータを記述
  データ系統(Data Lineage): データの出所追跡
  
統合:
  Glue カタログ、Amazon Redshift、Amazon S3
  Athena、SageMaker Studio

模擬試験追加問題(35問)

問題 8

大量のリアルタイムデータを処理してリアルタイムダッシュボードを提供するシステムを設計しています。データは IoT デバイスから毎秒 50 MB 以上送られ、1 秒以内に集計して表示する必要があります。最適なアーキテクチャはどれですか?

  • A. SQS → Lambda → DynamoDB → API Gateway
  • B. Kinesis Data Streams → Amazon Managed Flink → DynamoDB → API Gateway
  • C. S3 → Athena → QuickSight
  • D. SNS → SQS → EC2 → RDS
正解と解説

正解: B

要件の分析:

  • リアルタイム(1秒以内): バッチ処理(C)は不可
  • 大量データ(50 MB/秒): SQS は最大 256 KB/メッセージ、スループット制限あり
  • ストリーミング集計: SQL ベースのウィンドウ処理が必要

Kinesis Data Streams: 大量のリアルタイムデータを収集(シャードで並列処理) Amazon Managed Flink: ウィンドウ集計(過去1秒間の平均等)をリアルタイム実行 DynamoDB: 集計結果をリアルタイムで更新(低レイテンシ読み取り) API Gateway + WebSocket: リアルタイムダッシュボードへの配信


問題 9

Athena でクエリを実行するたびに $50 以上かかっています。同じクエリを毎日実行する場合、コストを削減する最良の方法はどれですか?

  • A. Athena の同時クエリ数を減らす
  • B. S3 データを Parquet 形式に変換し、年/月/日でパーティション分割する
  • C. より大きな EC2 インスタンスを使用する
  • D. データを別のリージョンに移動する
正解と解説

正解: B

Athena コスト最適化:

  1. Parquet 形式: CSV 比で 60-90% のスキャン量削減(列指向フォーマット + 圧縮)
  2. パーティション分割: WHERE year=2024 AND month=01 AND day=15 のようなクエリでスキャン量を大幅削減

$5/TB のスキャン料金が主なコストなので、スキャン量の削減が最も効果的です。

変換手順: Glue ETL ジョブで CSV → Parquet に変換し、パーティションを追加。


問題 10

DMS(Database Migration Service)を使ってオンプレミスの MySQL から Amazon Aurora に移行しています。ダウンタイムを最小化しながら移行する手順として正しいのはどれですか?

  • A. Full Load のみ実施して移行
  • B. Full Load + CDC(Change Data Capture)を使用して移行後に切り替え
  • C. AWS Backup でバックアップを取得してリストア
  • D. mysqldump でエクスポートして S3 経由でインポート
正解と解説

正解: B

DMS の移行フロー(最小ダウンタイム):

1. Full Load: 既存データをすべて Aurora に移行
2. CDC(継続的レプリケーション): 移行中の変更を継続的に反映
3. ラグが十分小さくなったら(数秒以内):
4. アプリケーションを切り替え(メンテナンスウィンドウ)
5. DNS/接続文字列を Aurora に変更

CDC を使うことで、Full Load 中の変更も失われずに移行できます。ダウンタイムはカットオーバー時の数分のみです。


問題 11

Amazon Redshift Spectrum を使用する場合の考慮点はどれですか?

  • A. データを Redshift クラスターにロードする必要がある
  • B. S3 のデータをロードせずに Redshift SQL で直接クエリできるが、S3 データは Parquet/ORC 等の最適化フォーマットにすることでパフォーマンスが改善する
  • C. Redshift Spectrum はリアルタイムデータ処理に最適化されている
  • D. Redshift Serverless との併用はできない
正解と解説

正解: B

Redshift Spectrum の特徴:

  • S3 の外部テーブルを Redshift クエリに含めることができる
  • データのロード不要でコスト削減
  • パフォーマンス最適化: Parquet/ORC フォーマット + パーティション
  • スキャン料金: $5/1TB(Athena と同じ)
-- 外部テーブルを定義(Glue カタログを使用)
CREATE EXTERNAL SCHEMA s3_data
FROM DATA CATALOG
DATABASE 'glue_db'
IAM_ROLE 'arn:aws:iam::xxx:role/SpectrumRole';

-- S3 データと Redshift データを JOIN
SELECT r.customer_id, s.purchase_history
FROM redshift_customers r
JOIN s3_data.purchase_logs s ON r.id = s.customer_id;

問題 12〜42(ショート形式)

問題 12: AWS Glue の「ジョブブックマーク(Job Bookmark)」の仕組みは? → 正解: 前回処理したデータの位置(例: S3 ファイルの最終更新時刻)を記録し、次回実行時は新しいデータのみ処理。S3 の新規ファイルや DynamoDB の新規アイテムの増分処理に使用。全データの再処理を防いでコスト削減

問題 13: Amazon Kinesis Data Firehose と Kinesis Data Streams の使い分けは? → 正解: KDS: データを保持(最長7日)して複数コンシューマーがリプレイ可能、低レイテンシ(ミリ秒)。KDF(Firehose): データをS3/Redshift/OpenSearch/Splunkに配信(バッファリングあり、コンシューマーを自分で管理不要)。KDS→KDFの組み合わせも一般的

問題 14: Amazon MSK(Managed Streaming for Kafka)の特徴は? → 正解: Apache Kafka のフルマネージドサービス。Kafka の標準 API との互換性(既存の Kafka アプリを移行可能)。ZooKeeperの管理が不要(MSK が管理)。MSK Serverless: サーバーレスで Kafka クラスターを自動管理

問題 15: Amazon OpenSearch Service の用途は? → 正解: (1) 全文検索(ログ/ドキュメント/Eコマース商品検索)、(2) ログ分析・可視化(Kibana/OpenSearch Dashboards)、(3) 時系列分析(IoT/アプリログ)。Kinesis Firehose や CloudWatch Logs との統合が容易

問題 16: S3 のデータを Glue でパーティション追加する際の「パーティションプロジェクション」の利点は? → 正解: カタログの MSCK REPAIR TABLE を実行せずに新しいパーティションを自動認識。パーティション情報をカタログに保存せずに設定から動的生成。大量のパーティション(数百万)でもメタデータ操作が不要で高速

問題 17: AWS Glue の「動的フレーム(DynamicFrame)」と Spark の「データフレーム(DataFrame)」の違いは? → 正解: DynamicFrame: Glue 固有のデータ構造、スキーマが不一致なデータ(型不一致等)を柔軟に処理、Glue ネイティブの変換関数。DataFrame: Spark 標準、一般的なエコシステムとの互換性、相互変換(toDF/fromDF)が可能

問題 18: Amazon Redshift のコンサームドスキャンを削減する「マテリアライズドビュー」の特徴は? → 正解: クエリ結果を事前に計算して保存するビュー。毎回の集計クエリを高速化。AUTO REFRESH で自動更新可能。増分更新(変更された行のみ再計算)をサポート。コスト削減とパフォーマンス向上

問題 19: EMR(Elastic MapReduce)のクラスタータイプとユースケースは? → 正解: 一時クラスター(Transient): 特定のジョブ実行後に終了(バッチ処理/コスト最適化)。永続クラスター(Persistent): 常時起動(対話型分析/継続的なストリーミング処理)。EMR Serverless: インフラ管理不要(新しい選択肢)

問題 20: AWS データ移行ツールの比較(DMS/SCT/Snowball/DataSync) → 正解: DMS: オンライン DB 移行(異種DB含む)+ CDC(継続的レプリケーション)。SCT: DB スキーマ変換(Oracle→Aurora等の変換)。Snowball Edge: 物理デバイスによる大規模データ移行(テラバイト〜ペタバイト)。DataSync: オンプレNFS/SMB→S3/EFS/FSxの継続的同期

問題 21: Kinesis Data Streams で ProvisionedThroughputExceededException が発生した場合の対処法は? → 正解: (1) シャード数を増やす(UpdateShardCount)、(2) パーティションキーの分散を改善(ホットパーティション対策)、(3) 指数バックオフでリトライ、(4) プロデューサー側でバッチ送信(PutRecords)

問題 22: AWS Glue DataBrew のレシピとプロジェクトの概念は? → 正解: プロジェクト: データセット + レシピの作業単位(サンドボックス)。レシピ: データ変換手順の記録(ステップのシーケンス)。プロジェクトで作成したレシピを Glue DataBrew ジョブとして本番実行。ノーコードETLの基本単位

問題 23: S3 を使ったデータレイクで「生データ」と「変換済みデータ」を分ける理由は? → 正解: 生データを保持することで(1) 再処理可能(変換ロジック修正後に再実行)、(2) 新しい分析の追加(未加工データから別の変換を適用)、(3) 規制要件(原本の保持)。Zone Architecture: Raw → Refined → Analytics

問題 24: Amazon Quicksight の「SPICE」の役割は? → 正解: Super-fast, Parallel, In-memory Calculation Engine。データをインポートしてメモリ内に保存し、高速なクエリを実現。10 GB/ユーザー(Enterprise は無制限)。直接クエリ(S3/Athena/RDS等)より高速だがリアルタイム性は低下

問題 25: EMR クラスターのコスト最適化に使用できる機能は?(2つ) → 正解: (1) Spot インスタンスのタスクノード(コアノードはオンデマンド推奨)で最大90%削減。(2) EMR Managed Scaling: ワークロードに基づいて自動的にクラスターサイズを調整(アイドル時は縮小)

問題 26: Amazon Kinesis Data Firehose の「バッファリング」設定の意味は? → 正解: バッファサイズ(MB)またはバッファ時間(秒)のどちらかが条件を満たしたらデータをフラッシュ(配信)。サイズ: 64 MB まで(S3の場合)。時間: 60〜900 秒。条件が先に満たされた方が優先されるORロジック

問題 27: AWS Glue の「クローラー(Crawler)」が新しいパーティションを検出する設定は? → 正解: クローラーの「Crawl new sub-folders only」モードを使用。S3 の新しいパーティション(フォルダ)が追加されると自動的にカタログを更新。毎日実行してパーティションを最新に保つのが一般的な設定

問題 28: Amazon Athena Workgroups の用途は? → 正解: クエリの実行環境をチーム/プロジェクト別に分離。スキャン上限(1クエリあたり/ワークグループ全体)のコスト制御。S3 出力場所の分離。タグによるコスト配分。IAM での権限管理

問題 29: Amazon Redshift の COPY コマンドで S3 からデータロードする際の並列処理を最大化する方法は? → 正解: ファイル数をスライス数(ノード数 × 各ノードのスライス数)の倍数に設定。例: 4ノード × 4スライス = 16スライスなら16の倍数のファイル数。大きなファイルをgzip等で圧縮することも効果的

問題 30: AWS Database Migration Service(DMS)の「タスクの移行タイプ」の種類は? → 正解: Full Load: 既存データの完全移行(1回のみ)。CDC Only: 変更データのみ(既にデータが存在する場合)。Full Load + CDC: Full Load 後に継続的にCDCを実行(最小ダウンタイム移行)

問題 31: Amazon Kinesis Data Streams のレコードの「シーケンス番号(Sequence Number)」の特性は? → 正解: KDS が自動的に割り当てる一意の識別子。同じパーティションキーでは単調増加。シーケンス番号で特定の位置から読み取りを再開(AT_SEQUENCE_NUMBER/AFTER_SEQUENCE_NUMBER)。レコードの順序保証にはパーティションキーを統一

問題 32: AWS Glue Studio での「ソースとターゲット」の設定で、デフォルトで除外されるシステムフィールドの処理は? → 正解: Glue DynamicFrame にはシステムフィールドが含まれる場合がある($path 等)。resolveChoicedrop_fields で不要フィールドを削除してからターゲットに書き込む。Parquet 変換時には型不一致に注意

問題 33: Amazon QuickSight の「異常検知(ML Insights)」機能は? → 正解: 機械学習を使ってダッシュボードのメトリクスの異常値を自動検出。例: 売上が通常のパターンから外れた日を自動ハイライト。通知も設定可能(メール/SNS)。追加費用なし(Enterprise エディション)

問題 34: S3 のデータを Athena でクエリする際の「プロジェクション(Column Pruning)」の効果は? → 正解: SELECT * ではなく SELECT 必要な列 のみ指定することでスキャン量を削減。Parquet/ORC の列指向フォーマットと組み合わせることで最大効果。CSV では列指向スキャンの恩恵がなく効果が限定的

問題 35: AWS DMS の「継続的レプリケーション(CDC)」でサポートされるソースの条件は? → 正解: ソースDB でバイナリログ(MySQL/Aurora)、Redo ログ(Oracle)、WAL(PostgreSQL)、Change Data Capture(SQL Server)等のネイティブCDC機能が有効になっている必要がある。対象テーブルにプライマリキーが必要(ないと全行更新)

問題 36: Amazon Kinesis Data Streams の「拡張ファンアウト(Enhanced Fan-Out)」を使用する前提条件は? → 正解: 拡張ファンアウトコンシューマー(RegisterStreamConsumer)として登録が必要。ストリームあたり最大20の拡張ファンアウトコンシューマー(デフォルト5)。追加料金: 0.015/コンシューマーシャード時間 + 0.013/GB

問題 37: Amazon EMR のコアノードとタスクノードの役割の違いは? → 正解: コアノード: HDFS データを保存するノード(Spot にすると中断時にデータ損失のリスク)。タスクノード: 処理のみ(HDFSデータを持たない)。タスクノードをSpotにするのが安全なコスト削減手法

問題 38: AWS Glue のデータ品質ルール(DQDL)で IsUnique の評価は? → 正解: 指定した列(または列の組み合わせ)の値が一意(重複なし)かを評価。全行をスキャンして重複をチェック。大規模テーブルでは時間がかかる。重複行の割合に応じてパス/フェイルのしきい値を設定

問題 39: Amazon Redshift の「WLM(Workload Management)」の設定内容は? → 正解: クエリキューを定義して優先度・メモリ・同時実行数を制御。Auto WLM(自動管理): Redshift が動的に調整(推奨)。Manual WLM: 各キューのメモリ割合と同時実行数を手動設定。SuperUser キューは管理者専用

問題 40: Kinesis Data Streams の「シャード分割(Split Shard)」と「シャードマージ(Merge Shards)」の用途は? → 正解: Split Shard: 1つのシャードを2つに分割(スループット不足時のスケールアウト)。Merge Shards: 2つの隣接するシャードを1つに統合(スループット過剰時のコスト削減)。UpdateShardCount で一括変更も可能

問題 41: AWS Glue の「接続(Connection)」を定義する目的は? → 正解: データソース(JDBC接続のRDS/Redshift/MySQL等)への接続情報をGlueに登録。クローラーやETLジョブで使用。パスワードはSecrets Managerから動的取得。VPC/セキュリティグループの設定も含む

問題 42: Amazon Athena の「Prepared Statements」の用途は? → 正解: パラメータを使った再利用可能なクエリテンプレート。SQLインジェクション対策。繰り返し実行するクエリのコンパイルをスキップしてパフォーマンス向上。EXECUTE statement_name USING param1, param2で実行


学習戦略(DEA-C01)

試験の特徴

DEA-C01 の重点分野(ドメイン別):
  Domain 1: データ収集(18%)
    Kinesis/MSK/DMS/Transfer Family
    
  Domain 2: ストレージと変換(22%)
    S3/Glue/Lake Formation
    
  Domain 3: データ運用と変換(22%)
    EMR/Athena/Redshift/DMS
    
  Domain 4: データ分析(18%)
    QuickSight/Athena/Redshift
    
  Domain 5: データパイプラインの自動化(16%)
    Step Functions/EventBridge/Lambda
    
  Domain 6: データセキュリティとガバナンス(4%)
    Lake Formation/KMS/IAM

30日学習プラン

Week 1: データ収集とストレージ
  Day 1-2:  Kinesis(KDS/KDF/KDA/シャード設計)
  Day 3-4:  Amazon MSK(Kafka)とストリーミング設計
  Day 5-6:  S3(データレイク設計/フォーマット/パーティション)
  Day 7:    Lake Formation とデータガバナンス

Week 2: 変換と処理
  Day 8-9:  AWS Glue(ETL/クローラー/DataBrew/品質)
  Day 10-11: Amazon EMR(Spark/Hive/コスト最適化)
  Day 12-13: Amazon Redshift(設計/チューニング/Spectrum)
  Day 14:   Amazon Athena(コスト最適化/連合クエリ)

Week 3: 分析と移行
  Day 15-16: Amazon QuickSight(SPICE/可視化/ML Insights)
  Day 17-18: AWS DMS(Full Load/CDC/移行戦略)
  Day 19-20: AWS SCT と移行ベストプラクティス
  Day 21:   データパイプライン自動化(Step Functions/EventBridge)

Week 4: 仕上げ
  Day 22-24: 模擬試験(全問)
  Day 25-27: 弱点補強
  Day 28-30: 直前チェックリスト

試験直前チェックリスト(DEA-C01)

ストリーミング

  • [ ] Kinesis Data Streams のシャード設計(読み書き容量計算)を説明できる
  • [ ] KDS と SQS の使い分けを説明できる
  • [ ] 拡張ファンアウトと標準コンシューマーの使い分けを説明できる
  • [ ] Kinesis Data Firehose のバッファリングと変換を設定できる

バッチ処理とデータレイク

  • [ ] Glue のジョブブックマークで増分処理を実装できる
  • [ ] S3 のパーティショニング戦略を設計できる
  • [ ] Parquet/ORC フォーマットのメリットを説明できる
  • [ ] Lake Formation で列・行レベルのセキュリティを設定できる

データウェアハウスと分析

  • [ ] Redshift の分散スタイルとソートキーを最適化できる
  • [ ] Athena のコスト削減方法(フォーマット/パーティション/列絞り込み)を説明できる
  • [ ] DMS で Full Load + CDC の移行を設計できる
  • [ ] QuickSight の SPICE と直接クエリの使い分けを説明できる

DEA-C01 データエンジニアアソシエイト完全ガイド追加セクション完了


高度なデータエンジニアリングパターン

Apache Iceberg テーブルフォーマット

import boto3
import json

# Athena で Iceberg テーブルを作成
athena = boto3.client('athena', region_name='us-east-1')

def create_iceberg_table(database: str, table_name: str, s3_location: str) -> str:
    sql = f"""
    CREATE TABLE {database}.{table_name} (
        id          BIGINT,
        user_id     INTEGER,
        event_type  VARCHAR(50),
        event_time  TIMESTAMP,
        properties  MAP<VARCHAR, VARCHAR>,
        amount      DECIMAL(12,2)
    )
    LOCATION '{s3_location}'
    TBLPROPERTIES (
        'table_type' = 'ICEBERG',
        'format' = 'parquet',
        'write_compression' = 'snappy',
        'optimize_rewrite_delete_file_threshold' = '10'
    )
    """
    
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': 's3://athena-results/iceberg/'}
    )
    return response['QueryExecutionId']

# Iceberg ACID操作
def iceberg_upsert_example(database: str, table: str) -> str:
    """MERGE INTO でupsert (CDC対応)"""
    sql = f"""
    MERGE INTO {database}.{table} t
    USING (
        SELECT 
            id,
            user_id,
            event_type,
            event_time,
            amount,
            'UPDATE' as operation
        FROM staging.events_staging
    ) s ON t.id = s.id
    WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        event_type = s.event_type,
        amount = s.amount
    WHEN NOT MATCHED THEN INSERT
        (id, user_id, event_type, event_time, amount)
        VALUES (s.id, s.user_id, s.event_type, s.event_time, s.amount)
    """
    
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': 's3://athena-results/iceberg/'}
    )
    return response['QueryExecutionId']

# タイムトラベルクエリ
ICEBERG_TIME_TRAVEL = """
-- 過去の状態を参照
SELECT * FROM database.iceberg_table
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-01 00:00:00'

-- スナップショットIDで参照
SELECT * FROM database.iceberg_table
FOR VERSION AS OF 1234567890

-- 変更履歴
SELECT * FROM "database"."iceberg_table$history"

-- スナップショット一覧
SELECT * FROM "database"."iceberg_table$snapshots"
ORDER BY committed_at DESC
"""

# 最適化 (コンパクション)
ICEBERG_OPTIMIZE = """
-- 小さいファイルを結合
OPTIMIZE database.iceberg_table
REWRITE DATA USING BIN_PACK
WHERE event_time >= TIMESTAMP '2024-01-01';

-- DELETE tombstoneを削除
VACUUM database.iceberg_table;
"""

AWS Glue データ品質 (Glue Data Quality)

import boto3

glue = boto3.client('glue', region_name='us-east-1')

# データ品質ルールセットの作成
def create_dq_ruleset(
    ruleset_name: str,
    database: str,
    table: str
) -> dict:
    
    dqdl_rules = """
    Rules = [
        # 完全性チェック
        IsComplete "user_id",
        IsComplete "event_time",
        
        # 一意性チェック
        IsPrimaryKey "id",
        
        # 範囲チェック
        ColumnValues "amount" between 0 and 1000000,
        
        # 形式チェック
        ColumnValues "email" matches "^[a-zA-Z0-9._%+\\-]+@[a-zA-Z0-9.\\-]+\\.[a-zA-Z]{2,}{{CONTENT}}quot;,
        
        # 参照整合性チェック
        ColumnValues "status" in ["active", "inactive", "pending"],
        
        # 統計チェック
        ColumnStatistics "amount" {
            Mean between 100 and 5000,
            StandardDeviation between 10 and 1000
        },
        
        # 欠損値チェック
        ColumnCount = 8,
        RowCount > 1000,
        
        # 重複チェック
        Uniqueness "transaction_id" > 0.99,
        
        # 最新性チェック  
        ColumnValues "event_time" > (now() - 1 day)
    ]
    """
    
    return glue.create_data_quality_ruleset(
        Name=ruleset_name,
        Ruleset=dqdl_rules,
        TargetTable={
            'TableName': table,
            'DatabaseName': database
        },
        Description=f'Data quality rules for {database}.{table}'
    )

# データ品質評価ランの実行
def run_dq_evaluation(ruleset_name: str, role_arn: str) -> str:
    
    response = glue.start_data_quality_ruleset_evaluation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': 'analytics',
                'TableName': 'events'
            }
        },
        Role=role_arn,
        RulesetNames=[ruleset_name],
        AdditionalRunOptions={
            'CloudWatchMetricsEnabled': True,
            'ResultsS3Prefix': 's3://dq-results/'
        }
    )
    
    return response['RunId']

# ETLパイプラインでのデータ品質チェック (Glue Job内)
DQ_IN_ETL_JOB = """
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality

glueContext = GlueContext(sc)

# データ読み込み
datasource = glueContext.create_dynamic_frame.from_catalog(
    database='raw',
    table_name='events'
)

# データ品質評価
rules = '''
    Rules = [
        IsComplete "user_id",
        ColumnValues "amount" between 0 and 100000
    ]
'''

dq_results = EvaluateDataQuality.apply(
    frame=datasource,
    ruleset=rules,
    publishing_options={
        "dataQualityEvaluationContext": "events-dq",
        "enableDataQualityCloudWatchMetrics": True,
        "enableDataQualityResultsPublishing": True
    }
)

# DQ合格データのみ後続処理
passed_records = dq_results.select_from_graph_by_type(DynamicFrame)[0]
failed_records = dq_results.select_from_graph_by_type(DynamicFrame)[1]

# 合格データをS3に書き込み
glueContext.write_dynamic_frame.from_options(
    frame=passed_records,
    connection_type='s3',
    connection_options={'path': 's3://processed/events/'},
    format='parquet'
)

# 失敗データは隔離バケットへ
glueContext.write_dynamic_frame.from_options(
    frame=failed_records,
    connection_type='s3',
    connection_options={'path': 's3://quarantine/events/'},
    format='json'
)
"""

AWS Step Functions でのデータパイプライン

import boto3
import json

sf = boto3.client('stepfunctions', region_name='us-east-1')

# Step Functions ステートマシン定義
DATA_PIPELINE_STATE_MACHINE = {
    "Comment": "E2E Data Processing Pipeline",
    "StartAt": "ValidateInput",
    "States": {
        "ValidateInput": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
                "FunctionName": "validate-input",
                "Payload.{{CONTENT}}quot;: "{{CONTENT}}quot;
            },
            "Next": "CheckDataQuality",
            "Catch": [
                {
                    "ErrorEquals": ["ValidationError"],
                    "Next": "HandleValidationError"
                }
            ]
        },
        "CheckDataQuality": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {
                "JobName": "data-quality-check",
                "Arguments": {
                    "--source_path.{{CONTENT}}quot;: "$.sourcePath",
                    "--ruleset_name": "production-rules"
                }
            },
            "Next": "DQResultCheck"
        },
        "DQResultCheck": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.dqScore",
                    "NumericGreaterThanEquals": 0.95,
                    "Next": "RunETL"
                }
            ],
            "Default": "NotifyDQFailure"
        },
        "RunETL": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {
                "JobName": "main-etl-job",
                "Arguments": {
                    "--source_path.{{CONTENT}}quot;: "$.sourcePath",
                    "--target_path": "s3://processed/"
                }
            },
            "Next": "ParallelPostProcessing"
        },
        "ParallelPostProcessing": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "UpdateCatalog",
                    "States": {
                        "UpdateCatalog": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Parameters": {"FunctionName": "update-glue-catalog"},
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "SendNotification",
                    "States": {
                        "SendNotification": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::sns:publish",
                            "Parameters": {
                                "TopicArn": "arn:aws:sns:us-east-1:123:pipeline-success",
                                "Message": "Pipeline completed successfully"
                            },
                            "End": True
                        }
                    }
                }
            ],
            "Next": "PipelineComplete"
        },
        "PipelineComplete": {
            "Type": "Succeed"
        },
        "NotifyDQFailure": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": "arn:aws:sns:us-east-1:123:pipeline-alerts",
                "Message.{{CONTENT}}quot;: "States.Format('DQ Check failed. Score: {}', $.dqScore)"
            },
            "Next": "DQFailed"
        },
        "DQFailed": {
            "Type": "Fail",
            "Error": "DataQualityError",
            "Cause": "Data quality check failed"
        },
        "HandleValidationError": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "handle-error"},
            "End": True
        }
    }
}

def create_data_pipeline(name: str) -> dict:
    return sf.create_state_machine(
        name=name,
        definition=json.dumps(DATA_PIPELINE_STATE_MACHINE),
        roleArn='arn:aws:iam::123456789:role/StepFunctionsRole',
        type='STANDARD'
    )

Amazon Redshift 高度なパターン

import boto3

redshift_data = boto3.client('redshift-data', region_name='us-east-1')

# Stored Procedure でのビジネスロジック
REDSHIFT_STORED_PROC = """
CREATE OR REPLACE PROCEDURE update_customer_tier(
    p_min_spend DECIMAL(12,2),
    p_tier_name VARCHAR(50)
)
LANGUAGE plpgsql
AS $
DECLARE
    v_count INTEGER;
BEGIN
    -- トランザクション内でのアップデート
    UPDATE customer_summary
    SET tier = p_tier_name,
        updated_at = GETDATE()
    WHERE total_spend >= p_min_spend
      AND tier != p_tier_name;
    
    GET DIAGNOSTICS v_count = ROW_COUNT;
    RAISE INFO '% customers updated to tier %', v_count, p_tier_name;
    
    -- 監査ログの記録
    INSERT INTO audit_log (operation, affected_rows, executed_at)
    VALUES ('update_customer_tier', v_count, GETDATE());
    
END;
$;

-- プロシージャの実行
CALL update_customer_tier(10000.00, 'Gold');
"""

# Redshift ML (Machine Learning in Redshift)
REDSHIFT_ML_EXAMPLE = """
-- Redshift MLモデルの作成 (SageMaker自動学習)
CREATE MODEL customer_churn_model
FROM customer_features
TARGET churned
FUNCTION predict_churn
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftMLRole'
AUTO OFF
SETTINGS (
    S3_BUCKET 'my-redshift-ml-bucket',
    MAX_RUNTIME 5400
);

-- モデルを使った予測
SELECT 
    customer_id,
    predict_churn(
        age, tenure_months, monthly_spend, 
        support_tickets, last_login_days
    ) as churn_prediction,
    predict_churn_probability(
        age, tenure_months, monthly_spend,
        support_tickets, last_login_days
    ) as churn_probability
FROM customer_features
WHERE churn_probability > 0.7;
"""

# Redshift のデータ共有 (Datashare)
def setup_redshift_datashare(
    producer_cluster_id: str,
    consumer_account_id: str,
    database: str
) -> None:
    
    redshift = boto3.client('redshift', region_name='us-east-1')
    
    # データ共有の作成
    redshift.create_data_share(
        DataShareName='analytics-share',
        ClusterIdentifier=producer_cluster_id
    )
    
    # データベースオブジェクトの追加
    redshift.add_data_share_objects(
        DataShareName='analytics-share',
        ClusterIdentifier=producer_cluster_id,
        DataShareObjects={
            'DatabaseName': database
        }
    )
    
    # コンシューマーアカウントへの許可
    redshift.authorize_data_share(
        DataShareArn=f'arn:aws:redshift:us-east-1:{producer_cluster_id}:datashare/analytics-share',
        ConsumerIdentifier=consumer_account_id
    )

ストリーミングデータパイプライン

# Kinesis Data Streams + Lambda + DynamoDB のパイプライン

import boto3
import json
import base64
from datetime import datetime
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('events')

def kinesis_stream_handler(event, context):
    """Kinesisストリームのリアルタイム処理"""
    
    processed = 0
    failed = 0
    batch_item_failures = []
    
    for record in event['Records']:
        try:
            # Base64デコード
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            data = json.loads(payload)
            
            # データ変換と検証
            processed_event = transform_event(data)
            
            # DynamoDBに書き込み
            table.put_item(Item=processed_event)
            
            processed += 1
        
        except Exception as e:
            print(f"Error processing record: {e}")
            # BisectBatchOnFunctionError設定時に使用
            batch_item_failures.append({
                "itemIdentifier": record['kinesis']['sequenceNumber']
            })
            failed += 1
    
    print(f"Processed: {processed}, Failed: {failed}")
    
    # 失敗レコードを返すことでKinesisは該当レコードからリトライ
    return {"batchItemFailures": batch_item_failures}

def transform_event(data: dict) -> dict:
    """イベントデータの変換・検証"""
    
    required_fields = ['event_id', 'user_id', 'event_type', 'timestamp']
    for field in required_fields:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    
    return {
        'event_id': data['event_id'],
        'user_id': str(data['user_id']),
        'event_type': data['event_type'],
        'timestamp': data['timestamp'],
        'properties': {k: str(v) for k, v in data.get('properties', {}).items()},
        'amount': Decimal(str(data.get('amount', 0))),
        'processed_at': datetime.utcnow().isoformat(),
        'ttl': int((datetime.now().timestamp()) + 30 * 24 * 3600)
    }

# Kinesis Data Analytics (Apache Flink) でのストリーミング集計
FLINK_AGGREGATION_JOB = """
# PyFlink ジョブ例
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.window import Tumble

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Kinesisソース定義
t_env.execute_sql(\"\"\"
    CREATE TABLE events_source (
        event_id    STRING,
        user_id     STRING,
        event_type  STRING,
        amount      DOUBLE,
        event_time  TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'events-stream',
        'aws.region' = 'us-east-1',
        'format' = 'json',
        'scan.stream.initpos' = 'LATEST'
    )
\"\"\")

# 5分間のタンブリングウィンドウ集計
t_env.execute_sql(\"\"\"
    CREATE TABLE events_aggregated (
        window_start  TIMESTAMP(3),
        window_end    TIMESTAMP(3),
        event_type    STRING,
        event_count   BIGINT,
        total_amount  DOUBLE,
        avg_amount    DOUBLE,
        PRIMARY KEY (window_start, event_type) NOT ENFORCED
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'aggregated-events',
        'aws.region' = 'us-east-1',
        'format' = 'json'
    )
\"\"\")

t_env.execute_sql(\"\"\"
    INSERT INTO events_aggregated
    SELECT
        TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
        event_type,
        COUNT(*) as event_count,
        SUM(amount) as total_amount,
        AVG(amount) as avg_amount
    FROM events_source
    GROUP BY 
        TUMBLE(event_time, INTERVAL '5' MINUTE),
        event_type
\"\"\")
"""

模擬試験 第1回 (65問)

Part 1: データ取り込み・処理 (問1-20)

問1: AWS Glue Job でJDBC接続からデータを読み込む際、大量レコードを効率的に並列処理するための設定は?

A) 1回のクエリで全データを取得 B) hashfield/hashexpression/hashpartitions を使用して並列読み込み C) EMRと組み合わせる必要がある D) メモリを増やすだけで解決

正解: B 解説: Glue JDBCの並列読み込みオプション: hashfield(数値型カラム名), hashexpression(カスタム式), hashpartitions(分割数)。これによりJDBC接続を複数のSparkパーティションで並列に読み込み処理を高速化できる。


問2: Amazon Kinesis Data Streams でデータの順序を保証するための設計は?

A) 複数のシャードに分散する B) 同一のパーティションキーを使用することで同一シャード内での順序が保証される C) Firehoseを使用する D) SQS FIFOキューと組み合わせる

正解: B 解説: KDSは同一シャード内でのシーケンス番号による順序保証。同じエンティティ(同一ユーザーのイベント等)は同じパーティションキーを使用することで同一シャードに配置され、送信順序が保証される。


問3: AWS DMS のCDC (Change Data Capture) で使用されるソースデータベースの要件は?

A) すべてのDBで同一設定 B) Oracle: supplemental logging有効化, MySQL/MariaDB: binlog_format=ROW, PostgreSQL: logical replication有効化 C) DDLイベントのみ取得 D) CDCは追加設定不要

正解: B 解説: CDCの前提設定: Oracle(ALTER DATABASE ADD SUPPLEMENTAL LOG DATA), MySQL(binlog_format=ROW, binlog_row_image=FULL), PostgreSQL(wal_level=logical). 設定不足だとDMSはDMLイベントを取得できない。


問4: S3 Event Notifications で新しいファイルのアップロードを検知してLambdaを起動する設定で注意すべき点は?

A) 即座に処理される B) S3 Event Notificationsは少なくとも1回配信(At-least-once)のため、Lambda関数は冪等性を持つ設計が必要 C) 通知の遅延は発生しない D) 1回だけ確実に処理される

正解: B 解説: S3 Event Notificationsはat-least-once配信で同一イベントが複数回配信される可能性がある。Lambdaは冪等性(べき等性)を保つ設計が必要(例: DynamoDBで処理済みIDを追跡、S3オブジェクトに処理済みタグ付け)。


問5: AWS Glue Elastic Views (Preview) とは何か?

A) Glueダッシュボード B) 複数のデータストア(S3, DynamoDB, Aurora等)のデータを統合した物理化ビューをSQLで定義 C) データレイクのビジュアライゼーション D) Glue Crawlerのビューモード

正解: B 解説: Glue Elastic Views(機能プレビュー)はSQLでソーステーブル(DynamoDB, S3, MySQL, PostgreSQL等)を統合した物理化ビューを定義し、変更を継続的に反映する。アプリケーションは統合されたビューにクエリを実行するだけでよい。


問6: Amazon S3でバケット間のデータを効率的に移動するコスト最適化の方法は?

A) 常にEC2を経由して転送 B) S3 Copy (同リージョン内は無料)、CRR (クロスリージョンは課金)、S3 Batch Operations で大量オブジェクトを一括コピー C) Transfer Familyを使用 D) DataSyncのみで対応

正解: B 解説: S3コスト: 同リージョン内のCopy/MoveはAPIコストのみ(転送無料)。クロスリージョンはデータ転送コスト発生。S3 Batch Operationsは既存オブジェクトの大量処理(コピー、タグ付け、Lambda呼び出し等)を一括管理するマネージドサービス。


問7: Amazon Redshift のUNLOADコマンドで大量データをS3に効率的にエクスポートするには?

A) 1ファイルのみに出力 B) PARALLEL ON(デフォルト)で複数ファイルに並列書き出し、PARQUET形式でサイズ削減 C) CSV形式のみサポート D) エクスポート前にVACUUMが必須

正解: B 解説: UNLOAD ('SELECT ...' TO 's3://bucket/prefix' IAM_ROLE '...' PARQUET PARALLEL ON) でS3に並列書き出し。PARALLEL ONで各スライスが別ファイルに出力して高速化。PARTITION BYオプションでHiveパーティション形式での出力も可能。


問8: AWS Glue でPython Shellジョブを使用するのに適したシナリオは?

A) 大規模なSparkジョブ B) 軽量なPythonスクリプト(小規模データ、AWS API呼び出し、メタデータ操作)でコスト削減 C) リアルタイムストリーミング処理 D) MLモデルのトレーニング

正解: B 解説: Python Shellジョブは単一コンテナでPython 3を実行し、DPUは0.0625(1/16 DPU)で最小コスト。小規模データ(< 数GB)のETL、Glue Catalogのメタデータ操作、AWS APIを使用したオーケストレーション処理に最適。


問9: Amazon EventBridge Pipes の主な機能は?

A) イベントのフィルタリングのみ B) ソース(SQS/Kinesis/DynamoDB Streams等)とターゲット(Lambda/Step Functions等)をフィルタリング・変換付きで直接接続するポイントツーポイントパイプライン C) CloudFormationのデプロイパイプライン D) データ変換専用サービス

正解: B 解説: EventBridge PipesはSQS、Kinesis、DynamoDB Streams、MQ等のソースからLambda、Step Functions、SQS、SNS、API Gateway等のターゲットへの接続をフィルタリング・変換(Lambda/EventBridge Input Transformation)付きで設定。カスタムコード不要でP2P統合を構築。


問10: Apache Spark の persist()cache() の違いは?

A) 機能は同じ B) cache()はMEMORY_ONLYでpersist()はストレージレベルを指定可能(MEMORY_ONLY/MEMORY_AND_DISK/DISK_ONLY等) C) cache()はより高速 D) persist()はSparkMLのみ使用

正解: B 解説: df.cache()df.persist(StorageLevel.MEMORY_ONLY) と等価。persist()でDISK_ONLYを指定するとディスクのみ使用でメモリを節約。複数アクションで使用するDataFrameのキャッシュは性能向上に重要。使用後はunpersist()でリソース解放。


問11: Amazon DynamoDB Streams と Kinesis Data Streams (KDS) のDynamoDB統合の違いは?

A) 機能は同じ B) DynamoDB Streams: 24時間保持、変更データのみ取得。DynamoDB→KDS (Kinesis Data Streams for DynamoDB): 最大1年保持、拡張ファンアウト対応、高スループット C) KDSはDynamoDBに非対応 D) DynamoDB StreamsはKinesisより常に優れている

正解: B 解説: DynamoDB Streams: 軽量でLambdaトリガーに最適(24h保持)。Kinesis Data Streams for DynamoDB: StreamSpecificationKINESISを選択し、KDSの全機能(長期保持、Enhanced Fan-Out、Consumer App)を使用可能。大量変更データの高スループット処理に適する。


問12: AWS Lake Formation の Column Masking 機能とは?

A) 列を完全に非表示にする B) 特定のユーザー/ロールに対して列の値をマスク(ハッシュ化、最初N文字のみ表示等)して表示する C) 列の暗号化 D) 列名の変更

正解: B 解説: Lake Formation Data Filters(行フィルタ + 列マスク)でデータの機密性を保ちながらデータアクセスを提供。メールアドレスの最初3文字のみ表示、SSNの最後4桁のみ表示等のマスキングルールをLF上で一元管理できる。


問13: Amazon S3 Access Points の主な利点は?

A) パフォーマンスの向上 B) バケットポリシーを複雑にせず、用途別/チーム別にアクセスポイントとポリシーを分離管理 C) コスト削減 D) データ暗号化の強化

正解: B 解説: S3 Access Pointsはバケットに対して複数のアクセスポイントを作成し、それぞれに独立したポリシーを設定できる。大きなバケットポリシーを分割管理し、チームごと/ユースケースごとに異なるアクセス制御を実現。VPCアクセスポイントでVPCからのみアクセスも可能。


問14: Glue ETLジョブで --enable-job-insights オプションを使用する利点は?

A) ジョブの自動最適化 B) SparkUIの改善とAQE、スプリット/マージ等の自動最適化が有効化される C) コストが削減される D) デバッグモードが有効になる

正解: B 解説: Glue Job Insightsは Spark AQE (Adaptive Query Execution)、動的フレームのrepartition最適化、不必要なシャッフルの削減等の自動最適化を有効化する。また、Glue CloudWatch UIでSparkの実行詳細が確認できる。


問15: Amazon OpenSearch Service のIndex State Management (ISM) の主な用途は?

A) インデックスの自動作成 B) インデックスのライフサイクル管理(Hot→Warm→Cold→Delete)の自動化 C) 検索クエリの最適化 D) レプリカの管理

正解: B 解説: ISMポリシーでインデックスのライフサイクルを自動管理: (1)Hotノード(高頻度アクセス) (2)UltraWarmノード(低頻度アクセス, S3ベース) (3)Cold Storage(さらに低頻度, 追加コスト削減) (4)自動削除。例: 7日→Warm移行, 30日→Cold, 90日→削除。


問16: Amazon Athena の UNLOAD コマンドとCTASの違いは?

A) 機能は同じ B) UNLOAD: クエリ結果を指定フォーマットでS3に保存(テーブル作成なし)。CTAS: クエリ結果から新テーブルを作成 C) UNLOADはCTASより遅い D) CTASのみがParquetをサポート

正解: B 解説: UNLOAD ('SELECT ...') TO 's3://...' WITH (format='PARQUET')はテーブルを作成せず結果をS3に書き出す。CTASは新テーブルを作成してクエリ結果を格納。一時的なデータ出力にはUNLOAD、再利用可能なテーブルにはCTASを使用。


問17: Apache Kafka でのComsumerGroup Rebalanceが発生する原因は?

A) プロデューサーのスケールアウト B) コンシューマーの追加/削除、コンシューマーのクラッシュ、パーティション数変更、セッションタイムアウト C) トピックの削除 D) ブローカーの再起動のみ

正解: B 解説: Consumer Group Rebalanceはコンシューマー数変更やセッションタイムアウトによりトリガー。Rebalance中は全コンシューマーが処理を停止(Stop-the-World)。最適化: max.poll.interval.msの調整、Incremental Cooperative Rebalancing(Kafka 2.4+)の使用で停止時間を削減。


問18: Amazon Redshift の VACUUM SORT ONLY vs VACUUM DELETE ONLY の違いは?

A) 機能は同じ B) SORT ONLY: ソートキー順序の再整列のみ(削除マークの物理削除なし)。DELETE ONLY: 削除マークの物理削除のみ(再ソートなし) C) SORT ONLYのみ推奨 D) どちらも全ブロックをスキャン

正解: B 解説: VACUUM SORT ONLY: 大量INSERT/COPYで順序が乱れたソートキーを再整列。VACUUM DELETE ONLY: 大量DELETE/UPDATEで発生した削除マークを物理削除してストレージ回収。VACUUM REINDEX: INTERLEAVEDソートキーの再インデックス。Redshift AutoVacuumで多くは自動実行。


問19: Amazon S3でZero-ETL統合を利用するサービスとは?

A) Glue JobとS3の統合 B) Aurora→Redshift Zero-ETL、DynamoDB→OpenSearch Zero-ETL など、データをETLなしでリアルタイムにRedshift/OpenSearchに反映 C) S3→DynamoDBの自動同期 D) Kinesisのみ対応

正解: B 解説: Zero-ETL統合: (1)Aurora MySQL/PostgreSQL→Redshift(変更データをETLなしでリアルタイム同期) (2)DynamoDB→OpenSearch(全文検索用) (3)Aurora→SageMaker(ML用)。Glueジョブ不要で複雑なETLパイプライン構築コストを削減。


問20: AWS Glue で Hudi テーブルフォーマットを使用する利点は?

A) Hudi は非推奨 B) ACID トランザクション、COW(Copy-on-Write) vs MOR(Merge-on-Read)のトレードオフ選択、インクリメンタルクエリ C) 読み取り専用 D) Iceberg と同一機能

正解: B 解説: Apache Hudi: COWモード(書き込み時マージ、読み取り高速)とMORモード(ログファイル、書き込み高速・読み取り時マージ)を選択可能。Upsert(キーベース更新)に最適化。Glue 3.0+でネイティブサポート。ストリーミングのIncrementalパターンに強い。


Part 2: 設計・アーキテクチャ (問21-45)

問21: データレイクのゾーン設計でRaw Zone(Bronze)に保存すべきデータの考え方は?

A) 加工済みデータのみ B) ソースシステムからの変換なしの生データ(バイトレベルで元データを保持), 再処理のための原本 C) 集計データ D) スキーマ最適化済みデータ

正解: B 解説: Raw/BronzeゾーンはETL処理失敗時や仕様変更時に生データから再処理できるように、ソースからの生データをそのまま保存する。S3の変更不可(Immutable)設定やオブジェクトロックでデータ損失を防止する。


問22: Amazon Athena でのパーティションプロジェクション設定で日時パーティションを使用するには?

A) Glue Crawlerで自動設定 B) テーブルプロパティに projection.enabled=true, projection.date.type=date/integer/injected 等を設定 C) パーティションプロジェクションは日時に対応していない D) Lambda関数での前処理が必要

正解: B 解説: Partition Projection設定例(日次パーティション): projection.enabled=true, projection.date.type=date, projection.date.range=2024-01-01,NOW, projection.date.format=yyyy-MM-dd, storage.location.template=s3://bucket/events/${date}/。Glue Catalogにパーティション登録不要。


問23: ストリーミングとバッチを統合するLambdaアーキテクチャの現代的な代替は?

A) Lambda Architecture (批判通りに廃止) B) Kappa Architecture: ストリーミングのみで統一し、再処理はKinesisの長期保持を使用 C) ETL Architecture D) どちらも同等

正解: B 解説: Kappa Architectureはストリーム処理(Kinesis/Kafka + Flink)のみで統一し、バッチとリアルタイムの2つのコードパスを持つLambda Architectureの複雑さを解消。Kinesisの最大365日保持を使用して過去データの再処理を行う。


問24: Amazon S3でHive形式のパーティション(year=2024/month=01/day=01)と非Hive形式(2024/01/01)のどちらを使用すべきか?

A) どちらでも同じ B) Hive形式を推奨: Glue CrawlerとAthenaが自動認識、Lake Formationとの統合が容易 C) 非Hive形式は常に非推奨 D) パーティションは使用しない

正解: B 解説: col=valueのHive形式パーティションはGlue Crawler、Athena、Sparkが自動認識。非Hive形式はMSCK REPAIR TABLEでAthenaが認識するが、カラム名が必要な場合はHive形式が必須。新規設計ではHive形式を推奨。


問25: Redshift ServerlessのBase RPU設定とは?

A) 最大RPU数 B) アイドル時でも確保するMinimum RPU数(低い値=低コスト、高い値=初回クエリのウォームアップ高速化) C) デフォルトのRPU数 D) 課金の基準単位

正解: B 解説: Base RPUはServerlessが最低限確保するRPU数(8-512)。低く設定すると長時間アイドル後の最初のクエリが遅い(コールドスタート)。頻繁に使用する場合は32-64RPUに設定してウォームアップを確保。アイドル10分後は自動的にRPUを減らす。


問26: データエンジニアリングでの「幂等性(Idempotency)」の重要性は?

A) 1回のみの実行保証 B) 同じ処理を複数回実行しても結果が変わらないことで、失敗時の安全な再試行とAt-least-once配信への対応を可能にする C) 処理の速度向上 D) コストの削減

正解: B 解説: 冪等性の実装: (1)ユニークIDによる重複排除(DynamoDB conditional write) (2)UPSERT/MERGE操作(Iceberg/Hudi) (3)S3のPUT-IF-ABSENT (4)SQSのdeduplication ID。Exactly-Once保証がないS3イベント/SQS At-least-once環境では必須の設計。


問27: AWS Glue でのエラーハンドリングと失敗レコードの管理方法は?

A) エラーはログに記録されるのみ B) DynamicFrameのerrorThreshold設定、stageThreshold/totalThreshold、エラーレコードを別パスに書き込み C) エラーは無視する D) 全エラーでジョブを停止

正解: B 解説: Glue DynamicFrameのget_resolve_choice + エラーフィールドの分離: エラー閾値(stageThreshold, totalThreshold)でジョブの許容エラー率を設定。filter(lambda r: r['error_field'] is None)で正常レコードと異常レコードを分離して別々に処理。


問28: Amazon Athena でのクエリ結果を別のS3バケットへのエクスポート自動化で最もシンプルな方法は?

A) Lambda関数で手動実装 B) EventBridge Scheduler → Athena Scheduled Query → S3 の定期実行パターン C) Glue JobからAthena APIを呼び出す D) Step Functionsのみで対応

正解: B 解説: Athena Scheduled Query機能で定期的なクエリ実行とS3への結果書き出しを設定できる。複雑なロジックが必要な場合はEventBridge → Step Functions → Athena API のパターン。


問29: EMR の --auto-terminate オプションを使用するメリットは?

A) 自動スケーリングのみ有効 B) 全ステップ完了後にクラスターを自動削除してコスト削減(一時的なバッチ処理に最適) C) 障害発生時のみ終了 D) クラスターの再起動が自動化

正解: B 解説: --auto-terminateはEMRクラスターのすべてのステップが完了した後に自動終了させる設定。コスト最適化のため、定期的なバッチ処理でクラスターを都度作成・削除する「Transient Cluster」パターンで使用。EMR Serverlessとも組み合わせ可能。


問30: Amazon Kinesis Data Streams の Enhanced Fan-Out を使用するべき条件は?

A) コンシューマーが1つの場合 B) 複数の独立したコンシューマーアプリが同一シャードを高スループット(2MB/sec/コンシューマー)でリアルタイム処理する必要がある場合 C) データ量が少ない場合 D) ファイル処理のみ

正解: B 解説: Enhanced Fan-Outは各コンシューマーに専用2MB/s/シャードの帯域を提供(通常は全コンシューマーで共有2MB/s/シャード)。低レイテンシ(~70ms)のプッシュ型。複数のリアルタイム分析ユースケース(ダッシュボード更新、リアルタイム検知等)を独立したコンシューマーで処理する場合に使用。


問31: AWS Glue でDelta Lake テーブルのZORDER(クラスタリング)を使用する目的は?

A) データの圧縮 B) よく使うフィルター条件のカラムを局所化してデータスキッピング効果を最大化 C) テーブルのバックアップ D) スキーマの強制

正解: B 解説: Delta LakeのZORDER BY(またはHudiのクラスタリング)は複数のカラムをS3ファイル内で局所化(co-localize)することで、WHERE col1 = X AND col2 = Yのフィルターで読み込むファイル数を最小化。Athena/Sparkでのスキャン量削減に効果的。


問32: Amazon S3 Intelligent-Tiering のArchive Access Tierへの自動移行条件は?

A) 90日間アクセスなし B) Archive Access Tier: 90日間アクセスなし, Deep Archive Access Tier: 180日間アクセスなし (Intelligent-TieringでArchive有効化が必要) C) 30日間アクセスなし D) サイズに基づく

正解: B 解説: S3 Intelligent-Tieringのデフォルト移行: 30日→Infrequent, 90日(オプション有効時)→Archive Instant, 90日(有効化必要)→Archive Access, 180日(有効化必要)→Deep Archive Access。Archive Access/Deep Archive Accessの有効化はバケットまたはオブジェクト設定で行う。


問33: データウェアハウスのスタースキーマでファクトテーブルとディメンションテーブルを正しく識別するには?

A) 行数が多い方がファクト B) ファクト: 定量的な指標(売上、数量、金額)を持つ中心テーブル。ディメンション: ファクトの属性・コンテキスト(日付、製品、顧客) C) ディメンションの方が常に大きい D) スタースキーマにディメンションはない

正解: B 解説: スタースキーマ: ファクトテーブル(大)が中心で数値指標とFK。ディメンションテーブル(小)が周囲で属性情報。Redshiftではファクト→DISTKEY(JOINキー)/SORTKEY(日付)、ディメンション→DISTSTYLE ALL(小さいテーブルを全ノードに複製)が基本設計。


問34: Amazon S3 Multi-Region Access Points の主な用途は?

A) コストの削減 B) グローバルに分散したS3バケットへのルーティングをGlobal Acceleratorで最適化し、レイテンシを最小化 C) バックアップ D) 暗号化の強化

正解: B 解説: S3 Multi-Region Access Points(MRAP)はGlobal Accelerator経由でユーザーを最も近いリージョンのS3バケットにルーティング。S3クロスリージョンレプリケーション(CRR)と組み合わせてグローバルに分散したデータへの最低レイテンシアクセスを実現。


問35: AWS Glue の Interactive Sessions でノートブックからGlueリソースにアクセスするメリットは?

A) 本番ジョブの高速実行 B) Jupyter Notebookから対話的にGlueリソース(Data Catalog, Spark環境)を操作でき、開発・デバッグが高速化 C) コストが最安 D) EMRと同じ機能

正解: B 解説: Glue Interactive Sessions(旧Glue Studio Notebook)はJupyter Notebookインターフェースから数秒でGlue Sparkセッションを起動して対話的に開発できる。本番ジョブと同じGlue環境・Data Catalogを使用し、本番移行が容易。使用分のみ課金(1分単位)。


問36: Apache Kafka の「コンパクション」(Log Compaction) とは何か?

A) ログファイルの圧縮 B) 各キーの最新値のみを保持して古い値を削除するログの最適化(イベントソーシングやCDCに使用) C) メッセージのサイズ削減 D) トピックの分割

正解: B 解説: Log Compactionを有効にしたトピックは、同一キーの古いメッセージを削除して最新値のみ保持する。データベースのように最終状態のスナップショットとして機能。cleanup.policy=compactで設定。変更されたレコードの最新状態をKafkaに保持するCDCユースケースに最適。


問37: Amazon Timestream のmagneticStoreWritesEnabled 設定の目的は?

A) S3への書き込みを有効化 B) 過去データ(メモリストアを過ぎた古いデータ)をマグネティックストアに書き込み可能にする C) 書き込みパフォーマンスの向上 D) データの暗号化

正解: B 解説: Timestreamのデフォルトはマグネティックストアへの書き込みを禁止(古いデータは削除)。magneticStoreWritesEnabled=trueを設定すると、過去データ(メモリストアの保持期間外)もマグネティックストアに書き込める。遅延到着データの対応に使用。


問38: AWS Glue ワークフローとAWS Step Functionsのどちらを使用すべきか?

A) 常にGlue Workflowを使用 B) Glue Workflowは純粋なGlueリソース(Job/Crawler/Trigger)の連携に。Step FunctionsはAWSサービスの横断的なオーケストレーション(Lambda/ECS/SageMaker等)に C) Step Functionsは廃止予定 D) どちらも同等

正解: B 解説: Glue Workflowはシンプルで Glue完結の ETLパイプラインに最適。Step FunctionsはGlue以外のサービス(Lambda, EMR, SageMaker, Athena等)を含む複雑なパイプラインに適しており、より詳細なエラーハンドリング、条件分岐、並列実行制御が可能。


問39: Amazon DynamoDB の TTL (Time To Live) 機能を使用したデータ管理パターンは?

A) TTLはコストが増加する B) アイテムに有効期限属性(Unixタイムスタンプ)を設定し、期限切れアイテムを自動削除してストレージコストを削減 C) TTLは即座に削除する D) TTLはDynamoDB Streamsと併用できない

正解: B 解説: DynamoDB TTLは設定したタイムスタンプ属性の値が過去になったアイテムを自動的に削除(通常数日以内)。セッションデータ、一時トークン、ログエントリ等の期限付きデータ管理に最適。TTL削除イベントはDynamoDB Streamsで取得可能。


問40: Kinesis Data Firehose でデータ変換Lambdaがタイムアウトした場合の動作は?

A) レコードが失敗してS3に保存されない B) 処理失敗としてS3の別パス(processing-failed/)に元のデータが保存される C) 自動的にリトライされ最終的に処理される D) Firehoseが停止する

正解: B 解説: Firehose変換Lambda失敗またはタイムアウト時: レコードはprocessing-failed/のS3パスに未変換のまま保存(元データは失われない)。numberOfRetries(デフォルト3)で変換失敗時のリトライ回数を設定可能。


問41: Apache Spark の Broadcast Variables の使用場面は?

A) 全てのジョブで使用 B) 読み取り専用の小さいデータ(ルックアップテーブル等)を各Executorにキャッシュして、Driver→Executorの繰り返しシリアライズを防ぐ C) 大きいテーブルのJOIN D) MLモデルの共有

正解: B 解説: Broadcast Variablesはsc.broadcast(lookup_dict)でドライバーから全Executorに読み取り専用データを効率的に配布。ルックアップテーブル、設定値、MLモデルのような小さいデータ(<数百MB)を各UDFやmap操作から参照する場合にシリアライズコストを削減。


問42: Amazon Redshift の Serverless vs Provisioned の自動スケーリングの違いは?

A) 両方とも同じ自動スケーリング B) Serverless: 自動RPUスケーリング(キャパシティ管理不要)。Provisioned: Concurrency Scalingで読み取りのみスケール(追加クラスターが自動起動) C) Provisionedのみ自動スケーリング D) どちらも手動スケーリング

正解: B 解説: Redshift Serverlessはクエリ負荷に応じてRPU(8-512)を自動スケーリング。ProvisionedのConcurrency Scalingは読み取りキューが多い時に一時的な追加クラスターをスピンアップ(1日1時間無料)。書き込みのスケーリングはProvisioned Cluster自体のサイズ変更が必要。


問43: AWS DataZone でデータ資産を「公開」(Publish) するとは?

A) S3バケットを公開設定にする B) データ資産をビジネスカタログに登録し、他のドメインのデータユーザーが発見・アクセス申請できるようにする C) インターネットに公開する D) CloudFrontで配信する

正解: B 解説: DataZoneの公開プロセス: データプロデューサー→ドメインカタログに資産登録→承認→ビジネスグロッサリーとタグ付け→データユーザーが検索・発見→アクセス申請→プロデューサーが承認→アクセス権付与。データガバナンスを保ちながらデータ民主化を実現。


問44: S3 の CRC32C チェックサムと MD5 ETag の違いは?

A) 機能は同じ B) CRC32Cはデータ整合性の迅速な検証に最適化。MD5 ETags はマルチパートアップロードでは全体ハッシュでなく各パートのMD5を結合した値 C) MD5の方が常に信頼性が高い D) CRC32Cは非推奨

正解: B 解説: S3のETag: 単一パートアップロードはMD5。マルチパートはMD5-N(N=パート数)の形式で全体のMD5でない。Checksum(CRC32C/CRC32/SHA-1/SHA-256)を指定すると正確なデータ整合性検証が可能。AWS SDKはデフォルトでCRC32Cチェックサムを計算・検証。


問45: Amazon Athena でのビューの更新(スキーマ変更後)が必要なケースは?

A) ビューは自動更新される B) 基になるテーブルのスキーマ変更後、ビューが参照するカラムが変更または削除された場合にビューの再作成が必要 C) ビューは永久に変更不要 D) Crawlerが自動で更新

正解: B 解説: Athenaのビューはクエリ時にComp(コンパイル)されるため基礎テーブルのスキーマ変更後は再コンパイルが必要。参照カラムが削除または型変更された場合はビューのCREATE OR REPLACE VIEWで再作成が必要。テーブル追加カラムはビュー再作成不要。


Part 3: セキュリティ・ガバナンス (問46-65)

問46: S3 Object Lock とVault Lock (Glacier)の共通目的は?

A) パフォーマンスの向上 B) Compliance/Governance Modeでオブジェクトの変更・削除を防止しWORM(Write Once Read Many)を実現 C) 暗号化の強化 D) コスト削減

正解: B 解説: S3 Object Lock(WORM): Compliance Mode(誰も削除できない)とGovernance Mode(特権ユーザーのみ削除可)。医療記録、金融記録など規制要件のあるデータの改ざん防止に使用。Glacier Vault LockはGlacierでのWORM実装。


問47: Amazon Macie でカスタムデータ識別子を作成するには?

A) Macieの設定変更のみ B) 正規表現パターンとキーワード/最大一致数でカスタム機密データパターンを定義 C) Lambda関数での実装が必要 D) サードパーティツールが必要

正解: B 解説: Macieカスタムデータ識別子: 正規表現 + キーワード(正規表現マッチ近傍に存在する必要がある単語) + 除外パターン + 最大一致距離 でカスタム機密データパターンを定義。社員IDや社内コード等の組織固有の機密データを検出できる。


問48: AWS Glue Data Catalog のリソースポリシーとLake Formationの権限の優先順位は?

A) Lake Formationが常に優先 B) Lake Formation有効時: LF権限 + IAMポリシーの両方で許可が必要。Glue Resource Policyは使用されない C) Glue Resource Policyのみ有効 D) IAMポリシーのみ使用される

正解: B 解説: Lake Formation有効化後: (1)IAMポリシーでGlue/AthenaのAWS APIへのアクセスを許可 (2)Lake FormationでDatabaseやTableへのデータアクセス権限を付与。両方の許可が必要。Glue Data CatalogのResource Policyは通常Lake Formation有効時には使用しない。


問49: Amazon S3 Server Access Logging と CloudTrail Data Events の違いは?

A) 同じ情報を記録 B) Server Access Logs: バケット・オブジェクトレベルのHTTPリクエスト詳細(無料)。CloudTrail Data Events: S3のPutObject/GetObject等のAPI操作(有料, 追加設定必要) C) CloudTrailのみが推奨 D) Server Access Logsの方が詳細

正解: B 解説: Server Access Logs: S3内に保存(別バケット推奨)、HTTPアクセスの詳細(Requester IP、Request URI、HTTP Status等)。CloudTrail Data Events: AWS APIレベルの操作(誰がどのAPIを使ったか)、Athenaでの分析が推奨。コンプライアンス要件に応じて両方設定が必要なケースもある。


問50: Amazon EMR でのセキュリティ設定でKerberos認証を有効にする目的は?

A) パフォーマンスの向上 B) ユーザー認証を強化してHDFS/YARN/Sparkリソースへの不正アクセスを防止 C) コストの削減 D) Sparkのみに適用

正解: B 解説: EMR Kerberos設定: EMRクラスターにKDC(Key Distribution Center)を設定し、HadoopエコシステムのサービスとユーザーをKerberosで認証。Active Directory統合も可能。企業のセキュリティポリシーに応じたHadoop認証の強化に使用。


問51: Amazon Redshift のデータベース監査ログを有効にするには?

A) CloudTrailで自動記録 B) Redshift Audit Logging(S3バケット指定)またはCloudWatch Logsでユーザーアクティビティ・接続ログを収集 C) VPC Flow Logsのみで対応 D) 別途購入が必要

正解: B 解説: Redshift監査ログ種類: (1)Connection Log(接続/切断) (2)User Log(ユーザー変更) (3)User Activity Log(全SQLクエリ、有効化必要: enable_user_activity_logging=trueパラメータ)。S3またはCloudWatch Logsに保存。Athenaでのクエリ分析が推奨。


問52: AWS Glue Sensitive Data Detection でPIIを検出するには?

A) Glue Crawlerで自動検出 B) Glue Sensitive Data Detectionタスクでジョブを実行し、エンティティ種別(SSN, EMAIL等)を定義してS3/カタログのスキャン C) AWS Macieのみで対応 D) カスタムLambdaが必要

正解: B 解説: Glue Sensitive Data DetectionはGlue Studio ETLジョブでデータ内のPII(SSN、Email、Phone、Credit Card等)を検出。DETECT_PII_ENTITIES変換を使用してカラム単位でPII存在の有無とエンティティ種別をマークし、Macieと連携して詳細スキャンを補完。


問53: Amazon DynamoDB テーブルの暗号化キー管理のオプションは?

A) DynamoDBは暗号化できない B) AWS所有キー(デフォルト、無料)、AWSマネージドキー(aws/dynamodb)、カスタムマネージドキー(CMK)の3種類から選択 C) CMKのみ使用可能 D) S3と同じ設定が使用される

正解: B 解説: DynamoDB暗号化オプション: (1)AWS-owned key(デフォルト、追加費用なし、最もシンプル) (2)AWS-managed key(aws/dynamodbKMSキー、CloudTrailでキー使用履歴確認可能) (3)Customer-managed key(独自KMS CMK、最大の制御、追加KMSコスト)。規制要件に応じて選択。


問54: Amazon Athena のワークグループの機能で保存クエリ(Saved Queries)の用途は?

A) クエリの自動実行 B) よく使用するクエリをワークグループ内で保存・共有し、チームのクエリ標準化を促進 C) パフォーマンスの向上 D) コストの削減

正解: B 解説: Saved Queriesはワークグループ内で共有可能なクエリライブラリ。チームで標準的なレポートクエリを共有、新メンバーのオンボーディング、定期的なアドホック分析クエリの管理に使用。名前・説明を付けて整理可能。


問55: AWS Glue Crawler が検出するパーティションの情報をGlue Data Catalogに保存する際の注意事項は?

A) 全パーティションが即座に登録される B) --crawler-maximum-partitions-per-table デフォルト200万パーティション制限。大量パーティション時はパーティションインデックスを設定するかHive Partition Projectionを使用 C) パーティション数に制限はない D) Crawlerはパーティションを認識しない

正解: B 解説: Glue Data Catalogのパーティション上限: テーブルあたり最大2億(SLA)。ただしCrawler実行ごとの追加上限や性能劣化に注意。大量パーティション(100万+)の場合: (1)パーティションインデックス設定でフィルタリング高速化 (2)パーティションプロジェクションでCatalogを回避。


問56: データレイクセキュリティのベストプラクティスでゾーン別のアクセス制御設計は?

A) 全ゾーンを同一アクセス制御 B) Raw(保護最強)→Silver(中程度)→Gold(広くアクセス可能)の階層でアクセス制御を設計 C) Goldゾーンのみ制限 D) Lake Formationは一律設定

正解: B 解説: ゾーン別アクセス: Raw(生データ): ETLロールのみ、Silver(クレンジング済み): データエンジニア+分析エンジン、Gold(ビジネスアグリゲーション): データアナリスト+BIツール。行/列レベルの細粒度制御はGoldゾーンで適用。


問57: Amazon S3 でバケットポリシーとACL(Access Control Lists)の推奨設定は?

A) ACLを優先使用 B) 新規バケットではACL無効化(Object Ownership=BucketOwner Enforced)してバケットポリシーのみで管理を推奨 C) ACLとバケットポリシーを両方使用 D) バケットポリシーは廃止予定

正解: B 解説: AWSのベストプラクティスはACLを無効化(Object Ownership=BucketOwnerEnforced)してバケットポリシーでのみアクセス制御を行うこと。ACLはレガシーで複雑で混乱を招く。IAMポリシー+バケットポリシーの組み合わせが推奨。


問58: AWS Glue Schema Registry の主な機能は?

A) GlueのIAM権限管理 B) Apache Avro/JSON/Protobufスキーマの中央管理と進化(後方互換/前方互換/完全互換)の強制 C) データ品質の管理 D) ETLジョブのスキーマ検証のみ

正解: B 解説: Glue Schema Registryはスキーマの中央リポジトリ。プロデューサー(Kafka/Kinesis/MSK)がスキーマをレジストリに登録し、コンシューマーがスキーマを参照してデシリアライズ。Schema Evolution(互換性モード: BACKWARD/FORWARD/FULL/NONE)でスキーマ変更を制御。


問59: Amazon Athena のクエリ失敗のデバッグ方法は?

A) AWS Supportに問い合わせるのみ B) Query Execution Historyで失敗クエリのError MessageとQuery IDを確認、CloudWatch Logsでのログ分析 C) Cloudwatch Metricsのみ確認 D) デバッグは不可能

正解: B 解説: Athenaクエリデバッグ: (1)コンソールのQuery History→失敗クエリのエラーメッセージ確認 (2)EXPLAIN文でクエリプラン確認 (3)CloudWatch MetricsのProcessedBytes確認 (4)CTAS→EXPLAINで段階的なデバッグ。よくある失敗: スキーマ不一致、パーミッション不足、S3パス誤り。


問60: AWS のデータパイプライン設計でのべき等性(Idempotency)を保証するS3への書き込みパターンは?

A) 上書き書き込みのみ B) 一時ディレクトリに書き込んでからアトミックにリネーム/コピー、またはSuffix付きファイル名+後からのコンパクション C) DynamoDBへの書き込みで管理 D) べき等性の保証は不可能

正解: B 解説: S3の冪等書き込み: (1)一時prefix(_tmp/)に書き込み→完了後に最終prefix(data/)にCopy→_tmp/削除のアトミックパターン (2)PartitionKeyのオーバーライト(同一パーティションの再実行で上書き) (3)IcebergのMERGE INTOでupsert。


問61: Amazon Redshift でのロールベースアクセス制御 (RBAC) の設定は?

A) IAMポリシーのみで制御 B) Redshift内でROLEを作成してスキーマ/テーブル/プロシージャに権限を付与し、ユーザーにロールを割り当て C) RedshiftにはRBACがない D) Lake Formationのみで制御

正解: B 解説: Redshift RBAC: CREATE ROLE analyst; GRANT SELECT ON SCHEMA analytics TO ROLE analyst; GRANT ROLE analyst TO user1;。デフォルトロール(sys:superuser, sys:dba, sys:operator等)も提供。Lake FormationはGlueカタログ経由のアクセスを制御。


問62: Amazon DynamoDB の DAX (DynamoDB Accelerator) を使用するべきケースは?

A) 全てのDynamoDBワークロード B) 読み取りが多い(read-heavy)ワークロードでマイクロ秒のキャッシュが必要な場合 C) 書き込み最適化 D) コスト削減

正解: B 解説: DAXはインメモリキャッシュでDynamoDBのms読み取りをマイクロ秒(<1ms)に改善。ゲームリーダーボード、リアルタイムダッシュボード、大量読み取りAPIに最適。書き込みスケーリングにはDynamoDB Auto ScalingまたはOn-demand capacityを使用。


問63: Glue ETLジョブのコスト最適化で有効な設定は?

A) 常に最大DPUで実行 B) AutoScaling有効化、G.025Xの最小Workertype、Job Bookmarkで処理済みデータをスキップ、Glue3.0/4.0の使用 C) EMRに移行のみが有効 D) ジョブを増やすことで並列化

正解: B 解説: Glue ETL最適化: (1)--enable-auto-scalingで処理量に応じたWorker数自動調整 (2)軽量処理はG.025X(0.25DPU/Worker) (3)Job Bookmarkで新データのみ処理 (4)Glue 4.0(Spark 3.3ベース)の性能改善活用 (5)不要なcache/persist削除 (6)PushDown Predicateで S3スキャン量削減。


問64: Amazon Kinesis Data Firehose でデータをOpenSearchに送信する際のバッファリング設定の影響は?

A) バッファリングはリアルタイム性に影響しない B) Buffer Size小・Buffer Interval短=高リアルタイム性だが多数の小インデックス操作。Buffer Size大・Interval長=バッチ効率向上だがレイテンシ増加 C) OpenSearchは常に即時受信 D) バッファリング設定はS3のみに影響

正解: B 解説: Firehose→OpenSearch: Buffer Size(1-100MB)とBuffer Interval(60-900秒)のバランスで調整。リアルタイム要件が高い場合は1MB/60秒。大量データのバッチ効率を重視する場合は100MB/900秒。OpenSearchのインデックス最適化(index.refresh_interval)も合わせて調整。


問65: データエンジニアリングパイプラインの障害検知と自動復旧のベスト設計は?

A) 手動でのモニタリングのみ B) CloudWatch Alarms + SNS通知 + EventBridge → Step Functions / Lambda での自動リトライとエスカレーション C) AWS Supportへの通知のみ D) ログ確認後に手動対応

正解: B 解説: パイプライン自動復旧: (1)CloudWatch/Glueジョブメトリクスでアラーム設定 (2)SNSで通知 (3)EventBridgeで失敗イベント検知→Lambda/Step Functionsで自動リトライ (4)リトライ上限超過でエスカレーション(PagerDuty/ServiceNow) (5)Step Functionsで再実行。


付録: DEA-C01 学習チェックリスト

【DEA-C01 必須暗記事項】

■ データ取り込み
  ✓ Kinesis Data Streams vs Firehose の使い分け
  ✓ Kinesis Enhanced Fan-Out (専用2MB/s帯域)
  ✓ DMS CDC ソース設定 (Oracle/MySQL/PostgreSQL)
  ✓ Glue JDBC 並列読み込み (hashfield/hashpartitions)
  ✓ S3 Event Notifications (at-least-once, 冪等性必要)

■ データストレージ・フォーマット
  ✓ Iceberg: ACID, タイムトラベル, スキーマ進化, MERGE INTO
  ✓ Hudi: COW vs MOR, Upsert最適化
  ✓ Delta Lake: ZORDER, Vaccum, OPTIMIZE
  ✓ Parquet: カラム指向, スキーマ, 圧縮

■ データ処理
  ✓ Glue Job Types (ETL/Python Shell/Ray/Streaming)
  ✓ Glue Data Quality (DQDL ルール構文)
  ✓ EMR Auto Scaling + Spot Instance
  ✓ Step Functions + Glue オーケストレーション
  ✓ Spark AQE, Broadcast Join, Skew対策

■ データウェアハウス
  ✓ Redshift DISTKEY/SORTKEY 選択基準
  ✓ Redshift Serverless vs Provisioned
  ✓ Redshift Zero-ETL (Aurora/DynamoDB→Redshift)
  ✓ Redshift ML (SageMakerAuto-ML統合)
  ✓ Redshift Datashare (クロスアカウント)

■ セキュリティ・ガバナンス
  ✓ Lake Formation TBAC, Row Filter, Column Masking
  ✓ S3 Object Lock (Compliance/Governance Mode)
  ✓ Glue Schema Registry (BACKWARD/FORWARD互換)
  ✓ Macie (PII検出), Custom Data Identifier
  ✓ Amazon DataZone (データカタログ+ガバナンス)

■ コスト最適化
  ✓ Athena: Parquet+パーティション+プロジェクション
  ✓ Glue: AutoScaling, JobBookmark, G.025X
  ✓ S3 Intelligent-Tiering, Lifecycle
  ✓ EMR: Spot + Auto Scaling + Serverless

【試験戦略】
1. ストリーミング vs バッチ: リアルタイム要件で判断
2. オープンテーブルフォーマット: ACID/タイムトラベル → Iceberg, Upsert → Hudi
3. コスト問題: スキャン量削減(Parquet/Partition)が最重要
4. セキュリティ: Lake Formation (行/列)+ S3 Object Lock (WORM)
5. データ統合: Zero-ETL > ETL (可能な場合)

DEA-C01 (AWS Certified Data Engineer Associate) 試験対策ガイド完成 作成日: 2026-04


第6章: データ品質・ガバナンス・セキュリティ

6.1 AWS Glue Data Quality 詳細

import boto3
import json

glue_client = boto3.client('glue', region_name='ap-northeast-1')

# データ品質ルールセット定義
def create_data_quality_ruleset():
    ruleset_dqdl = """
    Rules = [
        # 完全性チェック
        IsComplete "customer_id",
        IsComplete "order_date",
        IsComplete "amount",
        
        # 一意性チェック
        IsUnique "order_id",
        
        # 値範囲チェック
        ColumnValues "amount" between 0 and 1000000,
        ColumnValues "quantity" > 0,
        
        # 参照整合性
        ColumnValues "status" in ["pending", "processing", "completed", "cancelled"],
        
        # カスタムSQL
        CustomSQL "select count(*) from primary where amount < 0" = 0,
        
        # 統計的チェック
        ColumnStatisticsAreNonZero "amount",
        Mean "amount" between 100 and 10000,
        StandardDeviation "amount" < 50000,
        
        # パターンチェック
        ColumnValues "email" matches "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}{{CONTENT}}quot;,
        ColumnValues "phone" matches "^[0-9]{10,11}{{CONTENT}}quot;
    ]
    """
    
    response = glue_client.create_data_quality_ruleset(
        Name='OrdersDataQualityRules',
        Description='Orders table data quality rules',
        Ruleset=ruleset_dqdl,
        TargetTable={
            'DatabaseName': 'sales_db',
            'TableName': 'orders'
        }
    )
    return response

# データ品質評価の実行
def run_data_quality_evaluation(ruleset_name, database, table):
    response = glue_client.start_data_quality_ruleset_evaluation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': database,
                'TableName': table
            }
        },
        Role='arn:aws:iam::123456789012:role/GlueDataQualityRole',
        NumberOfWorkers=5,
        Timeout=300,
        RulesetNames=[ruleset_name]
    )
    return response['RunId']

# 評価結果の取得
def get_evaluation_results(run_id):
    response = glue_client.get_data_quality_ruleset_evaluation_run(
        RunId=run_id
    )
    
    if response['Status'] == 'SUCCEEDED':
        results = response['ResultIds']
        for result_id in results:
            result = glue_client.get_data_quality_result(ResultId=result_id)
            print(f"Overall Score: {result['Score']}")
            
            for rule_result in result['RuleResults']:
                status = "PASS" if rule_result['Result'] == 'PASS' else "FAIL"
                print(f"  {status}: {rule_result['Name']}")
                if rule_result['Result'] == 'FAIL':
                    print(f"    Description: {rule_result.get('EvaluationMessage', 'N/A')}")
    
    return response

# データ品質スコア閾値によるパイプライン制御
def evaluate_with_threshold(run_id, min_score=0.8):
    import time
    
    while True:
        response = glue_client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
        status = response['Status']
        
        if status in ['SUCCEEDED', 'FAILED', 'STOPPED']:
            break
        time.sleep(30)
    
    if status != 'SUCCEEDED':
        raise Exception(f"Data quality run failed with status: {status}")
    
    result_id = response['ResultIds'][0]
    result = glue_client.get_data_quality_result(ResultId=result_id)
    score = result['Score']
    
    if score < min_score:
        # SNS通知
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:ap-northeast-1:123456789012:DataQualityAlerts',
            Subject='Data Quality Check Failed',
            Message=f'Data quality score {score:.2%} is below threshold {min_score:.2%}'
        )
        raise Exception(f"Data quality score {score:.2%} below minimum {min_score:.2%}")
    
    return score

6.2 Lake Formation 詳細設定

import boto3

lf_client = boto3.client('lakeformation', region_name='ap-northeast-1')

# Lake Formation データレイク設定
def setup_lake_formation():
    # データレイク管理者設定
    lf_client.put_data_lake_settings(
        DataLakeSettings={
            'DataLakeAdmins': [
                {'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/DataLakeAdmin'}
            ],
            'CreateDatabaseDefaultPermissions': [],
            'CreateTableDefaultPermissions': [],
            'TrustedResourceOwners': ['123456789012'],
            'AllowExternalDataFiltering': True,
            'ExternalDataFilteringAllowList': [
                {'DataLakePrincipalIdentifier': '123456789012'}
            ]
        }
    )

# テーブルベースのアクセスコントロール (TBAC)
def grant_table_permissions(principal_arn, database, table, permissions):
    """
    permissions: ['SELECT', 'INSERT', 'DELETE', 'DESCRIBE', 'ALTER', 'DROP', 'ALL']
    """
    response = lf_client.grant_permissions(
        Principal={'DataLakePrincipalIdentifier': principal_arn},
        Resource={
            'Table': {
                'DatabaseName': database,
                'Name': table
            }
        },
        Permissions=permissions,
        PermissionsWithGrantOption=[]
    )
    return response

# 列レベルセキュリティ
def grant_column_permissions(principal_arn, database, table, columns):
    response = lf_client.grant_permissions(
        Principal={'DataLakePrincipalIdentifier': principal_arn},
        Resource={
            'TableWithColumns': {
                'DatabaseName': database,
                'Name': table,
                'ColumnNames': columns
            }
        },
        Permissions=['SELECT'],
        PermissionsWithGrantOption=[]
    )
    return response

# 行フィルター (Row-level filtering)
def create_data_filter(database, table, filter_name, row_filter_expression):
    response = lf_client.create_data_cells_filter(
        TableData={
            'TableCatalogId': '123456789012',
            'DatabaseName': database,
            'TableName': table,
            'Name': filter_name,
            'RowFilter': {
                'FilterExpression': row_filter_expression
            }
        }
    )
    return response

# 使用例: 地域別データアクセス制御
def setup_regional_access():
    # 東京リージョンのデータのみアクセス可能
    create_data_filter(
        database='sales_db',
        table='orders',
        filter_name='tokyo_region_filter',
        row_filter_expression="region = 'ap-northeast-1'"
    )
    
    # 東京チームにフィルター付きアクセスを付与
    lf_client.grant_permissions(
        Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/TokyoTeam'},
        Resource={
            'DataCellsFilter': {
                'TableCatalogId': '123456789012',
                'DatabaseName': 'sales_db',
                'TableName': 'orders',
                'Name': 'tokyo_region_filter'
            }
        },
        Permissions=['SELECT'],
        PermissionsWithGrantOption=[]
    )

# LF-Tags (属性ベースのアクセス制御)
def setup_lf_tags():
    # タグの作成
    lf_client.create_lf_tag(
        TagKey='DataSensitivity',
        TagValues=['Public', 'Internal', 'Confidential', 'Restricted']
    )
    
    lf_client.create_lf_tag(
        TagKey='Department',
        TagValues=['Finance', 'Marketing', 'Engineering', 'HR']
    )
    
    # テーブルにタグを付与
    lf_client.add_lf_tags_to_resource(
        Resource={
            'Table': {
                'DatabaseName': 'hr_db',
                'Name': 'employee_salaries'
            }
        },
        LFTags=[
            {'TagKey': 'DataSensitivity', 'TagValues': ['Restricted']},
            {'TagKey': 'Department', 'TagValues': ['HR']}
        ]
    )
    
    # タグベースでの権限付与
    lf_client.grant_permissions(
        Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/HRManager'},
        Resource={
            'LFTagPolicy': {
                'ResourceType': 'TABLE',
                'Expression': [
                    {'TagKey': 'Department', 'TagValues': ['HR']},
                    {'TagKey': 'DataSensitivity', 'TagValues': ['Restricted']}
                ]
            }
        },
        Permissions=['SELECT', 'DESCRIBE'],
        PermissionsWithGrantOption=[]
    )

6.3 AWS Macie によるデータ発見・分類

import boto3

macie_client = boto3.client('macie2', region_name='ap-northeast-1')

# 分類ジョブの作成
def create_classification_job(bucket_name, job_name):
    response = macie_client.create_classification_job(
        jobType='SCHEDULED',
        name=job_name,
        s3JobDefinition={
            'bucketDefinitions': [
                {
                    'accountId': '123456789012',
                    'buckets': [bucket_name]
                }
            ]
        },
        scheduleFrequency={
            'weeklySchedule': {
                'dayOfWeek': 'MONDAY'
            }
        },
        samplingPercentage=100,
        managedDataIdentifierSelector='ALL'
    )
    return response['jobId']

# 検出結果の取得
def get_sensitive_data_findings():
    response = macie_client.list_findings(
        findingCriteria={
            'criterion': {
                'type': {
                    'eq': ['SensitiveData:S3Object/Personal']
                }
            }
        },
        maxResults=100
    )
    
    finding_ids = response['findingIds']
    
    if finding_ids:
        details = macie_client.get_findings(findingIds=finding_ids)
        return details['findings']
    
    return []

第7章: データエンジニアリング設計パターン

7.1 Lambda Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Lambda Architecture                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Data Sources                                               │
│  ┌──────┐  ┌──────────┐  ┌──────────┐                      │
│  │ IoT  │  │  App DB  │  │   API    │                      │
│  └──┬───┘  └────┬─────┘  └────┬─────┘                      │
│     │           │              │                             │
│     └───────────┴──────────────┘                            │
│                 │                                            │
│          ┌──────┴───────┐                                   │
│          │    Kinesis   │                                   │
│          │  Data Stream │                                   │
│          └──────┬───────┘                                   │
│                 │                                            │
│    ┌────────────┼────────────────┐                          │
│    │            │                │                          │
│    ▼            ▼                ▼                          │
│  Batch       Speed            Serving                       │
│  Layer        Layer            Layer                        │
│  (S3+Glue)  (KDF+Lambda)    (Redshift/                      │
│  (EMR)      (Flink)          Athena)                        │
│                                                             │
│  ・高レイテンシ  ・低レイテンシ   ・クエリ統合                 │
│  ・完全データ    ・直近データ     ・Batch+Speed               │
└─────────────────────────────────────────────────────────────┘

7.2 Kappa Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Kappa Architecture                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Data Sources ──→ Kinesis Data Streams ──→ Processing       │
│                                              │              │
│                   ┌──────────────────────────┘              │
│                   │                                         │
│             ┌─────▼────────┐                                │
│             │ Apache Flink  │ (MSK / KDA)                   │
│             │ Stream Proc   │                                │
│             └─────┬────────┘                                │
│                   │                                         │
│          ┌────────▼──────────┐                              │
│          │  Serving Layer    │                               │
│          │  (Redshift/       │                               │
│          │   OpenSearch/     │                               │
│          │   DynamoDB)       │                               │
│          └───────────────────┘                              │
│                                                             │
│  利点: シンプル、リアルタイム、再処理可能                       │
│  欠点: 長期保持コスト、複雑な状態管理                          │
└─────────────────────────────────────────────────────────────┘

7.3 データメッシュ設計

# データメッシュ: ドメイン別データプロダクト設計
import boto3
import json

class DataMeshProduct:
    """データプロダクトの基本クラス"""
    
    def __init__(self, domain, product_name, owner_account):
        self.domain = domain
        self.product_name = product_name
        self.owner_account = owner_account
        self.glue_client = boto3.client('glue')
        self.lf_client = boto3.client('lakeformation')
    
    def register_product(self, s3_location, schema):
        """データプロダクトをGlue Data Catalogに登録"""
        
        # データベース作成
        try:
            self.glue_client.create_database(
                DatabaseInput={
                    'Name': f'{self.domain}_{self.product_name}',
                    'Description': f'Data product: {self.domain}/{self.product_name}',
                    'LocationUri': s3_location
                }
            )
        except self.glue_client.exceptions.AlreadyExistsException:
            pass
        
        # テーブル作成
        self.glue_client.create_table(
            DatabaseName=f'{self.domain}_{self.product_name}',
            TableInput={
                'Name': self.product_name,
                'StorageDescriptor': {
                    'Columns': schema,
                    'Location': s3_location,
                    'InputFormat': 'org.apache.iceberg.mr.mapred.IcebergInputFormat',
                    'OutputFormat': 'org.apache.iceberg.mr.mapred.IcebergOutputFormat',
                    'SerdeInfo': {
                        'SerializationLibrary': 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
                    }
                },
                'TableType': 'EXTERNAL_TABLE',
                'Parameters': {
                    'table_type': 'ICEBERG',
                    'metadata_location': f'{s3_location}/metadata/v1.metadata.json'
                }
            }
        )
    
    def share_with_domain(self, consumer_account, permissions=['SELECT']):
        """他ドメインにデータプロダクトを共有"""
        
        # RAM リソース共有
        ram_client = boto3.client('ram')
        ram_client.create_resource_share(
            name=f'{self.domain}-{self.product_name}-share',
            resourceArns=[
                f'arn:aws:glue:ap-northeast-1:{self.owner_account}:table/{self.domain}_{self.product_name}/{self.product_name}'
            ],
            principals=[consumer_account],
            allowExternalPrincipals=True
        )
        
        # Lake Formation権限付与
        self.lf_client.grant_permissions(
            Principal={'DataLakePrincipalIdentifier': consumer_account},
            Resource={
                'Table': {
                    'CatalogId': self.owner_account,
                    'DatabaseName': f'{self.domain}_{self.product_name}',
                    'Name': self.product_name
                }
            },
            Permissions=permissions
        )

# 使用例
order_product = DataMeshProduct(
    domain='sales',
    product_name='orders',
    owner_account='111122223333'
)

schema = [
    {'Name': 'order_id', 'Type': 'string'},
    {'Name': 'customer_id', 'Type': 'string'},
    {'Name': 'amount', 'Type': 'decimal(10,2)'},
    {'Name': 'order_date', 'Type': 'date'}
]

order_product.register_product(
    s3_location='s3://sales-data-lake/products/orders/',
    schema=schema
)

order_product.share_with_domain(
    consumer_account='444455556666',  # Analyticsドメイン
    permissions=['SELECT', 'DESCRIBE']
)

7.4 ELT vs ETL パターン

┌─────────────────────────────────────────────────────────────┐
│                    ETL vs ELT 比較                           │
├──────────────────────┬──────────────────────────────────────┤
│         ETL          │              ELT                     │
├──────────────────────┼──────────────────────────────────────┤
│ Extract→Transform    │ Extract→Load→Transform               │
│ →Load                │                                      │
├──────────────────────┼──────────────────────────────────────┤
│ Glue ETL Job         │ Glue→S3→Redshift COPY               │
│ (変換してからDWH)      │ (DWHで変換)                         │
├──────────────────────┼──────────────────────────────────────┤
│ 複雑な変換向き         │ 大量データ向き                        │
│ プライバシー保護可能   │ 柔軟なスキーマ変更可能                 │
│ DWH負荷軽減           │ 変換コストをDWHで吸収                  │
└──────────────────────┴──────────────────────────────────────┘

ELT パターン (Redshift):
-- Step 1: S3からRedshiftにCOPY(生データ)
COPY raw_orders
FROM 's3://data-lake/raw/orders/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Role'
FORMAT AS PARQUET;

-- Step 2: Redshift内で変換・集計
CREATE TABLE analytics.daily_sales AS
SELECT
    DATE_TRUNC('day', order_date) AS sale_date,
    product_category,
    region,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM raw_orders
WHERE status = 'completed'
GROUP BY 1, 2, 3;

-- Step 3: マテリアライズドビューで高速クエリ
CREATE MATERIALIZED VIEW mv_daily_sales AS
SELECT * FROM analytics.daily_sales;

REFRESH MATERIALIZED VIEW mv_daily_sales;

第8章: 高度なストリーミング処理

# KDA Flink アプリケーション管理
import boto3
import json

kda_client = boto3.client('kinesisanalyticsv2', region_name='ap-northeast-1')

def create_flink_application(app_name, s3_bucket, s3_key):
    """KDA Flinkアプリケーション作成"""
    
    response = kda_client.create_application(
        ApplicationName=app_name,
        RuntimeEnvironment='FLINK-1_15',
        ServiceExecutionRole='arn:aws:iam::123456789012:role/KDAFlinkRole',
        ApplicationConfiguration={
            'ApplicationCodeConfiguration': {
                'CodeContent': {
                    'S3ContentLocation': {
                        'BucketARN': f'arn:aws:s3:::{s3_bucket}',
                        'FileKey': s3_key
                    }
                },
                'CodeContentType': 'ZIPFILE'
            },
            'FlinkApplicationConfiguration': {
                'CheckpointConfiguration': {
                    'ConfigurationType': 'CUSTOM',
                    'CheckpointingEnabled': True,
                    'CheckpointInterval': 60000,  # 60秒
                    'MinPauseBetweenCheckpoints': 5000
                },
                'MonitoringConfiguration': {
                    'ConfigurationType': 'CUSTOM',
                    'MetricsLevel': 'APPLICATION',
                    'LogLevel': 'INFO'
                },
                'ParallelismConfiguration': {
                    'ConfigurationType': 'CUSTOM',
                    'Parallelism': 4,
                    'ParallelismPerKPU': 1,
                    'AutoScalingEnabled': True
                }
            },
            'EnvironmentProperties': {
                'PropertyGroups': [
                    {
                        'PropertyGroupId': 'FlinkApplicationProperties',
                        'PropertyMap': {
                            'input.stream.name': 'orders-stream',
                            'output.stream.name': 'processed-orders-stream',
                            'aws.region': 'ap-northeast-1'
                        }
                    }
                ]
            }
        }
    )
    return response

# Flink SQL ジョブ (KDA Studio)
FLINK_SQL_JOB = """
-- Kinesisソーステーブル
CREATE TABLE orders_stream (
    order_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INT,
    amount DECIMAL(10,2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'orders-stream',
    'aws.region' = 'ap-northeast-1',
    'format' = 'json',
    'scan.stream.initpos' = 'LATEST'
);

-- 集計結果の出力先
CREATE TABLE sales_summary_sink (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    product_id VARCHAR,
    total_quantity INT,
    total_revenue DECIMAL(15,2),
    order_count BIGINT
) WITH (
    'connector' = 'kinesis',
    'stream' = 'sales-summary-stream',
    'aws.region' = 'ap-northeast-1',
    'format' = 'json'
);

-- 1分間のタンブリングウィンドウ集計
INSERT INTO sales_summary_sink
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
    product_id,
    SUM(quantity) AS total_quantity,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count
FROM orders_stream
GROUP BY 
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    product_id;
"""

8.2 Amazon MSK (Managed Streaming for Apache Kafka)

import boto3
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import ssl

# MSKクラスター情報取得
kafka_client = boto3.client('kafka', region_name='ap-northeast-1')

def get_msk_bootstrap_brokers(cluster_arn):
    response = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)
    return {
        'plaintext': response.get('BootstrapBrokerString'),
        'tls': response.get('BootstrapBrokerStringTls'),
        'sasl_scram': response.get('BootstrapBrokerStringSaslScram')
    }

# SASL/SCRAM認証でのプロデューサー
def create_msk_producer(bootstrap_servers, username, password):
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_check_hostname=False,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        # パフォーマンス設定
        batch_size=65536,
        linger_ms=5,
        compression_type='snappy',
        acks='all',  # 全レプリカに書き込み確認
        retries=3,
        max_in_flight_requests_per_connection=5
    )
    return producer

# コンシューマーグループ設定
def create_msk_consumer(bootstrap_servers, group_id, topics):
    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # 手動コミット
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        max_poll_records=500,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000
    )
    return consumer

# トピック管理
def create_kafka_topics(bootstrap_servers, topics_config):
    admin = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512'
    )
    
    new_topics = [
        NewTopic(
            name=topic['name'],
            num_partitions=topic['partitions'],
            replication_factor=topic['replication'],
            topic_configs={
                'retention.ms': str(topic.get('retention_ms', 604800000)),  # 7日
                'cleanup.policy': topic.get('cleanup_policy', 'delete'),
                'compression.type': 'snappy'
            }
        )
        for topic in topics_config
    ]
    
    admin.create_topics(new_topics=new_topics, validate_only=False)
    admin.close()

# MSK Serverless
def create_msk_serverless_cluster(cluster_name, vpc_id, subnet_ids):
    response = kafka_client.create_cluster_v2(
        ClusterName=cluster_name,
        Serverless={
            'VpcConfigs': [
                {
                    'SubnetIds': subnet_ids,
                    'SecurityGroupIds': ['sg-12345678']
                }
            ],
            'ClientAuthentication': {
                'Sasl': {
                    'Iam': {
                        'Enabled': True
                    }
                }
            }
        }
    )
    return response['ClusterArn']

8.3 Kinesis Data Firehose 詳細設定

import boto3
import json

firehose_client = boto3.client('firehose', region_name='ap-northeast-1')

# 動的パーティショニング設定
def create_firehose_dynamic_partitioning(stream_name, s3_bucket):
    response = firehose_client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        DeliveryStreamType='DirectPut',
        ExtendedS3DestinationConfiguration={
            'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole',
            'BucketARN': f'arn:aws:s3:::{s3_bucket}',
            # 動的パーティショニング
            'Prefix': 'data/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/region=!{partitionKeyFromQuery:region}/',
            'ErrorOutputPrefix': 'errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/',
            'BufferingHints': {
                'SizeInMBs': 128,
                'IntervalInSeconds': 300
            },
            'CompressionFormat': 'PARQUET',
            'DataFormatConversionConfiguration': {
                'SchemaConfiguration': {
                    'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole',
                    'DatabaseName': 'sales_db',
                    'TableName': 'orders',
                    'Region': 'ap-northeast-1',
                    'VersionId': 'LATEST'
                },
                'InputFormatConfiguration': {
                    'Deserializer': {
                        'OpenXJsonSerDe': {}
                    }
                },
                'OutputFormatConfiguration': {
                    'Serializer': {
                        'ParquetSerDe': {
                            'Compression': 'SNAPPY',
                            'EnableDictionaryCompression': True
                        }
                    }
                },
                'Enabled': True
            },
            # 動的パーティショニング有効化
            'DynamicPartitioningConfiguration': {
                'Enabled': True,
                'RetryOptions': {
                    'DurationInSeconds': 300
                }
            },
            'ProcessingConfiguration': {
                'Enabled': True,
                'Processors': [
                    {
                        'Type': 'MetadataExtraction',
                        'Parameters': [
                            {
                                'ParameterName': 'MetadataExtractionQuery',
                                'ParameterValue': '{year: .event_time[0:4], month: .event_time[5:7], day: .event_time[8:10], region: .region}'
                            },
                            {
                                'ParameterName': 'JsonParsingEngine',
                                'ParameterValue': 'JQ-1.6'
                            }
                        ]
                    },
                    {
                        'Type': 'Lambda',
                        'Parameters': [
                            {
                                'ParameterName': 'LambdaArn',
                                'ParameterValue': 'arn:aws:lambda:ap-northeast-1:123456789012:function:firehose-transform'
                            },
                            {
                                'ParameterName': 'NumberOfRetries',
                                'ParameterValue': '3'
                            }
                        ]
                    }
                ]
            }
        }
    )
    return response

第9章: パフォーマンス最適化

9.1 Redshift パフォーマンスチューニング

-- クエリパフォーマンス分析
SELECT
    q.query,
    q.label,
    q.starttime,
    q.endtime,
    DATEDIFF(seconds, q.starttime, q.endtime) AS duration_sec,
    q.aborted,
    q.rows,
    q.bytes / 1024 / 1024 AS bytes_mb
FROM stl_query q
WHERE q.userid > 1
    AND q.starttime >= DATEADD(day, -1, GETDATE())
ORDER BY duration_sec DESC
LIMIT 20;

-- テーブル統計情報更新
ANALYZE sales.orders;
ANALYZE COMPRESSION sales.orders;

-- バキューム処理
VACUUM SORT ONLY sales.orders;
VACUUM DELETE ONLY sales.orders;
VACUUM REINDEX sales.orders;

-- ディストリビューションキー最適化
-- ジョインが多いカラムをDISTKEYに設定
ALTER TABLE sales.orders
ALTER DISTSTYLE KEY DISTKEY(customer_id);

-- ソートキー最適化
ALTER TABLE sales.orders
ALTER COMPOUND SORTKEY(order_date, region);

-- Redshift Advisor推奨クエリ
SELECT
    schemaname,
    tablename,
    recommendation_text,
    estimated_disk_utilization_delta,
    estimated_execution_time_delta
FROM svv_table_info
JOIN pg_catalog.svl_advisory
    USING (tableid)
ORDER BY estimated_execution_time_delta DESC;

-- WLM (Workload Management) 設定確認
SELECT
    service_class,
    num_query_tasks,
    query_working_mem,
    max_execution_time,
    user_group_wild_card,
    query_group_wild_card
FROM stv_wlm_service_class_config
WHERE service_class > 4
ORDER BY service_class;

-- 同時実行スケーリング確認
SELECT
    service_class,
    num_query_tasks,
    query_working_mem
FROM STV_WLM_SERVICE_CLASS_CONFIG
WHERE service_class = 100;  -- concurrency scaling class

9.2 Athena クエリ最適化

-- パーティションプルーニング活用
-- BAD: フルスキャン
SELECT * FROM sales.orders
WHERE YEAR(order_date) = 2024 AND MONTH(order_date) = 1;

-- GOOD: パーティションカラムを直接使用
SELECT * FROM sales.orders
WHERE year = '2024' AND month = '01';

-- パーティション射影(Partition Projection)設定
-- テーブルプロパティで設定
-- 'projection.enabled' = 'true'
-- 'projection.year.type' = 'integer'
-- 'projection.year.range' = '2020,2030'
-- 'projection.month.type' = 'integer'
-- 'projection.month.range' = '1,12'
-- 'projection.month.digits' = '2'

-- CTAS でParquet変換(クエリ高速化)
CREATE TABLE optimized_orders
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    partitioned_by = ARRAY['year', 'month'],
    external_location = 's3://data-lake/optimized/orders/'
)
AS SELECT
    order_id,
    customer_id,
    product_id,
    amount,
    status,
    YEAR(order_date) AS year,
    MONTH(order_date) AS month
FROM raw_orders;

-- Athena クエリ結果再利用(QueryResultsReuseEnabled)
-- コスト削減: 同じクエリの結果を60分キャッシュ

-- 統計情報収集
MSCK REPAIR TABLE sales.orders;

9.3 EMR パフォーマンス設定

import boto3

emr_client = boto3.client('emr', region_name='ap-northeast-1')

# 高性能EMRクラスター設定
def create_optimized_emr_cluster(cluster_name):
    response = emr_client.run_job_flow(
        Name=cluster_name,
        ReleaseLabel='emr-6.15.0',
        Applications=[
            {'Name': 'Spark'},
            {'Name': 'Hive'},
            {'Name': 'Livy'}
        ],
        Instances={
            'MasterInstanceType': 'm5.xlarge',
            'SlaveInstanceType': 'r5.4xlarge',
            'InstanceCount': 10,
            'Ec2KeyName': 'my-key-pair',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': 'subnet-12345678'
        },
        Configurations=[
            {
                'Classification': 'spark-defaults',
                'Properties': {
                    'spark.sql.adaptive.enabled': 'true',
                    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                    'spark.sql.adaptive.skewJoin.enabled': 'true',
                    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
                    'spark.sql.parquet.filterPushdown': 'true',
                    'spark.sql.orc.filterPushdown': 'true',
                    'spark.dynamicAllocation.enabled': 'true',
                    'spark.dynamicAllocation.minExecutors': '2',
                    'spark.dynamicAllocation.maxExecutors': '20',
                    'spark.memory.fraction': '0.8',
                    'spark.memory.storageFraction': '0.2'
                }
            },
            {
                'Classification': 'spark-hive-site',
                'Properties': {
                    'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
                }
            },
            {
                'Classification': 'yarn-site',
                'Properties': {
                    'yarn.nodemanager.vmem-check-enabled': 'false',
                    'yarn.scheduler.maximum-allocation-mb': '65536'
                }
            }
        ],
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        LogUri='s3://my-emr-logs/elasticmapreduce/',
        VisibleToAllUsers=True,
        BootstrapActions=[
            {
                'Name': 'Install Python packages',
                'ScriptBootstrapAction': {
                    'Path': 's3://my-scripts/bootstrap.sh'
                }
            }
        ]
    )
    return response['JobFlowId']

# Spark 最適化コード
SPARK_OPTIMIZATION_SCRIPT = """
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \\
    .appName("OptimizedETL") \\
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \\
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \\
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://data-lake/warehouse/") \\
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \\
    .getOrCreate()

# ブロードキャストジョインで小テーブルを効率処理
customers_df = spark.table("glue_catalog.sales.customers")
orders_df = spark.table("glue_catalog.sales.orders")

# 小テーブルをブロードキャスト
result = orders_df.join(
    broadcast(customers_df),
    orders_df.customer_id == customers_df.customer_id
)

# パーティション数最適化
# ファイルサイズ128MB目標: データ量/128MB
optimal_partitions = max(1, int(orders_df.rdd.getNumPartitions() / 4))
result = result.repartition(optimal_partitions, "order_date")

# コラム選択で不要データ排除
result = result.select(
    "order_id", "customer_id", "customer_name",
    "amount", "order_date", "status"
)

# Iceberg テーブルに書き込み
result.writeTo("glue_catalog.analytics.orders_enriched") \\
    .option("write.format.default", "parquet") \\
    .option("write.parquet.compression-codec", "snappy") \\
    .tableProperty("write.metadata.compression-codec", "gzip") \\
    .partitionedBy(months("order_date")) \\
    .createOrReplace()

# コンパクション実行
spark.sql("CALL glue_catalog.system.rewrite_data_files('analytics.orders_enriched')")
"""

模擬試験 セット2(20問)

問題1 Apache Icebergのタイムトラベル機能について、正しい説明はどれですか?

A) スナップショットIDを使用して過去のデータを参照できる B) タイムトラベルにはS3バージョニングが必要 C) タイムトラベルクエリはリアルタイムデータのみに使用可能 D) タイムトラベルはParquetフォーマット専用の機能

正解: A 解説: Apache IcebergはスナップショットIDまたはタイムスタンプを指定してSQL経由で過去データを参照できます。SELECT * FROM table FOR VERSION AS OF snapshot_id または FOR TIMESTAMP AS OF 'timestamp' 構文を使用。S3バージョニング不要。


問題2 Glue Data Qualityで実行評価の結果スコアが0.75で、閾値を0.80と設定した場合の適切な対応は?

A) スコアを無視してパイプラインを続行する B) パイプラインを停止し、SNSでアラートを送信する C) データを自動修正して再評価する D) スコアをS3に保存して後で分析する

正解: B 解説: データ品質スコアが閾値を下回った場合は、データ品質の問題を通知し、パイプラインを停止して不正データの伝播を防ぐことが重要。SNS/EventBridgeによるアラートが推奨パターン。


問題3 Lake FormationのLF-Tags(属性ベースアクセス制御)の主な利点は?

A) テーブルレベルの権限のみサポートする B) タグを使用してデータとIAMプリンシパル両方に属性付与し、ポリシーを一元管理できる C) LF-TagsはS3バケットポリシーの代替として使用する D) LF-TagsはGlue ETLジョブの実行権限のみ制御できる

正解: B 解説: LF-Tagsにより、テーブル/カラムとIAMプリンシパルの両方にタグを付与し、タグの一致条件でアクセスを制御。数百テーブルがあるデータレイクでも中央集権的なポリシー管理が可能。


問題4 Redshift Spectrumを使用してS3の外部テーブルをクエリする場合、最もコスト効率が高いファイル形式は?

A) CSV(Gzip圧縮なし) B) JSON C) Parquet(列指向、Snappy圧縮) D) Avro

正解: C 解説: Redshift Spectrumはスキャンデータ1TB単位で課金。Parquet(列指向)は必要な列のみスキャンし、Snappy圧縮でデータ量を削減。CSVに比べて90%以上のコスト削減が可能。


問題5 Amazon MSKでConsumer Lagが急増している場合の最初の対処法は?

A) MSKブローカー数を増やす B) Kafkaトピックの保持期間を延長する C) Consumer Groupのパーティション数とコンシューマー数のバランスを確認する D) プロデューサーのbatch.sizeを増やす

正解: C 解説: Consumer Lagの主な原因はコンシューマーの処理能力不足。まずパーティション数とコンシューマースレッド数を確認。コンシューマー数がパーティション数を超えると遊休コンシューマーが発生。コンシューマー数 ≤ パーティション数が原則。


問題6 Kinesis Data Firehoseの動的パーティショニング機能の目的は?

A) データを複数のAWSリージョンに分散する B) レコードの内容に基づいてS3のプレフィックス(フォルダー)を動的に決定する C) Firehoseストリームを複数のAWSアカウントで共有する D) リアルタイムでデータを暗号化する

正解: B 解説: 動的パーティショニングにより、レコード内のフィールド値(例: region、date)に基づいてS3プレフィックスを自動決定。!{partitionKeyFromQuery:field} 構文でJQクエリによるメタデータ抽出と組み合わせて使用。


問題7 AWSデータレイクにおける「Bronze-Silver-Gold」アーキテクチャの説明として正しいのは?

A) 3つの異なるAWSリージョンにデータを複製する B) Bronze=生データ、Silver=クリーニング済み、Gold=ビジネス集計という段階的なデータ変換 C) Bronze=S3、Silver=Redshift、Gold=DynamoDBへのデータ移動 D) データのバックアップ世代管理(1日前、1週間前、1ヶ月前)

正解: B 解説: メダリオンアーキテクチャ(Delta Lake/Lakehouse)のパターン。Bronze層(S3 raw/): 生データそのまま保持。Silver層(S3 refined/): データ品質チェック・型変換・正規化済み。Gold層(S3 curated/): ビジネスロジック適用・集計済みデータ。各層をApache IcebergやDelta Lake形式で管理。


問題8 AWS Glue Crawlerが新しいパーティションを自動検出するための設定は?

A) CrawlerをSCHEDULEDモードで定期実行するか、S3イベント通知でトリガーする B) Glue Crawlerは新しいパーティションを自動検出できない C) AWS DMS変更データキャプチャを使用する D) AthenaのMSCK REPAIR TABLEを手動実行する

正解: A 解説: Glue CrawlerをScheduledで定期実行(例: 毎時)するか、EventBridgeルール+S3 PutObjectイベントでCrawlerを自動起動する。またはMSCK REPAIR TABLEをAthenaで手動実行、あるいはGlue APIのCreatePartitionで直接追加する方法もある。


問題9 Amazon OpenSearch Serviceでの大量データインジェスト時の推奨設定は?

A) シャード数を1に設定してシンプルにする B) リフレッシュ間隔を-1(無効)にしてバルクインサート後に手動リフレッシュする C) レプリカシャード数を10に増やす D) すべてのフィールドにdynamic mappingを使用する

正解: B 解説: 大量インジェスト時は"refresh_interval": “-1” でリフレッシュ無効化し、インジェスト完了後にPOST /index/_refreshを実行。また"number_of_replicas": 0にしてインジェスト後に元に戻す。これでインジェスト速度が大幅向上。


問題10 AWS DMS(Database Migration Service)でCDC(Change Data Capture)を使用する際の前提条件は?

A) ソースデータベースはAWS RDSのみ対応 B) ソースDBでバイナリログ(MySQL)またはREDO LOG(Oracle)の有効化が必要 C) CDCはターゲットがS3の場合のみ動作する D) DMSはデータ移行のみで、継続的レプリケーションは不可能

正解: B 解説: CDCはソースDBの変更ログを読み取る。MySQL/Aurora: binlog_format=ROWが必須。Oracle: Supplemental Loggingの有効化が必要。PostgreSQL: wal_level=logicalが必要。オンプレミスDBでも要件を満たせばCDC使用可能。


問題11 Step Functionsのデータパイプラインで、Glue ETLジョブが失敗した場合に自動リトライと通知を実装する最も適切な方法は?

A) Lambda関数でループ処理を実装する B) Step FunctionsのRetryとCatchをタスク定義に追加する C) CloudWatch Eventsで5分ごとにジョブ状態を確認する D) AWS Batchのジョブキューを使用する

正解: B 解説: Step FunctionsのRetryフィールドでMaxAttempts、IntervalSeconds、BackoffRateを設定。Catchフィールドで失敗時にSNS通知ステートやエラーハンドリングステートに遷移を定義。これによりOrchestration層でリトライと通知を一元管理。


問題12 Redshift ML(Machine Learning)でモデルを作成する際のバックエンドサービスは?

A) Amazon Rekognition B) Amazon SageMaker AutoPilot C) Amazon Comprehend D) Amazon Forecast

正解: B 解説: Redshift MLはSageMaker AutoPilotをバックエンドに使用。CREATE MODELコマンドでSageMaker AutoPilotジョブが自動実行され、最適なモデルが選択される。モデル作成後はRedshiftのSQLからml_function名(features)として呼び出し可能。


問題13 Amazon QuickSightのSPICE(Super-fast Parallel In-memory Calculation Engine)の主な利点は?

A) データを暗号化してS3に保存する B) データをメモリにキャッシュして高速クエリを実現し、データソースへの負荷を軽減する C) リアルタイムでデータを変換して可視化する D) 複数のAWSアカウントのデータを自動集計する

正解: B 解説: SPICEはQuickSightのIn-memoryエンジン。データソース(RDS、Redshift等)からデータをインポートしてキャッシュ。クエリ高速化とデータソースへの接続負荷軽減が主な利点。容量に制限あり(ユーザー当たり10GB)。直接クエリモードよりSPICEが高速だがリアルタイム性は低下。


問題14 AWS Glue DataBrewでPII(個人識別情報)を自動検出してマスキングする場合の手順は?

A) Lambda関数でカスタム変換ロジックを実装する B) DataBrewのデータプロファイリングジョブでPIIを検出し、レシピステップでSUBSTITUTE/CRYPTOGRAPHICHASHを適用 C) Macieの検出結果をDataBrewに連携してマスキングする D) DataBrewはPII検出機能を持たない

正解: B 解説: DataBrewのプロファイリングジョブはPII自動検出機能を持つ。検出後、レシピにMASK_CUSTOM、SUBSTITUTE、CRYPTOGRAPHIC_HASHなどのステップを追加してマスキング処理を定義。プロダクション環境でのデータ仮名化に活用。


問題15 大規模データレイクでのスキーマ進化(Schema Evolution)を最もサポートするファイルフォーマットは?

A) CSV B) JSON(行ごとにスキーマが異なる可能性がある) C) Apache Iceberg(列の追加・削除・リネームをメタデータ変更のみで対応) D) Parquet(スキーマ変更は新ファイル作成が必要)

正解: C 解説: Apache Icebergはスキーマ進化を標準サポート。列の追加・削除・リネーム・型昇格(intからlong等)をメタデータ変更のみで実現し、既存データの書き換えは不要。ParquetはファイルにスキーマをEmbedするが、既存ファイルの変更は不要。IcebergはスキーマとパーティションスキームをMetadataレイヤーで管理。


問題16 Kinesis Data Streams のシャードのホットパーティション問題を解決する最適な方法は?

A) シャード数を増やす B) プロデューサー側でランダムなパーティションキーを使用してシャードに均等分散する C) プロビジョニングされたスループットをオンデマンドモードに変更する D) Kinesis Enhanced Fan-outを有効化する

正解: B 解説: ホットパーティションは特定シャードへの集中書き込みが原因。解決策: ①パーティションキーにランダムサフィックス(例: user_id + “_” + str(random.randint(0,99)))を付加。②または定数パーティションキーを避けUUID/ランダム値使用。シャード追加は全体スループット向上だが偏りは解消しない。


問題17 AWSデータパイプラインでべき等性(Idempotency)を確保する最も重要な理由は?

A) コスト削減のため B) 同じパイプラインを複数回実行しても同じ結果になり、失敗時の再実行が安全に行える C) データの暗号化を自動化するため D) パイプラインの実行速度を向上させるため

正解: B 解説: べき等性のないパイプラインを再実行すると重複データが発生する。べき等性確保の手法: ①UPSERT(MERGE)でのデータ書き込み。②一意キーによる重複排除。③Icebergのアトミックコミット。④S3の冪等PUT。Step Functions + Glueのパイプラインでは失敗リトライが多いため必須。


問題18 Amazon EMR Serverlessと従来のEMRクラスターの主な違いは?

A) EMR Serverlessは常時稼働クラスターが必要 B) EMR ServerlessはApache Sparkのみサポートし、HiveやPrestoは不可 C) EMR Serverlessはクラスター管理不要でジョブ実行時のみリソースをプロビジョニングし、アイドル時はコスト0 D) EMR Serverlessはオンプレミスデータソースに接続できない

正解: C 解説: EMR Serverlessはサーバーレス型のEMR。クラスター作成・管理・スケーリング不要。ジョブ実行時のみvCPU・メモリに対して課金。アイドル時はコスト0。Spark・Hive・Presto対応。一方で、従来EMRより起動が数秒遅く(コールドスタート)、長期常時稼働ジョブはクラスターの方がコスト効率が良い場合も。


問題19 AWS Glue ETLジョブでメモリ不足(OOM)が発生した場合の対処法として最も効果的なのは?

A) Glue ETLジョブを複数の小さいジョブに分割する B) Worker TypeをG.1X からG.2X に変更し、DPU数を増やす C) Glue ジョブをLambda関数に変換する D) データをDynamoDBに一時保存して分割処理する

正解: B 解説: Glue G.2X workerは1 DPU = 8 vCPU + 32GB RAM(G.1Xは4 vCPU + 16GB RAM)。大規模データ処理やメモリ集中型変換にはG.2X + DPU増加が有効。また--enable-glue-datacatalog--TempDirの適切な設定も重要。極端に大きいデータセットは処理をバッチ分割も検討。


問題20 Amazon Athena Federated Queryを使用するユースケースとして最も適切なのは?

A) Athenaクエリの実行速度を向上させるため B) オンプレミスやRDS、DynamoDB等の非S3データソースをSQLで統合クエリするため C) AthenaをRedshiftの代替として使用するため D) Athenaの実行コストを削減するため

正解: B 解説: Athena Federated Queryはデータソースコネクター(Lambda関数)を通じてRDS、DynamoDB、CloudWatch Logs、オンプレミスDB等の非S3データをAthena SQLでクエリ可能にする機能。S3データレイクと他データソースのJOINクエリが実現できる。


DEA試験 最終チェックリスト

ドメイン1: データ取り込みと変換 (34%)

  • [ ] Kinesis Data Streams シャード計算(入力レート÷1MB/s)
  • [ ] Kinesis Data Firehose 変換・パーティショニング
  • [ ] AWS DMS CDC設定(binlog_format=ROW)
  • [ ] AWS Glue ETL(DynamicFrame, Resolve Choice, ApplyMapping)
  • [ ] AWS Glue DataBrew(レシピ、PII検出、プロファイリング)
  • [ ] AWS Glue Data Quality(DQDL構文、評価実行)
  • [ ] Step Functions(StateMachine定義、Retry/Catch)
  • [ ] EventBridge(ルール、ターゲット、スケジュール)

ドメイン2: データストア管理 (26%)

  • [ ] S3(ストレージクラス、ライフサイクル、レプリケーション)
  • [ ] Apache Iceberg(スキーマ進化、タイムトラベル、コンパクション)
  • [ ] Apache Hudi(CoW vs MoR、upsert、Timeline)
  • [ ] Delta Lake(ACID、Time Travel、OPTIMIZE)
  • [ ] Redshift(DISTKEY, SORTKEY, WLM, Spectrum)
  • [ ] DynamoDB(パーティションキー設計、GSI、Streams)
  • [ ] Lake Formation(TBAC、列セキュリティ、行フィルター、LF-Tags)

ドメイン3: データ操作とサポート (22%)

  • [ ] EMR(Spark最適化、クラスター設計、EMR Serverless)
  • [ ] Athena(パーティション、CTAS、Federated Query)
  • [ ] OpenSearch(シャード設計、インジェスト最適化)
  • [ ] AWS Macie(PII検出、分類ジョブ)
  • [ ] CloudWatch(メトリクス、ログ、アラーム)

ドメイン4: データセキュリティとガバナンス (18%)

  • [ ] KMS(CMK、データキー、エンベロープ暗号化)
  • [ ] S3 バケットポリシー、ACL、Block Public Access
  • [ ] VPC エンドポイント(Interface、Gateway)
  • [ ] Glue Data Catalog 暗号化
  • [ ] CloudTrail データイベント記録

試験当日の注意点

  1. Apache Iceberg/Hudi/Delta Lakeの違いを明確に
  2. Glue DPU計算とコスト最適化
  3. Kinesis vs MSK の使い分け(既存Kafka = MSK、AWSネイティブ = Kinesis)
  4. Athena費用 = スキャンデータ量で決まる(Parquet+圧縮で削減)
  5. Step Functions 状態定義のJSON構文