目次

AWS Certified Data Analytics – Specialty (DAS-C01) 完全学習ガイド

試験概要

項目 内容
試験コード DAS-C01
正式名称 AWS Certified Data Analytics – Specialty
難易度 ★★★★☆ (Specialty)
問題数 65問
試験時間 180分
合格スコア 750/1000
有効期限 3年
受験料 $300 USD
推奨経験 データ分析の実務経験5年、AWSデータサービス経験2年以上

ドメイン別出題割合

ドメイン 出題割合 重要度
Domain 1: データの収集 18% ★★★★★
Domain 2: ストレージとデータ管理 22% ★★★★★
Domain 3: 処理 24% ★★★★★
Domain 4: 分析と可視化 18% ★★★★
Domain 5: データセキュリティ 18% ★★★★★

Domain 1: データの収集(18%)

1-1. Kinesis Family

Kinesis Data Streams(KDS)の詳細設計:

シャードの設計:
  → 1シャード = 1MB/秒の書き込み + 2MB/秒の読み取り
  → 1シャードのレコード数: 最大1,000件/秒の書き込み
  
  シャード数の計算:
    max(WriteRate / 1,000 records/sec, WriteDataRate / 1 MB/sec) = 必要シャード数
    
    例: 5,000件/秒 × 500バイト = 2.5MB/秒
    → 書き込みレコード: 5,000 / 1,000 = 5シャード
    → 書き込みデータ: 2.5 / 1 = 2.5 = 3シャード
    → max(5, 3) = 5シャード

パーティションキーの設計:
  → カーディナリティが高いキー = シャード均等分散
  → ホットシャード防止: ランダムサフィックス付加
    customerId + "-" + random(1,100)

データ保持期間:
  → デフォルト: 24時間
  → 最大: 365日(拡張保持期間)
  → コスト: シャード時間 + データ量

Enhanced Fan-Out:
  → 通常: 2MB/秒をコンシューマー間で共有
  → EFO: コンシューマーごとに2MB/秒を確保(専用スループット)
  → 追加コスト: コンシューマー時間 + データ取得量
  → 用途: 複数のコンシューマーが並行処理

Kinesis Data Firehose(KDF)の詳細:

送信先(試験頻出):
  → S3: 最も一般的(パーティション付きで保存)
  → Redshift: S3経由でCOPYコマンドを自動実行
  → OpenSearch Service: リアルタイム検索インデックス
  → HTTP Endpoint: カスタムHTTPエンドポイント
  → Splunk / Datadog / New Relic等(SaaS)

バッファリング設定:
  → サイズ: 1MB〜128MB
  → 時間: 60秒〜900秒
  → 先に条件を満たした方でフラッシュ

データ変換(Lambda統合):
  KDF → Lambda(変換)→ 変換後データ → S3
  → JSONをParquetに変換
  → フィルタリング・フォーマット変更

S3送信時のパーティション設定:
  prefix: "!{partitionKeyFromQuery:date}/data/"
  errorOutputPrefix: "errors/!{firehose:error-output-type}/"
  
  動的パーティション:
  → JSON の特定フィールドでパーティション分割
  → Glue Schema Registry と連携

1-2. AWS DMS(Database Migration Service)

継続的なデータ取り込み(CDC):

DMS CDC のアーキテクチャ:

オンプレSQL Server
    │ (DMS Replication Instance)
    │ CDC(変更データキャプチャ)
    ↓
Kinesis Data Streams / S3
    ↓
Lambda / Glue ETL
    ↓
Redshift / DynamoDB / OpenSearch

CDC設定(Oracle):
  LogminerReaderMode: True(LogMinerを使用)
  SupplementalLoggingLevel: ALL(全カラムの変更を取得)

CDC設定(MySQL):
  binlogFormat: ROW(行ベースのbinlog必須)
  binlogRowImage: FULL(全カラムを記録)

Domain 2: ストレージとデータ管理(22%)

2-1. S3 データレイク設計

ゾーン設計(メダリオンアーキテクチャ):

Raw Zone(ブロンズ):
  → 生データをそのまま保存
  → 変更不可(WORM)
  → 全データを保持(後で再処理可能)
  → パス例: s3://my-datalake/raw/source-system=crm/year=2024/month=01/day=15/

Processed Zone(シルバー):
  → クレンジング・標準化されたデータ
  → Parquet形式、スキーマ適用済み
  → パス例: s3://my-datalake/processed/domain=orders/year=2024/month=01/

Curated Zone(ゴールド):
  → BI/ML用に集計・変換されたデータ
  → ビジネスドメイン別に整理
  → パス例: s3://my-datalake/curated/mart=sales/snapshot_date=2024-01-15/

S3 Intelligent-Tiering のコスト最適化:

S3 Intelligent-Tiering の階層:
  Frequent Access(標準): 30日連続未アクセス → 自動移行
  Infrequent Access: 90日連続未アクセス → 自動移行
  Archive Instant Access: 180日連続未アクセス → 自動移行
  Archive Access: 180日+(手動有効化、取得1-5時間)
  Deep Archive Access: 180日+(手動有効化、取得12時間)

適用基準:
  → アクセスパターンが予測できないオブジェクトに最適
  → 128KB以上のオブジェクット(それ未満はモニタリング料金が逆に高い)
  → 追加コスト: $0.0025/1,000オブジェクット/月(モニタリング)

2-2. AWS Glue の詳細

Glue データカタログ:

データカタログの構成:
  Database
  └── Table
      ├── Schema(カラム名・型)
      ├── パーティション情報(year/month/day等)
      ├── ストレージ情報(S3パス、ファイル形式)
      └── カスタムプロパティ

クローラーの設定:
  データソース: S3, RDS, DynamoDB, JDBC
  スケジュール: 時間/日次 または オンデマンド
  スキーマ変更への対応:
    → 新しいカラムを追加: 自動追加
    → カラム削除: カタログから削除(またはマーク)
    → 互換性のないスキーマ変更: エラーテーブルに記録

Lake Formationとの統合:
  → データカタログのテーブルにアクセス制御を追加
  → 列レベル・行フィルターレベルのセキュリティ

Glue ETL ジョブの最適化:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'S3_OUTPUT_PATH'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3からDynamic Frameを読み込み
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [args['S3_INPUT_PATH']],
        "recurse": True,
        "groupFiles": "inPartition",  # 小さいファイルをグループ化して処理効率向上
        "groupSize": "104857600"  # 100MB単位でグループ化
    },
    format="json",
    transformation_ctx="datasource"
)

# データ変換
transformed = datasource.toDF() \
    .filter(F.col('status').isNotNull()) \
    .withColumn('processed_at', F.current_timestamp()) \
    .withColumn('year', F.year('event_time')) \
    .withColumn('month', F.month('event_time')) \
    .repartition(10)  # 最適なパーティション数に調整

# Glue DynamicFrameに戻す
output_frame = DynamicFrame.fromDF(transformed, glueContext, 'output')

# Parquet形式でS3に書き込み(パーティション付き)
glueContext.write_dynamic_frame.from_options(
    frame=output_frame,
    connection_type="s3",
    connection_options={
        "path": args['S3_OUTPUT_PATH'],
        "partitionKeys": ["year", "month"]
    },
    format="glueparquet",
    format_options={"compression": "snappy"},
    transformation_ctx="datasink"
)

job.commit()

Domain 3: 処理(24%)

3-1. Amazon EMR

EMR クラスターの設計:

ノードタイプ:
  Master Node: YARN ResourceManager, HDFS NameNode
    → m5.xlarge(最低限)、HA構成では3台
  
  Core Node: YARN NodeManager, HDFS DataNode
    → データの永続性(HDFS)
    → 削除するとデータ損失の可能性
    → スポット使用は慎重に
  
  Task Node: YARN NodeManager のみ(データなし)
    → スポットインスタンスに最適
    → 突然停止してもデータ損失なし
    → スケールアウト/インが容易

ストレージオプション:
  HDFS: クラスター内のローカルストレージ
    → 高速(メモリに近い)
    → クラスター終了でデータ消失
    
  EMRFS (S3): クラスター独立ストレージ
    → クラスター削除後もデータ保持
    → 推奨: 永続データはS3に保管

コスト最適化:
  → Master: オンデマンド(クラスター安定性)
  → Core: オンデマンド or RI(データ保護)
  → Task: スポット(最大90%削減)
  → Auto Scaling: タスクノードを自動増減

一時的クラスター vs 永続クラスター:
  一時的(推奨): ジョブごとにクラスターを作成・削除
    → コスト最小化
    → データはS3に保存
  
  永続: 常時起動
    → 起動レイテンシなし
    → コストは高い

3-2. Amazon Redshift

Redshift の分散スタイルと最適化:

分散スタイル(DISTSTYLE):
  AUTO: Redshiftが自動選択(推奨開始点)
  
  EVEN: 全ノードに均等分散
    → JOINなし、集計のみのテーブル
    → 小〜中サイズのテーブル
  
  KEY: 指定したカラムの値でハッシュ分散
    → 大きなファクトテーブル
    → JOINキーを分散キーに設定 → コロケーション結合
    例: ORDER_DATE, CUSTOMER_ID 等
  
  ALL: 全ノードにコピー
    → 小さなディメンションテーブル
    → 全ノードでローカル結合可能

ソートキー(SORTKEY):
  COMPOUND: 複数カラムの複合ソートキー
    → WHERE句の全カラムがある場合に効果的
    例: (year, month, day) の順でソート
  
  INTERLEAVED: 全カラムに均等な重みを付けたソート
    → 複数カラムでフィルタリングするパターン
    → VACUUMコストが高い
  
  AUTO: Redshiftが最適を選択

Redshift Spectrum:
  → S3上のデータをRedshiftから直接クエリ
  → クラスター外でスキャン(クラスターへの転送最小化)
  → Glue Data Catalogを使用
  → コスト: $5/TB スキャン

Redshift Serverless:
  → キャパシティプロビジョニング不要
  → RPU(Redshift Processing Unit)ベースの課金
  → アイドル時はコスト発生なし
  → ad-hocクエリや変動ワークロードに最適

Domain 4: 分析と可視化(18%)

4-1. Athena の最適化

クエリパフォーマンスの最適化:

-- アンチパターン: SELECT * とフィルタなし
SELECT * FROM raw_data;

-- 最適化: 必要なカラムのみ、パーティション指定
SELECT 
    order_id, 
    customer_id, 
    total_amount
FROM processed_orders
WHERE year = 2024 
  AND month = 1  -- パーティションプルーニング
  AND status = 'COMPLETED'
LIMIT 10000;  -- 必要に応じてLIMIT

-- さらに最適化: パーティションプロジェクション(Glueクローラー不要)
-- テーブルプロパティで動的パーティションを定義

Parquet形式による最適化:

Parquetの利点:
  カラムナーフォーマット:
    → 必要なカラムのみ読み取り(I/O削減)
    → 高い圧縮率(Snappy, ZSTD等)
    → Athenaでのコスト削減(スキャンデータ量課金)

  実際の比較:
  JSON形式: 1TBのデータ → $5スキャンコスト
  Parquet+圧縮: 同データが約100GB → $0.5スキャンコスト(90%削減)

変換パイプライン:
  S3(JSON) → Glue ETL → S3(Parquet) → Athena

Athena Federation:

Athena Data Source Connector:
  → Lambda関数を使ってRedshift/RDS/DynamoDB等に接続
  → S3以外のデータソースをAthenaでクエリ
  → フェデレーテッドクエリ: 複数ソースをJOIN

-- DynamoDBとS3データをJOINする例
SELECT s.order_id, d.customer_name, s.total_amount
FROM s3_orders s
JOIN dynamodb_customers d ON s.customer_id = d.id
WHERE s.year = 2024;

4-2. Amazon QuickSight

QuickSightのデータセット設計:

SPICE(Super-fast Parallel In-memory Calculation Engine):
  → データをメモリにキャッシュ(最大500GB/10GBユーザーあたり)
  → 高速なインタラクティブ分析
  → スケジュール更新(毎日1回等)
  → コスト: $0.25/GBユーザー/月

Direct Query:
  → リアルタイムデータに接続
  → SPICEより遅い(データソース依存)
  → 常に最新データ
  
Row-level Security (RLS):
  → ユーザー/グループごとに見えるデータを制限
  → RLS データセット(ユーザーとフィルタ条件のマッピング)
  例: 営業Aさんは自分の担当顧客データのみ表示

ML Insights:
  → 自動的に異常を検知(Anomaly Detection)
  → 予測(Forecasting)
  → 要因分析(Key Drivers Analysis)

Domain 5: データセキュリティ(18%)

5-1. Lake Formation のアクセス制御

列レベルセキュリティの実装:

import boto3

lf = boto3.client('lakeformation')

# テーブルの特定カラムへのアクセス権限を付与
lf.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789:role/analyst-role'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'sales_db',
            'Name': 'customers',
            'ColumnNames': ['customer_id', 'name', 'region'],  # 許可するカラム
            # または ColumnWildcard: {'ExcludedColumnNames': ['ssn', 'credit_card']}
        }
    },
    Permissions=['SELECT'],
    PermissionsWithGrantOption=[]
)

# 行フィルターの設定
lf.create_data_cells_filter(
    TableData={
        'TableCatalogId': '123456789',
        'DatabaseName': 'sales_db',
        'TableName': 'orders',
        'Name': 'japan_region_filter',
        'RowFilter': {
            'FilterExpression': "region = 'JP'",  # 日本のデータのみ
        },
        'ColumnNames': ['order_id', 'customer_id', 'amount']
    }
)

学習計画(10週間)

Week 1-3: データ収集・保管

Week 1: Kinesis Family(KDS/KDF/KDA)
  → シャード設計の計算練習
  → パーティションキー設計のベストプラクティス

Week 2: S3データレイク設計
  → メダリオンアーキテクチャ
  → パーティション設計とParquet形式

Week 3: Glue(クローラー、ETLジョブ、データカタログ)

Week 4-6: 処理・分析

  • Week 4: EMR(Spark、コスト最適化)
  • Week 5: Redshift(分散・ソートキー、Spectrum、Serverless)
  • Week 6: Athena(最適化、フェデレーション)+ QuickSight

Week 7-8: セキュリティとエンドツーエンド設計

  • Week 7: Lake Formation、KMS暗号化、VPCエンドポイント
  • Week 8: エンドツーエンドのパイプライン設計演習

Week 9-10: 仕上げ

  • Week 9: 模擬試験2回
  • Week 10: 弱点強化と試験

頻出問題パターン

パターン1: 「リアルタイム vs バッチ」の選択

リアルタイム要件(秒〜分単位):
  → Kinesis Data Streams + Lambda / Flink
  → OpenSearchへのインデックス更新

ニアリアルタイム(分〜時間単位):
  → Kinesis Firehose → S3 → Athena

バッチ(日次/週次):
  → S3 → Glue → Redshift / S3(Parquet)
  → EMR(大規模データ)

パターン2: 「コスト最適化」

S3のコスト削減:
  → Parquet + Snappy圧縮(Athenaスキャンコスト90%削減)
  → S3 Intelligent-Tiering(アクセスパターン不明)
  → パーティション設計(不要データのスキャン排除)

Redshiftのコスト削減:
  → Serverless(アイドル時ゼロ)
  → RA3ノード(ストレージとコンピュートを分離)
  → SpectrumでコールドデータはそのままS3に

アーキテクチャ図

データレイク&分析 全体アーキテクチャ

flowchart TB
    subgraph Ingest["データ取り込み層"]
        KDS["Kinesis\nData Streams\n(リアルタイム)"]
        KDF["Kinesis\nFirehose\n(S3配信)"]
        DMS["AWS DMS\n(DB CDC)"]
        Batch["S3 PUT\n(バッチ)"]
    end

    subgraph Storage["S3データレイク"]
        Raw["Raw Layer\ns3://lake/raw/\n(元データそのまま)"]
        Proc["Processed Layer\ns3://lake/processed/\n(Parquet変換済み)"]
        Curated["Curated Layer\ns3://lake/curated/\n(分析用集計済み)"]
    end

    subgraph Catalog["データカタログ"]
        GlueCat["Glue Data Catalog\n(テーブル定義)"]
        LakeForm["Lake Formation\n(アクセス制御)"]
    end

    subgraph Process["処理層"]
        GlueETL["Glue ETL\n(PySpark)"]
        EMR["EMR\n(大規模Spark)"]
        Athena["Athena\n(S3 SQL)"]
        Redshift["Redshift\n(DWH)"]
        RedSpec["Redshift Spectrum\n(S3直接クエリ)"]
    end

    subgraph Viz["可視化層"]
        QS["QuickSight\n(BIダッシュボード)"]
    end

    KDS --> KDF --> Raw
    DMS --> Raw
    Batch --> Raw
    Raw --> GlueETL --> Proc --> GlueETL --> Curated
    GlueCat <--> Athena & Redshift & GlueETL
    Curated --> Redshift & Athena
    Raw --> RedSpec
    Redshift & Athena & QS --> QS
    LakeForm --> GlueCat

Kinesis シャード設計

flowchart LR
    subgraph Producers["データ生成"]
        IoT["IoT センサー\n1,000台\n1KB/秒/台"]
        App["アプリ\nログ"]
    end

    subgraph KDS["Kinesis Data Streams"]
        S1["Shard 1\n1MB/s Write\n2MB/s Read\n最大1000 RPS"]
        S2["Shard 2\n1MB/s Write\n2MB/s Read"]
        SN["Shard N\n..."]
    end

    subgraph Consumer["コンシューマー"]
        Lambda["Lambda\n(Shared Fan-Out\n最大2MB/s共有)"]
        KDA["Kinesis\nData Analytics"]
        EFO["Enhanced Fan-Out\n(2MB/s/コンシューマー専用)"]
    end

    IoT & App --> S1 & S2 & SN
    S1 & S2 & SN --> Lambda & KDA
    S1 & S2 & SN --> EFO

    Note["シャード数計算:\n書き込み: (1,000台×1KB)/1MB = 1シャード\n読み取り: コンシューマー数×速度 / 2MB"]

本試験形式 模擬問題(詳細解説付き)


問題 1

IoT センサーから毎秒 5,000 件のイベント(各 2KB)が生成されます。これをリアルタイムで処理し、異常値をリアルタイムアラートするシステムを設計してください。必要な Kinesis Data Streams のシャード数は最低いくつですか?

  • A. 5 シャード
  • B. 10 シャード
  • C. 20 シャード
  • D. 50 シャード
正解と解説

正解: B

解説: シャード数の計算(試験頻出):

書き込み側の計算:

  • 総書き込み量 = 5,000件/秒 × 2KB = 10MB/秒
  • 1シャードの書き込み上限 = 1MB/秒 または 1,000レコード/秒
  • 必要シャード数(スループット) = 10MB / 1MB = 10シャード
  • 必要シャード数(RPS) = 5,000件 / 1,000件 = 5シャード
  • スループットが律速 → 10シャードが必要

読み取り側の計算(確認):

  • 1シャードの読み取り上限 = 2MB/秒(Shared)
  • 10シャードで 20MB/秒読み取り可能(十分)

正解は10シャード(B)

# シャード数計算式
incoming_rate_per_second = 5000  # レコード/秒
avg_record_size_kb = 2           # KB/レコード

shards_by_throughput = (incoming_rate_per_second * avg_record_size_kb / 1024) / 1  # 1MB/s per shard
shards_by_records = incoming_rate_per_second / 1000  # 1000 records/s per shard

required_shards = max(shards_by_throughput, shards_by_records)
print(f"必要シャード数: {required_shards}")  # 10.0

問題 2

Redshift クラスターのクエリパフォーマンスが低下しています。分析すると、大きなテーブルのフルスキャンが頻繁に行われています。このテーブルは日付カラムでのフィルタリングが多く、user_id での結合処理も多いです。最適な改善策はどれですか?

  • A. テーブルを DISTSTYLE ALL に変更する
  • B. SORTKEY に日付カラムを設定し、DISTKEY に user_id を設定する
  • C. テーブルを複数の小さなテーブルに分割する
  • D. Redshift Serverless に移行する
正解と解説

正解: B

解説:

  • B が正解:
    • SORTKEY(日付): データをディスク上で日付順に物理的に並び替えることで、WHERE date >= '2026-01-01' のような範囲クエリで不要なブロックのスキャンをスキップ(Zone Map最適化)
    • DISTKEY(user_id): 結合が多いカラムをDISTKEYにすることで、同じ user_id のデータが同じノードに格納され、ネットワーク転送(Redistribute)を最小化
CREATE TABLE user_events (
    event_date DATE,
    user_id    VARCHAR(36),
    event_type VARCHAR(50),
    revenue    DECIMAL(10,2)
)
DISTKEY(user_id)
SORTKEY(event_date);
  • A(DISTSTYLE ALL): 小さなディメンションテーブル向け(全スライスにコピー)。大きなファクトテーブルには不適切で、データ更新コストが高い。
  • C(テーブル分割): 特定ケースでは有効ですが、根本的な配布・ソート設計の問題を解決しません。
  • D(Serverless): 構成は変わりますが、DISTKEY/SORTKEYの設計問題は残ります。

問題 3

データサイエンチストが S3 にある 1TB のログファイル(JSON形式、パーティションなし)に対して Athena でアドホッククエリを実行しています。クエリコストと速度を改善するための最適な方法はどれですか?

  • A. Athena のワークグループでクエリのスキャン量を制限する
  • B. JSON ファイルを Parquet 形式に変換し、日付でパーティションを設定する
  • C. データを Redshift にロードしてクエリを実行する
  • D. S3 Select を使ってオブジェクトレベルのフィルタリングを行う
正解と解説

正解: B

解説:

  • B が正解: Athena の最適化の基本は「スキャンデータ量の削減」です。Athena の課金はスキャンデータ量($5/TB)ベースです。

改善効果の組み合わせ:

  1. Parquet変換(列指向フォーマット): 必要な列だけ読み込み。JSON比で通常5-10倍のデータ削減
  2. パーティション設定(日付): WHERE句のパーティションフィルタで不要なファイルをスキャンしない。1TBが特定日のデータのみ(例: 10GB)になる可能性あり
-- パーティション設定後のクエリ(パーティションプルーニングが働く)
SELECT user_id, COUNT(*) 
FROM logs
WHERE year=2026 AND month=1 AND day=15  -- パーティションフィルタ
  AND action = 'PURCHASE';
-- 1TBのうち該当日のみスキャン(例: 3GB)→ コスト98%削減
  • A(スキャン量制限): コスト管理には有効ですが、パフォーマンス改善にはなりません。
  • C(Redshift): アドホッククエリの柔軟性が下がり、データロードのコスト・時間が発生します。
  • D(S3 Select): オブジェクト内フィルタリングには有効ですが、複数ファイルにまたがる分析には限定的です。

問題 4

Glue ETL ジョブが大量のデータを処理する際にパフォーマンスが低下しています。調査すると、小さなファイルが S3 に大量(数百万件)に存在することがわかりました。解決策として最も適切なものはどれですか?

  • A. Glue DPU(Data Processing Unit)を増やす
  • B. groupFilesgroupSize オプションを使って小さなファイルを論理的にグループ化する
  • C. 小さなファイルをすべて手動でマージしてから再実行する
  • D. EMR Spark に移行する
正解と解説

正解: B

解説:

  • B が正解: S3の「小さなファイル問題」(Small Files Problem)に対する Glue の組み込みソリューションです。Spark は多数の小ファイルを個別タスクとして処理するため、オーバーヘッドが非常に大きくなります。
# Glue ETL での groupFiles 設定
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://my-bucket/data/"],
        "groupFiles": "inPartition",     # 同一パーティション内でグループ化
        "groupSize": "104857600"         # 100MB単位でグループ化
    },
    format="json"
)
  • A(DPU増加): コストは増えますが、小ファイル問題の根本解決にはなりません。並列度を上げても各タスクのオーバーヘッドは変わりません。
  • C(手動マージ): 解決策にはなりますが、継続的に小ファイルが生成される場合には毎回対処が必要で非効率です。
  • D(EMR移行): EMR でも同様の問題が発生します。EMR では s3-dist-cp でマージするか、設定で対応します。

問題 5

ビジネスアナリストが QuickSight でのダッシュボードの表示速度が遅いと報告しています。データソースは Redshift(数億行のテーブル)です。改善方法として最も適切なものはどれですか?

  • A. Redshift クラスターのサイズを拡張する
  • B. QuickSight で SPICE(超高速インメモリ計算エンジン)を有効化してデータをインポートする
  • C. QuickSight のデータセットでビューを作成する
  • D. QuickSight のビジュアライゼーション数を減らす
正解と解説

正解: B

解説:

  • B(SPICE)が正解: SPICE(Super-fast Parallel In-memory Calculation Engine)は QuickSight の高速インメモリデータストアです。Redshift からデータを事前にインポートしておくことで、ダッシュボード表示時に Redshift へのクエリが発生しません。

SPICE の特徴:

  • 列指向のインメモリフォーマットでデータを格納

  • クエリレスポンスがミリ秒単位(Redshift への直接クエリは数秒〜数分)

  • 定期的な自動更新スケジュール設定可能

  • ユーザー毎に 10GB の SPICE 容量(追加購入可能)

  • A(Redshift拡張): コストがかかり、根本的に QuickSightRedshift の通信は残ります。

  • C(ビュー作成): クエリを最適化できますが、大テーブルのスキャン自体は変わりません。

  • D(ビジュアル削減): 一部改善しますが、根本解決ではありません。


問題 6

データエンジニアリングチームが S3 のデータに対して列レベルのセキュリティを実装したい(特定のユーザーには salary 列を見せない)と考えています。最適な方法はどれですか?

  • A. 各ユーザー向けに salary 列を除いたビューを Glue Data Catalog に作成する
  • B. AWS Lake Formation の列レベルのアクセスコントロールを設定する
  • C. S3 のバケットポリシーで特定ファイルへのアクセスを制限する
  • D. Athena のワークグループで列制限クエリを強制する
正解と解説

正解: B

解説:

  • B(Lake Formation 列レベル制御)が正解: Lake FormationGlue Data Catalog 上のテーブルに対して、列・行レベルのアクセス制御を IAM ユーザー/ロール/グループ単位で設定できます。AthenaGlueRedshift Spectrum 全てに適用されます。
# Lake Formation 列レベルアクセス制御
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::...:role/analyst-role'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'hr_database',
            'Name': 'employees',
            'ColumnWildcard': {
                'ExcludedColumnNames': ['salary', 'ssn']  # これらの列は除外
            }
        }
    },
    Permissions=['SELECT']
)
  • A(ビュー): 管理が複雑でユーザー数が増えると維持困難。Lake Formationの方がシンプルです。
  • C(S3バケットポリシー): ファイル単位の制御のみ。列レベルのフィルタリングはできません。
  • D(Athena ワークグループ): クエリ制限は可能ですが、ユーザーが制限をバイパスする可能性があります。Lake Formation は強制的に適用されます。

DAS-C01 追加詳解セクション

データ分析アーキテクチャの全体像

モダンデータアーキテクチャ(Modern Data Platform):

Raw Layer(生データ)   → Processed Layer(加工済み)→ Curated Layer(分析用)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
S3(ランディングゾーン)→ Glue ETL(S3/Parquet)   → Redshift(DWH)
                                                    → Athena(アドホック)
                                                    → QuickSight(BI)

Amazon Redshift 詳細

Redshift 分散スタイル(Distribution Styles)

テーブルの分散方式:

EVEN(デフォルト):
  → 行をスライスに均等分散
  → テーブルが参照される頻度が少ない・JOINが少ない場合

KEY:
  → 特定列の値に基づいて同じ値を同じスライスに配置
  → JOINやGROUP BYで使う列をキーにすると最適化

ALL:
  → テーブルをすべてのノードにコピー
  → 小さな参照テーブル(ディメンションテーブル)向け
  → JOINのブロードキャストを防ぐ

AUTO:
  → Redshiftが自動的に最適な方式を選択
  → テーブルサイズに基づき EVEN か ALL を選択

選択基準:
  大きなファクトテーブル → KEY(JOIN列で分散)
  小さなディメンションテーブル → ALL
  JOINがほとんどない → EVEN

Redshift ソートキー(Sort Keys)

COMPOUND ソートキー:
  → 複数列を順に指定(ORDER BY に近い)
  → 第1列でのフィルタが最も効果的
  → 例: SORTKEY(year, month, day)

INTERLEAVED ソートキー:
  → 全てのキー列に均等な重みを持たせる
  → 任意の列でのフィルタに対応
  → 大量の DELETE/INSERT で性能低下
  → Vacuum コストが高い

使い分け:
  「クエリが常に特定列でフィルタ」 → COMPOUND
  「様々な列でフィルタ」            → INTERLEAVED(ただしVacuumコストに注意)

Amazon QuickSight 詳細

QuickSightの主要機能:
  → BIダッシュボード・インタラクティブな可視化
  → SPICE(Super-fast, Parallel, In-memory Calculation Engine):
    データをインメモリキャッシュに取り込んで高速クエリ
  → Amazon Q in QuickSight: 自然言語でのデータ分析
  → Embedded Analytics: アプリへのダッシュボード埋め込み

データソース:
  → Athena, Redshift, RDS, S3, Salesforce等
  → 直接クエリ または SPICEへのインポート

ユーザー管理:
  → Enterprise Edition: IAMフェデレーション・AD統合
  → Standard Edition: Cognito/メール認証
  → Row-level security(行レベルセキュリティ)

Amazon OpenSearch Service

主な用途:
  → 全文検索(E-Commerce製品検索・ドキュメント検索)
  → ログ分析(Kibanaダッシュボード)
  → ベクトル検索(RAGのベクトルDB)
  → 半構造化データの分析

Kibana(OpenSearch Dashboards):
  → ログ・時系列データのリアルタイム可視化
  → CloudWatch Logs → Kinesis → OpenSearch のパイプライン

OpenSearch と ElasticSearch の関係:
  → OpenSearch は Elasticsearch 7.10 からのフォーク(AWSが主導)
  → AWS CloudSearch の後継としてより強力

AWS Glue 詳細

Glue パーティション管理

動的パーティション(Dynamic Partitioning):
  → データを書き込む際に自動的にパーティションを作成
  → 例: 年/月/日でデータを自動分類してS3に保存

パーティションプルーニング(Partition Pruning):
  → WHEREでパーティション列をフィルタすることで不要なファイルを読まない
  → 大幅なコスト削減(スキャン量削減)

Glue Partitionの整合性:
  → Crawler実行またはGlue Python API でパーティションをCatalogに追加
  → MSCK REPAIR TABLE(Athena)でパーティション自動更新

DAS-C01 模擬問題


問題 DAS-01

Redshiftで「毎日大量のトランザクションデータが追加されるファクトテーブル」のJOINパフォーマンスを最適化したい。最も適切な分散スタイルはどれですか?

  • A. EVEN
  • B. ALL
  • C. KEY(JOIN列で分散)
  • D. AUTO
正解と解説

正解: C

大きなファクトテーブルのJOINはKEY分散が最適。JOINに使う列(例: customer_id)をキーとして指定することで、同一顧客のデータが同一スライスに集まりネットワーク転送を削減できる。


問題 DAS-02

「Amazon QuickSightの大量データに対するダッシュボードの応答速度を最大化したい」場合の設定はどれですか?

  • A. データソースに直接クエリ(Direct Query)を使用する
  • B. SPICEにデータをインポートしてインメモリで処理する
  • C. Redshift Spectrumを経由する
  • D. CloudFrontでキャッシュする
正解と解説

正解: B

SPICEはQuickSightのインメモリ計算エンジン。データをSPICEにインポートすると、ソースデータへの直接クエリより大幅に高速なレスポンスが得られる。


問題 DAS-03

ログデータをS3に保存してAthenaで分析する場合、コストを最大限削減するためのフォーマット選択はどれですか?

  • A. CSV(カンマ区切り)形式
  • B. JSON形式
  • C. Apache Parquet形式
  • D. XML形式
正解と解説

正解: C

Parquetは列指向の圧縮フォーマット。Athenaは必要な列のみスキャンする(列プルーニング)ため、Parquetは全列を持つCSV/JSONより最大87%のコスト削減が可能。


問題 DAS-04

OpenSearch Serviceの「Kibana(OpenSearch Dashboards)」を使ったユースケースとして最も適切なものはどれですか?

  • A. 機械学習モデルのリアルタイム推論
  • B. アプリケーションログのリアルタイム分析・可視化ダッシュボード
  • C. RDSデータのETL変換
  • D. DynamoDBのバックアップ管理
正解と解説

正解: B

OpenSearch + KibanaはELKスタック(Elasticsearch/Logstash/Kibana)のAWS版。ログデータをリアルタイムに取り込み、インタラクティブなダッシュボードで可視化・分析するユースケースに最適。


問題 DAS-05

Athenaでパーティションプルーニングを活用するためにテーブルを設定したい。S3のデータを正しくパーティショニングするフォルダ構造はどれですか?

  • A. s3://bucket/logs/all_data.parquet
  • B. s3://bucket/logs/year=2024/month=01/day=15/data.parquet
  • C. s3://bucket/2024-01-15-logs.parquet
  • D. s3://bucket/logs/data_20240115.parquet
正解と解説

正解: B

Hive形式のパーティションキー(key=value/key=value)でS3にデータを格納することでGlue CatalogがパーティションをCatalogに登録し、AthenaがWHERE year=2024 AND month=01のようにパーティションプルーニングを実行できる。


DAS-C01 試験直前チェックリスト

  • [ ] Redshift分散スタイル(EVEN/KEY/ALL/AUTO)の使い分け
  • [ ] Redshiftソートキー(COMPOUND vs INTERLEAVED)
  • [ ] Redshift Spectrum の用途(S3への直接クエリ)
  • [ ] QuickSight SPICE のメリット
  • [ ] Athenaのコスト最適化(Parquet/パーティション)
  • [ ] OpenSearch/Kibanaのユースケース(ログ分析)
  • [ ] Glue Crawlerとパーティション管理
  • [ ] Kinesis Data Streams vs Firehose の違い

付録: DAS-C01 頻出サービス一覧

サービス DAS試験での重点
Amazon Redshift 分散スタイル・ソートキー・Spectrum・Serverless
Amazon Athena S3クエリ・Parquet・パーティション・コスト最適化
Amazon QuickSight SPICE・Embedded Analytics・Row Level Security
Amazon OpenSearch 全文検索・ログ分析・ベクトル検索
AWS Glue ETL・Crawler・Data Catalog・DataBrew
Amazon Kinesis Streams・Firehose・Analytics
Amazon MSK(Kafka) ストリーミング・コンシューマーグループ
AWS Lake Formation データレイクセキュリティ・列レベルアクセス
Amazon S3 データレイク・ライフサイクル
AWS EMR Spark/Hadoop・大規模バッチ処理
Amazon DynamoDB NoSQL・DAXキャッシュ

Domain 1: データの収集(詳細補足)

1-2. Kinesis Data Firehose 詳細

Kinesis Data Firehose アーキテクチャ:

Producer → Firehose → [Transform Lambda] → Destination
                            ↓
                    ┌─────────────────────────┐
                    │ Destinations:           │
                    │ - S3                   │
                    │ - Redshift (via S3)    │
                    │ - OpenSearch           │
                    │ - Splunk               │
                    │ - HTTP Endpoint        │
                    │ - MongoDB Atlas        │
                    └─────────────────────────┘

バッファリング設定:
  - Buffer Size: 1MB - 128MB
  - Buffer Interval: 60秒 - 900秒
  どちらかの条件に達した時点でフラッシュ

フォーマット変換:
  - JSON → Parquet/ORC (自動変換)
  - Glue Data Catalogのスキーマを参照
  - 変換エラーはS3の別パスに保存
import boto3
import json
import time

firehose = boto3.client('firehose', region_name='us-east-1')

# Firehoseへのデータ送信
def send_to_firehose(stream_name: str, records: list) -> dict:
    """Firehoseへのバッチ書き込み (最大500件, 4MB)"""
    
    batch_records = []
    for record in records:
        batch_records.append({
            'Data': json.dumps(record, ensure_ascii=False) + '\n'
        })
    
    # 500件ずつに分割
    results = []
    for i in range(0, len(batch_records), 500):
        batch = batch_records[i:i+500]
        
        response = firehose.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=batch
        )
        
        results.append({
            'attempt': i // 500 + 1,
            'failed_count': response['FailedPutCount'],
            'records_sent': len(batch)
        })
        
        # 失敗したレコードの再試行
        if response['FailedPutCount'] > 0:
            failed_records = [
                batch[i]['Data']
                for i, r in enumerate(response['RequestResponses'])
                if r.get('ErrorCode')
            ]
            print(f"Failed records: {len(failed_records)}")
    
    return results

# Lambda変換関数 (Firehoseのデータ変換)
def firehose_transform(event, context):
    """Firehoseデータ変換Lambda"""
    output = []
    
    for record in event['records']:
        # Base64デコード
        import base64
        payload = base64.b64decode(record['data']).decode('utf-8')
        
        try:
            data = json.loads(payload)
            
            # データ変換ロジック
            transformed = {
                'event_id': data.get('id'),
                'event_time': data.get('timestamp'),
                'event_type': data.get('type'),
                'user_id': data.get('userId'),
                'properties': data.get('properties', {}),
                'ingestion_time': int(time.time())
            }
            
            # 変換後データをBase64エンコード
            output_data = json.dumps(transformed, ensure_ascii=False) + '\n'
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(output_data.encode('utf-8')).decode('utf-8')
            })
        
        except Exception as e:
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']
            })
    
    return {'records': output}

1-3. AWS Database Migration Service (DMS)

DMS 移行アーキテクチャ:

Source DB → DMS Replication Instance → Target DB
              ↓
        ┌─────────────────────────────────────────┐
        │ 移行タイプ:                              │
        │ 1. Full Load - 初回完全ロード            │
        │ 2. CDC (Change Data Capture) - 差分同期  │
        │ 3. Full Load + CDC - フルロード後にCDC   │
        └─────────────────────────────────────────┘

サポートソース:
  - Oracle, SQL Server, MySQL, PostgreSQL
  - MongoDB, DocumentDB
  - S3 (ファイルベース)
  - IBM Db2

サポートターゲット:
  - Amazon Aurora, RDS (全エンジン)
  - Amazon Redshift
  - Amazon S3
  - Amazon DynamoDB
  - Amazon OpenSearch
  - Apache Kafka / Amazon MSK
import boto3

dms = boto3.client('database-migration-service', region_name='us-east-1')

# レプリケーションタスクの作成
def create_dms_task(
    task_name: str,
    source_arn: str,
    target_arn: str,
    replication_instance_arn: str,
    migration_type: str = 'full-load-and-cdc'
) -> str:
    
    table_mappings = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "include-all",
                "object-locator": {
                    "schema-name": "public",
                    "table-name": "%"
                },
                "rule-action": "include"
            },
            {
                "rule-type": "transformation",
                "rule-id": "2",
                "rule-name": "lowercase-tables",
                "rule-action": "convert-lowercase",
                "rule-target": "table",
                "object-locator": {
                    "schema-name": "%",
                    "table-name": "%"
                }
            }
        ]
    }
    
    response = dms.create_replication_task(
        ReplicationTaskIdentifier=task_name,
        SourceEndpointArn=source_arn,
        TargetEndpointArn=target_arn,
        ReplicationInstanceArn=replication_instance_arn,
        MigrationType=migration_type,
        TableMappings=json.dumps(table_mappings),
        ReplicationTaskSettings=json.dumps({
            "TargetMetadata": {
                "TargetSchema": "",
                "SupportLobs": True,
                "FullLobMode": False,
                "LobChunkSize": 64
            },
            "FullLoadSettings": {
                "TargetTablePrepMode": "DO_NOTHING",
                "CreatePkAfterFullLoad": True,
                "MaxFullLoadSubTasks": 8,
                "TransactionConsistencyTimeout": 600
            },
            "Logging": {
                "EnableLogging": True
            }
        })
    )
    
    return response['ReplicationTask']['ReplicationTaskArn']

1-4. AWS DataSync

DataSync ユースケース:

オンプレミス NFS/SMB/HDFS → DataSync Agent → AWS Storage
                                    ↓
                           ┌──────────────────┐
                           │ ターゲット:        │
                           │ - Amazon S3      │
                           │ - Amazon EFS     │
                           │ - Amazon FSx     │
                           └──────────────────┘

特徴:
- 暗号化転送 (TLS)
- データ整合性チェック (チェックサム)
- スケジュール実行 (cron式)
- ネットワーク帯域制御
- 最大10Gbpsの転送速度
- エンドポイント: パブリック, VPC, FIPS

vs AWS Transfer Family:
  DataSync: 大量一括移行、オンプレからの移行
  Transfer: SFTP/FTP/FTPSプロトコルベースの継続的ファイル転送

Domain 2: ストレージとデータ管理(詳細補足)

2-1. Amazon Redshift 高度な設定

-- 分散スタイルの選択
-- EVEN: デフォルト, 均等分散, JOINが多い場合は非効率
-- KEY: 特定カラムでハッシュ分散, JOINキーが同じテーブル間で有効
-- ALL: 全ノードに複製, 小さなディメンションテーブル
-- AUTO: Redshiftが自動最適化

-- ファクトテーブル (大テーブル)
CREATE TABLE sales (
    sale_id     BIGINT NOT NULL,
    customer_id INTEGER NOT NULL,
    product_id  INTEGER NOT NULL,
    sale_date   DATE NOT NULL,
    amount      DECIMAL(12,2) NOT NULL
)
DISTKEY(customer_id)       -- JOINが多いキーを選択
SORTKEY(sale_date)         -- よく使うWHEREカラム
ENCODE AUTO;               -- 自動圧縮

-- ディメンションテーブル (小テーブル)
CREATE TABLE dim_product (
    product_id   INTEGER NOT NULL,
    product_name VARCHAR(200),
    category     VARCHAR(100),
    price        DECIMAL(10,2)
)
DISTSTYLE ALL              -- 全ノードに複製してJOINを高速化
SORTKEY(product_id);

-- Redshift Spectrum でS3に直接クエリ
CREATE EXTERNAL SCHEMA spectrum_schema
FROM DATA CATALOG
DATABASE 'spectrum_db'
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftSpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

CREATE EXTERNAL TABLE spectrum_schema.clickstream (
    user_id     BIGINT,
    event_type  VARCHAR(50),
    event_time  TIMESTAMP,
    page_url    VARCHAR(500)
)
PARTITIONED BY (dt DATE)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://my-data-lake/clickstream/';

-- パーティションの追加
ALTER TABLE spectrum_schema.clickstream
ADD PARTITION (dt='2024-01-01')
LOCATION 's3://my-data-lake/clickstream/dt=2024-01-01/';

-- SpectrumとRedshiftのJOIN
SELECT 
    s.customer_id,
    c.event_type,
    COUNT(*) as event_count,
    SUM(s.amount) as total_sales
FROM sales s
JOIN spectrum_schema.clickstream c ON s.customer_id = c.user_id
WHERE c.dt BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY 1, 2;
import boto3
import psycopg2
import json

# Redshift Serverless の操作
redshift_data = boto3.client('redshift-data', region_name='us-east-1')

def execute_redshift_query(
    sql: str,
    database: str,
    workgroup_name: str = 'default'
) -> dict:
    """Redshift Serverlessへのクエリ実行"""
    
    # クエリ送信
    response = redshift_data.execute_statement(
        WorkgroupName=workgroup_name,
        Database=database,
        Sql=sql
    )
    
    query_id = response['Id']
    
    # 完了待機
    import time
    while True:
        status = redshift_data.describe_statement(Id=query_id)
        
        if status['Status'] == 'FINISHED':
            break
        elif status['Status'] in ['FAILED', 'ABORTED']:
            raise Exception(f"Query failed: {status.get('Error')}")
        
        time.sleep(2)
    
    # 結果取得
    result = redshift_data.get_statement_result(Id=query_id)
    
    # カラム名の取得
    columns = [col['label'] for col in result['ColumnMetadata']]
    
    # 行データの変換
    rows = []
    for record in result['Records']:
        row = {}
        for i, field in enumerate(record):
            value = list(field.values())[0] if field else None
            row[columns[i]] = value
        rows.append(row)
    
    return {
        'columns': columns,
        'rows': rows,
        'total_rows': result['TotalNumRows']
    }

# Redshift データロード
def load_data_to_redshift(
    table_name: str,
    s3_path: str,
    iam_role: str,
    file_format: str = 'PARQUET'
) -> None:
    """S3からRedshiftへのCOPYコマンド"""
    
    copy_sql = f"""
    COPY {table_name}
    FROM '{s3_path}'
    IAM_ROLE '{iam_role}'
    FORMAT AS {file_format}
    MANIFEST
    STATUPDATE ON
    COMPUPDATE ON;
    """
    
    execute_redshift_query(copy_sql, database='analytics')

2-2. AWS Lake Formation

Lake Formation アーキテクチャ:

Data Sources → S3 (Data Lake) ← Lake Formation (権限管理)
                    ↓
            ┌───────────────────────────────┐
            │ Glue Data Catalog             │
            │ (メタデータ・スキーマ)         │
            └───────────────────────────────┘
                    ↓
     ┌──────────────────────────────────────┐
     │ アクセス制御レイヤー                   │
     │ - Database レベル                   │
     │ - Table レベル                      │
     │ - Column レベル (列アクセス制御)      │
     │ - Row レベル (行フィルタ)            │
     │ - Cell レベル (最細粒度)             │
     └──────────────────────────────────────┘
                    ↓
     ┌──────────────────────────────────────┐
     │ クエリエンジン                         │
     │ - Athena                            │
     │ - Redshift Spectrum                 │
     │ - EMR                              │
     │ - Glue ETL                         │
     └──────────────────────────────────────┘
import boto3

lf = boto3.client('lakeformation', region_name='us-east-1')

# Lake Formation の権限付与
def grant_table_permissions(
    principal_arn: str,
    database_name: str,
    table_name: str,
    permissions: list = None
) -> None:
    
    if permissions is None:
        permissions = ['SELECT']
    
    lf.grant_permissions(
        Principal={
            'DataLakePrincipalIdentifier': principal_arn
        },
        Resource={
            'Table': {
                'DatabaseName': database_name,
                'Name': table_name
            }
        },
        Permissions=permissions,
        PermissionsWithGrantOption=[]
    )

# 列レベルアクセス制御
def grant_column_permissions(
    principal_arn: str,
    database_name: str,
    table_name: str,
    included_columns: list
) -> None:
    
    lf.grant_permissions(
        Principal={
            'DataLakePrincipalIdentifier': principal_arn
        },
        Resource={
            'TableWithColumns': {
                'DatabaseName': database_name,
                'Name': table_name,
                'ColumnNames': included_columns  # 許可する列のみ指定
            }
        },
        Permissions=['SELECT']
    )

# 行フィルタの設定
def create_row_filter(
    database_name: str,
    table_name: str,
    filter_name: str,
    row_filter_expression: str
) -> None:
    """特定の行のみアクセス可能にするフィルタ"""
    
    lf.create_data_cells_filter(
        TableData={
            'TableCatalogId': boto3.client('sts').get_caller_identity()['Account'],
            'DatabaseName': database_name,
            'TableName': table_name,
            'Name': filter_name,
            'RowFilter': {
                'FilterExpression': row_filter_expression  # 例: "region = 'us-east-1'"
            },
            'ColumnWildcard': {}  # 全列許可
        }
    )

# Data Lake の登録
def register_s3_location(s3_path: str, role_arn: str) -> None:
    lf.register_resource(
        ResourceArn=f'arn:aws:s3:::{s3_path.replace("s3://", "")}',
        UseServiceLinkedRole=False,
        RoleArn=role_arn
    )

Domain 3: 処理(詳細補足)

3-1. AWS Glue 高度な機能

# Glue DynamicFrame の活用

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'database', 'table'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# データの読み込み
datasource = glueContext.create_dynamic_frame.from_catalog(
    database=args['database'],
    table_name=args['table'],
    transformation_ctx='datasource'
)

# スキーマの確認
datasource.printSchema()
print(f"Record count: {datasource.count()}")

# ネスト構造のフラット化
flattened = Relationalize.apply(
    frame=datasource,
    staging_path='s3://temp-bucket/relationalize/',
    name='root',
    transformation_ctx='flattened'
)

# ResolveChoice: 型の曖昧さを解決
resolved = ResolveChoice.apply(
    frame=datasource,
    choice='make_cols',  # make_cols, cast:type, match_catalog, project:type
    transformation_ctx='resolved'
)

# フィルタリング
def filter_function(dynamic_record):
    return dynamic_record['status'] == 'active'

filtered = Filter.apply(
    frame=resolved,
    f=filter_function,
    transformation_ctx='filtered'
)

# マッピング
mapped = ApplyMapping.apply(
    frame=filtered,
    mappings=[
        ('old_id', 'long', 'id', 'long'),
        ('user_name', 'string', 'username', 'string'),
        ('created_at', 'string', 'created_date', 'date'),
        ('order_total', 'double', 'total_amount', 'decimal(12,2)')
    ],
    transformation_ctx='mapped'
)

# S3への書き込み (Parquet, Hive形式パーティション)
glueContext.write_dynamic_frame.from_options(
    frame=mapped,
    connection_type='s3',
    connection_options={
        'path': 's3://output-bucket/processed/',
        'partitionKeys': ['year', 'month', 'day']
    },
    format='parquet',
    transformation_ctx='output'
)

job.commit()
# Glue DataBrew - ノーコードデータ変換

import boto3

databrew = boto3.client('databrew', region_name='us-east-1')

# データセットの作成
def create_s3_dataset(name: str, s3_path: str) -> dict:
    return databrew.create_dataset(
        Name=name,
        Format='CSV',
        FormatOptions={
            'Csv': {
                'Delimiter': ',',
                'HeaderRow': True
            }
        },
        Input={
            'S3InputDefinition': {
                'Bucket': s3_path.split('/')[2],
                'Key': '/'.join(s3_path.split('/')[3:])
            }
        }
    )

# レシピの作成 (変換ステップの定義)
def create_recipe(name: str) -> dict:
    return databrew.create_recipe(
        Name=name,
        Steps=[
            {
                'Action': {
                    'Operation': 'REMOVE_MISSING',
                    'Parameters': {
                        'columnNames': 'email,phone'
                    }
                }
            },
            {
                'Action': {
                    'Operation': 'LOWERCASE',
                    'Parameters': {
                        'columnName': 'email'
                    }
                }
            },
            {
                'Action': {
                    'Operation': 'TRIM',
                    'Parameters': {
                        'columnName': 'name'
                    }
                }
            }
        ]
    )

# プロファイリングジョブ (データ品質分析)
def create_profile_job(dataset_name: str, output_bucket: str) -> dict:
    return databrew.create_profile_job(
        Name=f'{dataset_name}-profile',
        DatasetName=dataset_name,
        OutputLocation={
            'Bucket': output_bucket,
            'Key': f'profiles/{dataset_name}/'
        },
        RoleArn='arn:aws:iam::123456789:role/GlueDataBrewRole',
        JobSample={
            'Mode': 'CUSTOM_ROWS',
            'Size': 10000  # サンプルサイズ
        },
        Configuration={
            'ProfileColumns': [{'Regex': '.*'}],  # 全列
            'ColumnStatisticsConfigurations': [
                {
                    'Selectors': [{'Regex': '.*'}],
                    'Statistics': {
                        'IncludedStatistics': [
                            'MEAN', 'MEDIAN', 'MODE', 'STANDARD_DEVIATION',
                            'MIN', 'MAX', 'MISSING_COUNT', 'UNIQUE_VALUE_COUNT'
                        ]
                    }
                }
            ]
        }
    )

3-2. Amazon EMR

EMR クラスター設計:

Master Node (1台) → YARN Resource Manager, HDFS NameNode
Core Nodes (複数) → YARN Node Manager, HDFS DataNode (データ永続化)
Task Nodes (可変) → YARN Node Manager のみ (スポット対応)

インスタンスタイプ選択:
  - メモリ集約型 (Spark): r5系 (r5.2xlarge〜r5.24xlarge)
  - 計算集約型 (ML): c5系
  - ストレージ集約型 (HDFS): d2系, i3系
  - GPU (深層学習): p3系, g4dn系

コスト最適化:
  - Core: オンデマンド (データ永続化が必要)
  - Task: スポット (中断しても再実行可能)
  - Auto Scaling でピーク時のみTask Nodeを追加
import boto3

emr = boto3.client('emr', region_name='us-east-1')

# EMR クラスターの作成
def create_emr_cluster(
    cluster_name: str,
    master_instance_type: str = 'm5.xlarge',
    core_instance_type: str = 'r5.2xlarge',
    core_instance_count: int = 2
) -> str:
    
    response = emr.run_job_flow(
        Name=cluster_name,
        ReleaseLabel='emr-7.0.0',
        Applications=[
            {'Name': 'Spark'},
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'Livy'},
            {'Name': 'JupyterEnterpriseGateway'}
        ],
        Instances={
            'MasterInstanceType': master_instance_type,
            'SlaveInstanceType': core_instance_type,
            'InstanceCount': core_instance_count + 1,  # Master + Core
            'Ec2KeyName': 'my-key-pair',
            'Ec2SubnetId': 'subnet-xxxx',
            'EmrManagedMasterSecurityGroup': 'sg-master-xxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-core-xxxx',
            'KeepJobFlowAliveWhenNoSteps': True
        },
        Steps=[],
        BootstrapActions=[
            {
                'Name': 'Install Python packages',
                'ScriptBootstrapAction': {
                    'Path': 's3://my-bucket/scripts/bootstrap.sh',
                    'Args': ['pandas', 'scikit-learn', 'pyarrow']
                }
            }
        ],
        Configurations=[
            {
                'Classification': 'spark-defaults',
                'Properties': {
                    'spark.sql.adaptive.enabled': 'true',
                    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
                    'spark.sql.parquet.compression.codec': 'snappy'
                }
            },
            {
                'Classification': 'yarn-site',
                'Properties': {
                    'yarn.nodemanager.vmem-check-enabled': 'false'
                }
            }
        ],
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        LogUri='s3://my-bucket/emr-logs/',
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        Tags=[
            {'Key': 'Environment', 'Value': 'production'},
            {'Key': 'Project', 'Value': 'data-analytics'}
        ]
    )
    
    return response['JobFlowId']

# Spark ジョブのステップ追加
def add_spark_step(cluster_id: str, script_path: str, args: list) -> str:
    response = emr.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[
            {
                'Name': 'Spark ETL Job',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--deploy-mode', 'cluster',
                        '--master', 'yarn',
                        '--conf', 'spark.sql.adaptive.enabled=true',
                        '--num-executors', '10',
                        '--executor-memory', '8g',
                        '--executor-cores', '4',
                        script_path
                    ] + args
                }
            }
        ]
    )
    
    return response['StepIds'][0]

# EMR Serverless (クラスター管理不要)
emr_serverless = boto3.client('emr-serverless', region_name='us-east-1')

def run_emr_serverless_job(
    application_id: str,
    execution_role_arn: str,
    script_s3_path: str
) -> str:
    
    response = emr_serverless.start_job_run(
        applicationId=application_id,
        executionRoleArn=execution_role_arn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': script_s3_path,
                'sparkSubmitParameters': (
                    '--conf spark.executor.cores=4 '
                    '--conf spark.executor.memory=8g '
                    '--conf spark.driver.memory=4g '
                    '--conf spark.dynamicAllocation.enabled=true'
                )
            }
        },
        configurationOverrides={
            'monitoringConfiguration': {
                's3MonitoringConfiguration': {
                    'logUri': 's3://my-bucket/emr-serverless-logs/'
                }
            }
        }
    )
    
    return response['jobRunId']

3-3. Amazon MSK (Managed Streaming for Apache Kafka)

MSK アーキテクチャ:

Producer → MSK Broker Cluster → Consumer Group
                ↓
        ┌───────────────────────────────┐
        │ Topic: events                │
        │  Partition 0: [msg1, msg4]   │
        │  Partition 1: [msg2, msg5]   │
        │  Partition 2: [msg3, msg6]   │
        └───────────────────────────────┘
                ↓
     ┌──────────────────────────────────┐
     │ MSK Connect:                    │
     │ - S3 Sink Connector             │
     │ - Redshift Sink Connector       │
     │ - OpenSearch Sink Connector     │
     └──────────────────────────────────┘

MSK Serverless vs プロビジョニング:
  Serverless: キャパシティ自動管理, 予測困難なトラフィック向け
  プロビジョニング: 高スループット, 細かい制御が必要な場合
from kafka import KafkaProducer, KafkaConsumer
import json
import boto3

# MSK SASL/SCRAM 認証でのプロデューサー
def create_msk_producer(bootstrap_servers: list) -> KafkaProducer:
    
    # Secrets Managerから認証情報取得
    secrets = boto3.client('secretsmanager')
    secret = json.loads(
        secrets.get_secret_value(SecretId='msk-credentials')['SecretString']
    )
    
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=secret['username'],
        sasl_plain_password=secret['password'],
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',                # 全レプリカへの書き込みを確認
        retries=3,
        max_in_flight_requests_per_connection=1,  # 順序保証
        compression_type='gzip'
    )
    
    return producer

# コンシューマーグループでの消費
def consume_from_msk(
    bootstrap_servers: list,
    topic: str,
    group_id: str
) -> None:
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # 手動コミット
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        max_poll_records=500
    )
    
    try:
        for message in consumer:
            try:
                # メッセージ処理
                process_message(message.value)
                
                # 処理成功後にコミット
                consumer.commit()
            
            except Exception as e:
                print(f"Error processing message: {e}")
                # Dead Letter Queueに送信
    
    finally:
        consumer.close()

def process_message(data: dict) -> None:
    """メッセージ処理ロジック"""
    print(f"Processing: {data}")

Domain 4: 分析と可視化(詳細補足)

4-1. Amazon Athena 高度な機能

-- Athena CTAS (Create Table As Select)
-- クエリ結果を新テーブルとして保存 (フォーマット変換)
CREATE TABLE optimized_events
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    partitioned_by = ARRAY['year', 'month'],
    bucketed_by = ARRAY['user_id'],
    bucket_count = 100,
    external_location = 's3://my-bucket/optimized-events/'
)
AS
SELECT
    event_id,
    user_id,
    event_type,
    event_time,
    properties,
    year(event_time) as year,
    month(event_time) as month
FROM raw_events
WHERE event_time >= TIMESTAMP '2024-01-01';

-- ウィンドウ関数
SELECT
    user_id,
    event_type,
    event_time,
    COUNT(*) OVER (
        PARTITION BY user_id 
        ORDER BY event_time 
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_events,
    LAG(event_time, 1) OVER (
        PARTITION BY user_id ORDER BY event_time
    ) as previous_event_time,
    event_time - LAG(event_time, 1) OVER (
        PARTITION BY user_id ORDER BY event_time
    ) as time_since_last_event
FROM events
WHERE user_id = '12345';

-- Athena フェデレーテッドクエリ (外部データソース)
-- Lambda データソースコネクタ経由でRDS/DynamoDB等に直接クエリ
SELECT *
FROM lambda_datasource.rds_mysql.public.users u
JOIN "my-athena-catalog".events e ON u.user_id = e.user_id
WHERE e.event_date >= '2024-01-01';
import boto3
import time

athena = boto3.client('athena', region_name='us-east-1')

def run_athena_query(
    sql: str,
    database: str,
    output_bucket: str,
    workgroup: str = 'primary'
) -> list:
    """Athenaクエリの実行と結果取得"""
    
    # クエリ実行
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={
            'OutputLocation': f's3://{output_bucket}/athena-results/'
        },
        WorkGroup=workgroup
    )
    
    query_id = response['QueryExecutionId']
    
    # 完了待機
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        
        if state == 'SUCCEEDED':
            break
        elif state in ['FAILED', 'CANCELLED']:
            error = status['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
            raise Exception(f"Query {state}: {error}")
        
        time.sleep(2)
    
    # コスト情報
    statistics = status['QueryExecution']['Statistics']
    data_scanned_mb = statistics.get('DataScannedInBytes', 0) / (1024 * 1024)
    print(f"Data scanned: {data_scanned_mb:.2f} MB (Cost: ${data_scanned_mb/1024 * 5:.4f})")
    
    # 結果取得
    results = athena.get_query_results(QueryExecutionId=query_id)
    
    # ヘッダーと行データの処理
    columns = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    rows = []
    
    for row in results['ResultSet']['Rows'][1:]:  # ヘッダー行をスキップ
        row_data = {}
        for i, field in enumerate(row['Data']):
            row_data[columns[i]] = field.get('VarCharValue', '')
        rows.append(row_data)
    
    return rows

# Athena Workgroupでのコスト管理
def create_workgroup_with_limits(
    workgroup_name: str,
    output_bucket: str,
    per_query_limit_mb: int = 1024,  # 1GB
    per_workgroup_limit_gb: int = 100  # 100GB/日
) -> None:
    
    athena.create_work_group(
        Name=workgroup_name,
        Configuration={
            'ResultConfiguration': {
                'OutputLocation': f's3://{output_bucket}/workgroup-results/'
            },
            'EnforceWorkGroupConfiguration': True,
            'PublishCloudWatchMetricsEnabled': True,
            'BytesScannedCutoffPerQuery': per_query_limit_mb * 1024 * 1024,
            'RequesterPaysEnabled': False
        },
        Description=f'Workgroup with {per_query_limit_mb}MB per query limit'
    )

4-2. Amazon QuickSight

QuickSight SPICE エンジン:
  SPICE = Super-fast, Parallel, In-memory Calculation Engine
  
  メリット:
  - クエリが高速 (データがメモリ内)
  - データソースへの直接アクセス不要
  - 自動更新スケジュール設定可能
  
  制限:
  - SPICE容量: 10GB/ユーザー (追加購入可能)
  - データ更新: 最小1時間ごと
  - 非SPICEモード: 毎回データソースに直接クエリ

QuickSight セキュリティ:
  - Row Level Security (RLS): ユーザー/グループごとの行制限
  - Column Level Security: 特定列の非表示
  - VPC Connection: プライベートVPCのデータソース接続
  - Embedded Analytics: アプリへのダッシュボード埋め込み
import boto3
import json

quicksight = boto3.client('quicksight', region_name='us-east-1')
account_id = '123456789012'

# Row Level Security の設定
def configure_rls(
    dataset_id: str,
    rls_dataset_arn: str
) -> None:
    """データセットにRLSを設定"""
    
    quicksight.update_data_set(
        AwsAccountId=account_id,
        DataSetId=dataset_id,
        Name='Sales Data',
        PhysicalTableMap={},
        LogicalTableMap={},
        ImportMode='SPICE',
        RowLevelPermissionDataSet={
            'Arn': rls_dataset_arn,
            'PermissionPolicy': 'GRANT_ACCESS',  # または DENY_ACCESS
            'FormatVersion': 'VERSION_2'
        }
    )

# ダッシュボードの埋め込みURL取得
def get_embedded_dashboard_url(
    dashboard_id: str,
    user_arn: str,
    session_lifetime_minutes: int = 600
) -> str:
    
    response = quicksight.get_dashboard_embed_url(
        AwsAccountId=account_id,
        DashboardId=dashboard_id,
        IdentityType='IAM',
        SessionLifetimeInMinutes=session_lifetime_minutes,
        UserArn=user_arn,
        AdditionalDashboardIds=[]
    )
    
    return response['EmbedUrl']

# QuickSight ユーザーの作成
def create_quicksight_user(
    email: str,
    username: str,
    role: str = 'READER'  # ADMIN, AUTHOR, READER
) -> dict:
    
    return quicksight.register_user(
        AwsAccountId=account_id,
        Namespace='default',
        Email=email,
        IdentityType='IAM',
        UserRole=role,
        UserName=username
    )

Domain 5: データセキュリティ(詳細補足)

5-1. データ暗号化

暗号化オプション比較:

┌─────────────────────────────────────────────────────────────┐
│              AWS データ暗号化オプション                       │
├──────────────┬────────────────────────┬────────────────────-┤
│ 方式          │ 管理者                 │ 適用場面            │
├──────────────┼────────────────────────┼────────────────────-┤
│ SSE-S3       │ AWS管理                │ シンプルな要件      │
│ SSE-KMS      │ KMS CMK               │ 監査・アクセス制御  │
│ SSE-C        │ 顧客管理キー           │ 独自キー管理        │
│ CSE          │ クライアントサイド暗号化│ 厳格なコンプライアンス│
└──────────────┴────────────────────────┴────────────────────-┘

Redshift 暗号化:
- クラスター暗号化: KMS または HSM
- 転送中: SSL (JDBC/ODBC)
- 既存クラスターの暗号化: スナップショット → 新クラスター

EMR 暗号化:
- EBS暗号化: EC2インスタンスストア
- S3暗号化: SSE-KMS
- 転送中: TLS (HDFS RPC, S3との通信)
import boto3
import json

kms = boto3.client('kms', region_name='us-east-1')

# データレイク用KMSキーポリシー
def create_data_lake_kms_key(
    key_alias: str,
    admin_role_arn: str,
    analytics_role_arn: str
) -> str:
    
    key_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Enable IAM User Permissions",
                "Effect": "Allow",
                "Principal": {
                    "AWS": f"arn:aws:iam::123456789:root"
                },
                "Action": "kms:*",
                "Resource": "*"
            },
            {
                "Sid": "Allow Key Admin",
                "Effect": "Allow",
                "Principal": {
                    "AWS": admin_role_arn
                },
                "Action": [
                    "kms:Create*", "kms:Describe*", "kms:Enable*",
                    "kms:List*", "kms:Put*", "kms:Update*", "kms:Revoke*",
                    "kms:Disable*", "kms:Get*", "kms:Delete*", "kms:ScheduleKeyDeletion"
                ],
                "Resource": "*"
            },
            {
                "Sid": "Allow Analytics Services",
                "Effect": "Allow",
                "Principal": {
                    "AWS": analytics_role_arn
                },
                "Action": [
                    "kms:Decrypt", "kms:GenerateDataKey",
                    "kms:DescribeKey", "kms:ReEncrypt*"
                ],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "kms:ViaService": [
                            "s3.us-east-1.amazonaws.com",
                            "athena.us-east-1.amazonaws.com",
                            "glue.us-east-1.amazonaws.com"
                        ]
                    }
                }
            }
        ]
    }
    
    response = kms.create_key(
        Description=f'Data Lake KMS Key - {key_alias}',
        KeyUsage='ENCRYPT_DECRYPT',
        KeySpec='SYMMETRIC_DEFAULT',
        Policy=json.dumps(key_policy),
        Tags=[{'TagKey': 'Purpose', 'TagValue': 'DataLakeEncryption'}]
    )
    
    key_id = response['KeyMetadata']['KeyId']
    
    kms.create_alias(
        AliasName=f'alias/{key_alias}',
        TargetKeyId=key_id
    )
    
    return key_id

5-2. VPC エンドポイントによるプライベートアクセス

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# S3 ゲートウェイエンドポイント (無料)
def create_s3_gateway_endpoint(vpc_id: str, route_table_ids: list) -> str:
    response = ec2.create_vpc_endpoint(
        VpcEndpointType='Gateway',
        VpcId=vpc_id,
        ServiceName='com.amazonaws.us-east-1.s3',
        RouteTableIds=route_table_ids,
        PolicyDocument=json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": "*",
                    "Action": ["s3:GetObject", "s3:PutObject"],
                    "Resource": ["arn:aws:s3:::my-data-lake/*"]
                }
            ]
        })
    )
    return response['VpcEndpoint']['VpcEndpointId']

# Glue インターフェースエンドポイント
def create_glue_interface_endpoint(vpc_id: str, subnet_ids: list, sg_ids: list) -> str:
    response = ec2.create_vpc_endpoint(
        VpcEndpointType='Interface',
        VpcId=vpc_id,
        ServiceName='com.amazonaws.us-east-1.glue',
        SubnetIds=subnet_ids,
        SecurityGroupIds=sg_ids,
        PrivateDnsEnabled=True
    )
    return response['VpcEndpoint']['VpcEndpointId']

模擬試験 第1回 (65問)

Part 1: データ収集・ストリーミング (問1-20)

問1: Kinesis Data Streams のシャード数を計算する際に必要な情報は?

A) 保存期間と暗号化設定 B) 書き込みレート(records/sec)と書き込みデータ量(MB/sec)の両方 C) 消費者アプリケーションの数 D) データの種類とフォーマット

正解: B 解説: シャード数 = max(書き込みレコード数/1,000, 書き込みデータ量/1MB)。例: 5,000件/秒かつ3MB/秒の場合、max(5, 3) = 5シャード必要。


問2: Kinesis Data Firehoseで受信したJSONデータをS3にParquet形式で保存するためには?

A) カスタムLambda関数でParquetに変換してS3に直接書き込む B) FirehoseのRecord Format Conversion機能を有効にし、Glue Data Catalogのスキーマを参照 C) EMRクラスターで後処理 D) Parquet変換はFirehoseでは不可能

正解: B 解説: Firehoseの「Record Format Conversion」機能でJSON→Parquet/ORC変換が可能。Glue Data Catalogで定義されたスキーマに基づいて変換する。


問3: Kinesis Data Streams で「ProvisionedThroughputExceededException」が発生した場合の対処法は?

A) シャードを削除して再作成 B) RetryExponentialBackoffと追加シャードの検討 C) Firehoseに切り替え D) SQSに移行

正解: B 解説: 指数バックオフで再試行するとともに、継続的に発生する場合はシャード数を増やす(SplitShardまたはUpdateShardCount)。適切なパーティションキー設計でシャードの偏りを防ぐことも重要。


問4: Amazon MSK (Kafka) と Kinesis Data Streams の主な違いは?

A) MSKは高コスト、Kinesisは低コスト B) MSKはKafka互換でコンシューマーグループ・トランザクション等の高度な機能を持ち、KinesisはAWSマネージドでシンプル C) Kinesisのみストリーミングをサポート D) 機能は同じでマネージドレベルのみ異なる

正解: B 解説: MSKはKafkaの全機能(コンシューマーグループ、トランザクション、ストリーム処理)を提供し、既存KafkaワークロードのリフトシフトやKafka Connect統合に最適。Kinesisはよりシンプルで完全マネージド。


問5: AWS DMS でリレーショナルDBからAmazon S3にデータを継続的にレプリケートする方法は?

A) Full Loadのみ (CDCは不可能) B) Full Load + CDCモードでS3をターゲットとして設定 C) S3はDMSのターゲットとして非対応 D) S3にレプリケートするにはGlueが必要

正解: B 解説: DMS はS3をターゲットとしてサポートし、Full Load + CDCモードで初回完全ロードの後、変更データを継続的にS3に出力できる。出力フォーマットはCSVまたはParquet。


問6: DataSync と AWS Transfer Family の使い分けは?

A) 機能は同じ B) DataSyncはオンプレミスからの大量データ移行、Transfer FamilyはSFTP/FTPプロトコルベースの継続的ファイル転送 C) Transfer Familyのみセキュア転送をサポート D) DataSyncはS3のみ対応

正解: B 解説: DataSyncはオンプレの大量データの移行・同期に最適化(高速、整合性チェック)。Transfer FamilyはSFTP/FTPS/FTPプロトコルでB2Bファイル交換に使用する。


問7: Kinesis Data Analytics (Managed Service for Apache Flink) の主な用途は?

A) バッチ処理のみ B) ストリーミングデータのリアルタイム集計・分析・異常検知 C) MLモデルのトレーニング D) データの長期アーカイブ

正解: B 解説: Managed Service for Apache Flinkはストリーミングデータのリアルタイム処理に特化。Kinesis Streamsからデータを読み取り、時間窓集計、フィルタリング、異常検知などをリアルタイムで実行する。


問8: S3のデータレイクでファイルサイズを最適化するためのベストプラクティスは?

A) 1ファイル = 1レコード B) 128MB〜1GBのファイルサイズに統合し、小さいファイルをコンパクション C) ファイルサイズは性能に影響しない D) 1ファイル = 1GB超に制限

正解: B 解説: S3/Athenaでは小さいファイルが多いとメタデータオーバーヘッドが大きくなる。128MB〜1GBのファイルサイズが最適で、Glueジョブやコンパクションで小さいファイルを統合する。


問9: Amazon Redshiftのデータロード時に最も高速な方法は?

A) INSERT文で1行ずつ挿入 B) COPY コマンドによるS3からの並列ロード C) DMS経由でのロード D) Glue ETLジョブ経由

正解: B 解説: COPYコマンドは複数のS3ファイルを並列でRedshiftにロードし、単一のINSERTと比較して桁違いに高速。ファイルをスライス数の倍数に分割するとさらに最適化できる。


問10: AWS Glue Crawlerの実行後にデータカタログが更新されない原因として最も可能性が高いものは?

A) S3バケットのリージョンが異なる B) クローラーのIAMロールにs3:GetObjectとs3:ListBucketの権限がない C) データが空 D) Glueのバージョンが古い

正解: B 解説: GlueCrawlerはS3バケットを走査してスキーマを推定するため、s3:GetObjects3:ListBucket権限が必須。権限不足の場合、エラーなくカタログが更新されないことがある。


問11: Amazon Athenaでクエリコストを最小化するための最も効果的な方法は?

A) クエリを小分けにして実行 B) Parquet/ORC形式 + パーティション + 列の選択 (SELECT *)を避ける C) より大きなインスタンスを使用 D) S3バケットを東京リージョンに配置

正解: B 解説: Athenaは5$/TBのスキャン課金。最適化策: (1)Parquet/ORCで圧縮・列指向ストレージ (2)パーティションで不要ファイルをスキップ (3)SELECT *を避け必要な列のみ指定 (4)LIMIT句の使用。


問12: Amazon OpenSearch Serviceでログデータの高速検索を実現するためのインデックス設計は?

A) 全データを1つのインデックスに保存 B) 日次インデックス + ISM (Index State Management) でアーカイブ・削除 C) 全データをホットノードに保存 D) Kibanaのみでインデックス管理

正解: B 解説: 日次インデックス(例: logs-2024-01-01)を作成し、ISMポリシーで古いインデックスをウォームノードに移行→Ultrawarmに移行→削除の自動化が最適。


問13: Redshift SpectrumでAthenaと比較した場合のメリットは?

A) Athenaより安価 B) RedshiftのローカルデータとS3の外部データをシームレスにJOIN可能 C) より多くのファイル形式をサポート D) 認証が不要

正解: B 解説: Redshift SpectrumはRedshiftクラスターからS3のデータを直接クエリし、Redshift内部のテーブルとJOINできる。これはAthenaでは直接できない機能。


問14: AWS Lake FormationのTag-Based Access Control (TBAC) の利点は?

A) S3バケットポリシーのみで管理 B) データにタグを付けてポリシーを一元管理し、複雑なIAMポリシーを簡素化 C) 暗号化を自動化 D) クエリを自動最適化

正解: B 解説: TBACは「confidential=true」のようなタグをデータリソースに付与し、プリンシパルのタグとマッチングでアクセス制御。IAMポリシーを個別に管理する代わりに一元的なタグポリシーで管理できる。


問15: Amazon QuickSight のSPICEキャパシティが不足した場合の対処法は?

A) QuickSightを再インストール B) 追加SPICEキャパシティを購入 ($0.25/GB) またはデータ量を削減 C) DirectQueryモードに強制切り替え D) 制限は増やせない

正解: B 解説: SPICEキャパシティはアカウントレベルで追加購入可能。または不要なデータセットのSPICEキャッシュ削除、フィルタで必要なデータのみインポートしてキャパシティを節約できる。


問16: EMR でSparkジョブのパフォーマンスチューニングで最も重要な設定は?

A) Javaのバージョン B) executorのメモリ・コア数とpartition数の最適化 C) AMIの選択 D) セキュリティグループの設定

正解: B 解説: --executor-memory--executor-coresspark.default.parallelism(partitionの数)がSparkパフォーマンスに最も影響。spark.sql.adaptive.enabled=trueでAQEによる自動最適化も有効。


問17: Glue ETL ジョブで処理するデータ量が増えた場合の対応は?

A) ジョブを複数に分割 B) MaxCapacityまたはNumberOfWorkersを増やしDPU(Data Processing Unit)を追加 C) プロビジョニング不要のためスケールアップ不要 D) S3のデータを圧縮

正解: B 解説: Glueはサーバーレスだが処理能力はDPU(4vCPU+16GBメモリ)で決まる。データ量に応じてMaxCapacity(G.1X: 1DPU, G.2X: 2DPU)を増やすか、WorkerTypeをG.2X/G.4X/G.8Xに変更する。


問18: Amazon Timestream の特徴と適切なユースケースは?

A) 汎用リレーショナルDB B) 時系列データに最適化されたServerlessDB (IoTセンサー、メトリクス) C) ドキュメントDB D) グラフDB

正解: B 解説: Timestreamは時系列データ用のServerlessデータベースで、自動的にホット/コールドストレージに階層化、Time-series関数(interpolate、rate等)を組み込み提供。IoTセンサーやアプリメトリクスに最適。


問19: AWS Glue DataBrewでデータプロファイリングが完了した後に確認できる情報は?

A) クエリ実行計画 B) 欠損値率、ユニーク値数、分布、外れ値の統計情報 C) セキュリティ設定 D) コスト分析

正解: B 解説: DataBrewのプロファイリングは各カラムの統計情報(欠損値率、NULL数、ユニーク率、min/max/平均/中央値、相関関係)を可視化し、データ品質問題を自動検出する。


問20: Redshiftの「Vacuum」コマンドの目的は?

A) ストレージの暗号化 B) 削除マークが付いたデータの物理削除とソートキー順序の再整列 C) バックアップの作成 D) 権限のリセット

正解: B 解説: RedshiftはUPDATE/DELETEを即座に物理削除せず削除マークを付ける。VACUUMで物理的に削除し、SORT KEYの順序を再整列することでクエリ性能を回復する。定期的な実行が推奨。


Part 2: 分析・設計 (問21-45)

問21: Amazon Redshiftで結合操作を最適化するための最良の方法は?

A) CROSS JOINを使用 B) JOINキーを分散キー(DISTKEY)に設定して、データのコロケーションを実現 C) SUBQUERYで代替 D) JOINを避けて非正規化

正解: B 解説: 同じDISTKEYで分散された2つのテーブルをJOINすると、データ移動が不要(コロケーション)で高速。ファクトテーブルとディメンションテーブルで同じキーをDISTKEYに設定するのが基本パターン。


問22: Amazon Athena のパーティションプロジェクションを使用する利点は?

A) クエリが自動的に最適化される B) Glue Data Catalogに事前にパーティション情報がなくても、テーブル定義からパーティションを推定してADD PARTITIONが不要 C) コストが削減される D) データ変換が自動化される

正解: B 解説: Partition Projectionを設定すると、Glue Catalogにパーティション情報を事前登録する必要なく、テーブルプロパティ定義に基づいてAthenaがパーティションパスを計算する。大量パーティション時のメタデータオーバーヘッドを削減。


問23: ELT vs ETL の選択基準は?

A) ELTは常に優れている B) クラウドDWH(Redshift等)の処理能力を活用できる場合はELT、変換処理が複雑でDWH外部処理が必要な場合はETL C) ETLはオンプレのみ D) データ量が小さい場合はELT

正解: B 解説: ELT(Extract, Load, Transform)はS3にRAWデータをロードしてRedshift/AthenaでSQL変換するクラウドネイティブアプローチ。ETLはGlue/EMRで変換後ロードする。現代的なデータレイクではELT/ELTが主流。


問24: Amazon OpenSearch ServiceでのKibanaダッシュボードのアクセス制御に推奨される方法は?

A) IPアドレスによる制限のみ B) Fine-grained access control (FGAC) で Cognito認証+ロールベースのアクセス C) パスワードなし公開設定 D) VPNのみで保護

正解: B 解説: Fine-grained Access Control (FGAC)はAmazon Cognitoによる認証と、インデックス・ドキュメントレベルの細かいアクセス制御を提供。本番環境では必須の設定。


問25: Glue Job で小さなファイルが大量にあるS3データを効率的に処理するには?

A) 全ファイルを個別に処理 B) Grouping Files機能またはCombineS3FilesでSparkパーティションにまとめてから処理 C) 処理前に手動でファイルを結合 D) EMRに切り替え

正解: B 解説: GlueのgroupFilesオプションで小さなファイルをSparkのパーティションにグループ化し、groupSizeで1パーティションのサイズを制御。大量の小さなファイル問題(small file problem)を解決。


問26: データレイクのアーキテクチャでメダリオンアーキテクチャ(Bronze/Silver/Gold)の各レイヤーは?

A) Bronze: 結果, Silver: 中間, Gold: RAW B) Bronze: RAWデータ, Silver: クレンジング済みデータ, Gold: ビジネス集計データ C) Bronze/Silver/Goldはコスト階層 D) ストレージクラスの名称

正解: B 解説: メダリオンアーキテクチャ: Bronze(生データそのまま保存)→Silver(クレンジング・正規化・デデュープ)→Gold(ビジネスロジック適用・アグリゲーション)。各層を分離することで再処理が容易。


問27: Amazon Redshift Serverless のユースケースとして最も適切なのは?

A) 24時間365日高スループットが必要なOLTPシステム B) 間欠的なクエリや予測困難なワークロード、開発・テスト環境 C) 常にクラスター規模の制御が必要な場合 D) 100TB以上の大規模データウェアハウス

正解: B 解説: Redshift Serverlessはアイドル時は課金なし、自動スケーリングで間欠的・予測困難なワークロードに最適。開発環境やアドホック分析にも適する。大規模で予測可能なワークロードはプロビジョニングクラスターが経済的。


問28: AWS Glue の Workflow 機能の主な目的は?

A) ETLジョブの並列実行のみ B) 複数のCrawler、Trigger、Jobを連携させてE2Eデータパイプラインをオーケストレート C) リアルタイムデータ処理 D) データの可視化

正解: B 解説: Glue Workflowsは複数のGlueジョブ・クローラー・トリガーを組み合わせてDAG(有向非巡回グラフ)形式のパイプラインを定義・実行・モニタリングする機能。


問29: S3データレイクでのデータ品質管理にGlue Data Qualityを使用する際の主な機能は?

A) データの自動変換 B) DQDL(Data Quality Definition Language)でルールを定義し、違反を検出・アラート C) コストの最適化 D) パフォーマンスの監視

正解: B 解説: Glue Data QualityはDQDL(例: ColumnValues "age" between 0 and 120)でルールを定義し、ルール違反データの検出、スコアリング、CloudWatchアラートを提供する。


問30: Amazon Redshiftの「WLM (Workload Management)」の目的は?

A) データの暗号化 B) クエリキューの管理とリソースの優先順位付け C) バックアップの自動化 D) パーティションの管理

正解: B 解説: WLMは複数のクエリキュー(例: 短時間クエリ用、BI用、長時間バッチ用)を定義し、キューごとにメモリ/コンカレンシーを割り当てる。短時間クエリが長時間クエリに遅延されるのを防ぐ。


問31: データレイクにおいてApache Iceberg テーブルフォーマットの主なメリットは?

A) 圧縮率が高い B) ACID トランザクション, スキーマ進化, タイムトラベル, パーティション進化をサポート C) 読み取り専用 D) Hadoop環境でのみ動作

正解: B 解説: Apache IcebergはS3上でACIDトランザクション(複数ファイルの原子的更新)、スキーマの安全な変更、AS OF TIMESTAMPでのタイムトラベルクエリ、パーティション構造の変更をサポートするオープンテーブルフォーマット。


問32: Kinesis Data Streams のEnhanced Fan-Outの使用場面は?

A) コンシューマーが1つだけの場合 B) 複数のコンシューマーが同一シャードを高スループット(最大2MB/秒/シャード)で並列読み取りが必要な場合 C) データ量が少ない場合 D) ファイルバッチ処理

正解: B 解説: 通常のGetRecords()は全コンシューマーで2MB/秒/シャードを共有するが、Enhanced Fan-Out(RegisterStreamConsumer)は各コンシューマーに専用2MB/秒/シャードを提供。低レイテンシのプッシュモデル。


問33: EMR on EKSを使用する場面は?

A) Spark環境とKubernetes環境を分離したい場合 B) 既存のEKSクラスターでSparkジョブを実行し、コンテナ化されたワークロードと混在させたい場合 C) 小規模データのみ D) EMR on EC2と完全に同一

正解: B 解説: EMR on EKSはEKSクラスターでSparkジョブをPodとして実行し、既存のKubernetesインフラと統合できる。CI/CDパイプライン統合、マイクロサービスとSparkジョブの同一クラスター実行に適する。


問34: Amazon QuickSight でのサードパーティデータソース(オンプレミスDB)への接続に必要な設定は?

A) インターネット経由の直接接続 B) VPC ConnectionとプライベートIPを使用したDirect Connect/VPN経由の接続 C) QuickSightはオンプレミスに未対応 D) EC2プロキシを経由

正解: B 解説: QuickSightのVPC Connectionを使用して、Direct ConnectやVPN経由でオンプレミスDBにプライベートネットワーク接続できる。セキュリティグループとENI設定が必要。


問35: AWS Glue Studioとは?

A) Glueのコマンドラインツール B) ノーコードビジュアルETLインターフェース(ドラッグ&ドロップでジョブ作成) C) データカタログUI D) メトリクスダッシュボード

正解: B 解説: Glue Studioはビジュアルジョブエディターで、データソース、変換(Filter/Join/Aggregate等)、ターゲットをGUI上で接続してETLジョブを作成できる。生成されたPySpark/Scalaコードも確認・編集可能。


問36: S3上のParquetファイルのスキーマをAthenaで更新する際の注意事項は?

A) スキーマ変更は自動的に反映される B) 既存パーティションには適用されないため、MSCK REPAIR TABLEまたはALTER TABLEで更新が必要 C) スキーマ変更はファイルの再作成が必要 D) スキーマ変更はAthenaでは不可

正解: B 解説: Athenaのテーブルスキーマ変更後、既存パーティションには自動反映されない。MSCK REPAIR TABLEで全パーティションを再スキャンするか、ALTER TABLE ADD PARTITIONで個別に追加する。


問37: Apache Kafka (MSK) でExactly-Once Semanticsを保証するための設定は?

A) enable.auto.commit=true B) プロデューサーのenable.idempotence=trueとトランザクションAPIの使用 C) acks=1の設定 D) EOS保証は不可能

正解: B 解説: Kafkaで正確に一度の配信を保証するには: プロデューサーのenable.idempotence=truetransactional.id設定、beginTransaction()/commitTransaction()でのトランザクション処理が必要。


問38: Amazon Redshift の「Materialized View」を使用する場面は?

A) 全てのVIEWはMaterialized Viewに変換すべき B) 頻繁に実行される複雑なクエリを事前計算し、クエリ応答時間を大幅に短縮したい場合 C) リアルタイムデータが必要な場合 D) 書き込みパフォーマンスの向上

正解: B 解説: Materialized Viewは集計・JOIN結果を物理テーブルとして保存し、同じクエリを高速化する。REFRESH MATERIALIZED VIEWで更新。頻繁に参照されるダッシュボードのバックエンドクエリに最適。


問39: データレイクのアクセスパターンに応じた最適なS3ストレージクラスは?

A) 常にS3 Standardを使用 B) 直近データ(S3 Standard)→中期(S3 IA/Intelligent-Tiering)→長期アーカイブ(Glacier)のライフサイクルポリシー C) コスト削減のため全データをGlacierに D) S3 Express One Zoneのみを使用

正解: B 解説: データアクセス頻度に応じたS3ライフサイクルポリシーを設定: 0-30日→Standard、30-90日→Standard-IA、90-365日→Intelligent-Tiering、365日+→Glacier Deep Archive。


問40: Glue Crawler でDynamoDBテーブルを走査する際の注意事項は?

A) DynamoDBはGlue Crawlerに非対応 B) Crawlerは無作為サンプリングするため、属性が揃っていない場合はスキーマが不完全になる可能性がある C) 全データを走査するため高コスト D) スキャン操作のため本番DB利用は禁止

正解: B 解説: DynamoDB Crawlerはサンプリングベースでスキーマを推定するため、アイテムによって属性が異なる場合は検出されない属性がある。全属性を網羅するにはカスタムCrawler設定またはGlueジョブでのスキャンが確実。


問41: Amazon Redshiftの「Serverless」と「Provisioned」どちらを選択すべきか?

A) Serverlessは常に安価 B) 間欠的ワークロードにはServerless、一定の高スループットにはProvisioned C) Serverlessはスモールビジネス向け D) どちらも同じ価格

正解: B 解説: Serverlessはアクティブな時間のみ課金(RPU: Redshift Processing Units)。連続的に多量のクエリが実行される場合はProvisionedの方が経済的。開発環境やアドホック分析にはServerlessが適切。


問42: AWS の「データメッシュ」アーキテクチャでの各ドメインのデータ所有と中央ガバナンスのバランスを保つツールは?

A) S3バケットポリシーのみ B) AWS Lake Formation (中央ガバナンス) + ドメインごとのGlue Data Catalog (分散所有) C) 一つのGlueカタログで全ドメインを管理 D) Lake Formationはデータメッシュに非対応

正解: B 解説: データメッシュではLake Formationで中央集権的なアクセス制御ポリシーを管理しつつ、各ドメインが独自のGlueカタログ(またはアカウント)でデータを所有・管理するアーキテクチャが推奨。


問43: Amazon Kinesis Data Analytics でWindowを使用したリアルタイム集計の種類は?

A) Windowなしの全データ集計のみ B) Tumbling Window (固定区間), Sliding Window (スライド), Session Window (アクティビティ) C) 時間ベースのWindowのみ D) Count-basedのWindowのみ

正解: B 解説: Flinkがサポートする主なWindowタイプ: Tumbling(重複なし固定区間), Sliding(重複あり), Session(一定時間の非アクティビティで区切る), Count(イベント数ベース)。


問44: Amazon Redshiftでの「インターリーブソートキー」vs「コンパウンドソートキー」の違いは?

A) 機能は同じ B) コンパウンドは列の順序が重要(先頭列が最優先), インターリーブは複数列を均等に重み付けして任意の列でのフィルタリングに対応 C) インターリーブは廃止予定 D) コンパウンドのみ推奨

正解: B 解説: Compound Sort Key: 列順序が重要で、最初のキー列でのクエリに最適。Interleaved Sort Key: 複数の等しく重要なクエリ列がある場合に有効だが、VAGCUUMとロードが遅い。一般的にCompoundが推奨。


問45: データレイクでのデータ系譜(Data Lineage)の追跡にAWS内で使用できるサービスは?

A) CloudTrailのみ B) Amazon DataZone または AWS Glue Data Catalog (血統情報) + Amazon Macie (データ分類) C) S3バケットバージョニング D) データ系譜追跡はAWSで不可

正解: B 解説: Amazon DataZoneはデータポータル、カタログ、系譜(lineage)管理を提供する統合データガバナンスサービス。Glue Data Catalogも血統情報を一部追跡可能。Apache Atlasのようなサードパーティツールとの統合も可能。


Part 3: セキュリティ・コスト最適化 (問46-65)

問46: S3データレイクでGDPRの「忘れられる権利」に対応するための技術的実装は?

A) S3バケット全体の削除 B) 暗号化キーの削除(暗号鍵消去によるクリプト消去)またはオブジェクト単位の削除 C) データの匿名化のみで対応 D) GDPRはS3には適用されない

正解: B 解説: 個人データを特定のKMSキーで暗号化しておき、削除要求時にそのキーを削除(クリプト消去)することで、暗号化されたデータを読み取り不能にする方法が効率的。個別オブジェクト削除でも対応可能。


問47: Amazon Macie の主な機能は?

A) データ暗号化の自動化 B) S3バケット内の機密データ(PII等)の自動検出と分類 C) ネットワーク監視 D) アクセスキーのローテーション

正解: B 解説: MacieはML/パターンマッチングでS3内のPII(SSN、クレジットカード、パスポート等)、財務情報、認証情報などの機密データを自動検出し、Security HubやEventBridgeにアラートを送信する。


問48: Redshiftのデータへのコラム(列)レベルの暗号化を実現するには?

A) Redshiftがネイティブサポート B) アプリケーション側での暗号化またはKMSデータキーを使用したLambdaによる変換 C) 列レベルの暗号化は不可能 D) PostgreSQL拡張機能を使用

正解: B 解説: Redshiftはテーブル/クラスターレベルの暗号化をサポートするが、列単位の暗号化はネイティブ未対応。アプリケーション側でKMSエンベロープ暗号化で特定列を暗号化してからロードする実装が必要。


問49: Kinesis Data Streams のデータ保存期間のデフォルトと最大値は?

A) デフォルト: 1日, 最大: 7日 B) デフォルト: 24時間, 最大: 365日 C) デフォルト: 7日, 最大: 365日 D) デフォルト: 1時間, 最大: 30日

正解: B 解説: Kinesisのデフォルト保存期間は24時間(無料), 最大365日(有料の延長保存)。拡張データ保持(7日超)はStream-mode ON-DEMANDで最大365日に設定可能。


問50: Amazon Athena Workgroupでのコスト制限設定の方法は?

A) IAMポリシーのみで制限 B) WorkgroupのBytesScannedCutoffPerQueryでクエリ毎の最大スキャン量を設定 C) S3バケットのサイズ制限 D) コスト制限は設定不可

正解: B 解説: Athena WorkgroupでBytesScannedCutoffPerQueryを設定すると、指定バイト数を超えるクエリはキャンセルされ、コストの上限を強制できる。ビジネスユーザーが誤って大量スキャンするのを防ぐ。


問51: EMR クラスターのコスト最適化で最も効果的な方法は?

A) インスタンスを常に最大サイズに設定 B) Task NodeにSpot Instanceを使用し、Auto Scalingで必要時のみ追加 C) 常時稼働させてコールドスタートを防ぐ D) コスト削減は不可能

正解: B 解説: EMRのコスト最適化: (1)TaskノードをSpot Instanceで80-90%コスト削減(中断しても再実行可能) (2)Auto ScalingでピークタイムのみTaskノード追加 (3)EMR Serverlessで使用分のみ課金。


問52: Glue ETLジョブのコスト最適化策は?

A) Worker TypeをG.2Xに固定 B) Glue 3.0以降の使用(効率改善)、不要なDynamicFrame変換の削減、Auto Scalingの有効化 C) ジョブを細かく分割 D) 全データをメモリにロード

正解: B 解説: Glueコスト最適化: (1)Auto Scaling有効化(--enable-auto-scaling) (2)不要なcache/persist削除 (3)プッシュダウン述語でS3スキャン削減 (4)Small Workertype(G.025X)を軽量処理に使用 (5)Job Bookmarkで処理済みデータをスキップ。


問53: Amazon Redshift でクエリ実行プランを確認するには?

A) CloudWatchメトリクスのみ B) EXPLAIN文でクエリプランを確認し、Distribution/Sort keyの効果を分析 C) Redshift Consoleのみ D) クエリプランの確認は不可能

正解: B 解説: EXPLAIN SELECT ...でRedshiftのクエリ実行プランを確認。DS_BCAST_INNER(ブロードキャスト)やDS_DIST_INNER(分散)の操作コストを確認し、DISTKEY/SORTKEYの最適化に活用する。


問54: データウェアハウスのディメンションテーブルの更新戦略で「ゆっくり変化するディメンション(SCD)」タイプ2の特徴は?

A) 最新値のみ保持 B) 変更履歴を新行として追加し、有効開始日・終了日・現在フラグを持つ C) 変更前のデータを上書き D) 変更履歴は別テーブルに保存

正解: B 解説: SCD Type 2は顧客の住所変更等の履歴を全て保持する手法。変更時に新行を追加し、effective_dateexpiry_dateis_currentフラグで管理。Redshift/DeltaLake/Icebergでの実装が一般的。


問55: AWS Lake Formationのデータ共有(Data Sharing)でクロスアカウントのデータアクセスを実現する方法は?

A) S3バケットポリシーのみで実装 B) Lake Formation Resource Linkを作成し、クロスアカウントのGlue Data Catalogリソースへのアクセスを許可 C) VPCピアリングのみで対応 D) クロスアカウントアクセスは不可

正解: B 解説: Lake Formation Resource LinkはGlue Data Catalogデータベース/テーブルのエイリアスを別アカウントに作成し、元テーブルへのアクセスを制御する。RAM(Resource Access Manager)を使ってクロスアカウント共有。


問56: Kinesis Data Streams でコスト削減をしながらスループットを確保する方法は?

A) シャード数を常に最大に設定 B) On-Demand モードを使用してトラフィックに応じた自動スケーリング C) コスト削減とスループット確保は両立不可 D) Enhanced Fan-Outを必ず有効化

正解: B 解説: Kinesis Data Streams On-Demandモードはシャードを自動管理し、ピーク時はスケールアップ、低負荷時はスケールダウンして最適コストを実現。事前のキャパシティプランニングが不要。


問57: Amazon OpenSearch ServiceのUltra Warmノードの役割は?

A) リアルタイムインデックス更新 B) アクセス頻度の低い古いインデックスを安価なS3+メモリキャッシュ層で保持 C) マスターノードの代替 D) 暗号化処理の専用ノード

正解: B 解説: UltraWarmはS3ベースのストレージとメモリキャッシュの組み合わせで、ホットノード(SSD)の約10分の1のコストで低頻度アクセスデータを保持。ISMポリシーで自動的にホット→UltraWarmに移行できる。


問58: Redshift の「Concurrency Scaling」の機能は?

A) クエリの並列実行 B) ピーク時にクラスターに追加のクエリ処理キャパシティを自動追加 C) データの自動分散 D) バックアップのスケジュール

正解: B 解説: Concurrency Scalingは読み取りクエリのキューが設定された閾値を超えると、数秒以内に追加のクラスターキャパシティをプロビジョニングしてクエリ処理を分散させる機能。1日あたり1時間無料。


問59: S3 Select/Glacier Selectとは何か?

A) S3バケットの選択UI B) S3/Glacier内のオブジェクトからSQL形式でデータを部分的に抽出(ダウンロードを削減) C) データのコピー機能 D) アクセスログの検索

正解: B 解説: S3 SelectはSQL SELECT文でS3オブジェクト内のデータをフィルタし、全オブジェクトではなく必要な行・列のみを返す。データ転送量とコストを大幅に削減。Parquet/CSV/JSONをサポート。


問60: データパイプラインのデータ品質を監視するためのAWSサービス構成は?

A) CloudWatchのみで十分 B) Glue Data Quality + CloudWatch Metrics + SNS アラート + S3への違反データ保存 C) 人手による定期チェックのみ D) データ品質監視は不要

正解: B 解説: Glue Data Qualityルールを定義→品質スコアとルール結果をCloudWatchに送信→閾値以下でSNSアラート→違反データをS3に隔離、という監視パイプラインが推奨される構成。


問61: Amazon Athena ACID トランザクション機能(Apache Icebergテーブル)の利点は?

A) 読み取り性能の向上のみ B) INSERT/UPDATE/DELETE/MERGEが可能になり、データの更新・削除と整合性が保証される C) 自動スキーマ推定 D) コストが半減

正解: B 解説: AthenaのAcid対応(Icebergテーブル)によりUPDATE, DELETE, MERGE INTOが可能になり、CDC(変更データ取り込み)やSCD実装、データの修正が直接SQL で実行できる。


問62: Amazon Redshiftの「Automatic Table Optimization (ATO)」とは?

A) 自動バックアップ B) RedshiftがワークロードのクエリパターンをMLで分析し、DISTKEY/SORTKEYを自動で推奨・適用する機能 C) テーブルの自動削除 D) 自動インデックス作成

正解: B 解説: ATOは過去のクエリパターンを分析してDISTKEY/SORTKEYの最適設定を学習し、自動的に変更(VACUUM/ANALYZEの自動実行も含む)。テーブル定義時にDISTSTYLE AUTOやSORTKEY AUTOを設定すると有効。


問63: データガバナンスのフレームワークとしてAmazon DataZoneが提供する機能は?

A) データ暗号化のみ B) データカタログ、データポータル(発見・アクセス申請)、系譜追跡、品質スコア、ガバナンスポリシーの統合管理 C) ETLジョブの実行 D) リアルタイムデータ処理

正解: B 解説: Amazon DataZoneはエンタープライズデータガバナンスプラットフォームで、データカタログ(自動発見)、ビジネスグロッサリー、データポータル(セルフサービス検索・アクセス申請)、系譜、品質、コンプライアンス機能を統合提供する。


問64: Redshift でテーブルの統計情報が古い場合の症状と対処法は?

A) 症状: データ不整合, 対処: COPY再実行 B) 症状: クエリプランが最適化されず低速, 対処: ANALYZE コマンドの実行 C) 症状: 接続エラー, 対処: クラスター再起動 D) 症状: 暗号化エラー, 対処: キー更新

正解: B 解説: Redshiftは統計情報に基づいてクエリプランを最適化する。大量データの追加後にANALYZEコマンドを実行して統計情報を更新することでクエリ性能を回復できる。ANALYZE COMPRESSIONで圧縮エンコードも最適化。


問65: 大規模データレイクでのコスト削減に最も効果的な施策の組み合わせは?

A) 全データをS3 Standardに保存 B) Parquet変換+パーティション設計+S3ライフサイクルポリシー+Athena Workgroup制限の組み合わせ C) データ量を減らすことのみ D) 全クエリをRedshiftに移行

正解: B 解説: データレイクコスト最適化: (1)ParquetでAthenaスキャンコスト最大90%削減 (2)パーティションでスキャン範囲を絞る (3)S3ライフサイクルで古いデータをGlacierへ (4)Workgroupで予期しない大量スキャンを防止。


[DAS-C01 模擬試験 第1回 - 65問完了]


模擬試験 第2回 追加問題 (20問)

問66: Amazon Redshift RAとはどのような機能か?

A) リアルタイム分析エンジン B) RA3インスタンスはマネージドストレージ(S3)とコンピュートを分離し、データ量に応じてストレージを独立してスケール C) Remote Analyticsの略称 D) Redshift Acceleratorの略称

正解: B 解説: RA3インスタンスファミリーはコンピュートとストレージを分離したアーキテクチャで、マネージドストレージ(AQUA)を使用。データ量が増えてもコンピュートをスケールせずにストレージのみ増やせる。


問67: Apache Spark の「Broadcast Join」を使用する条件は?

A) 大きいテーブル同士のJOIN B) 片方のテーブルが小さい(~10MB以下)場合に全executorにブロードキャストしてシャッフルを回避 C) キー分散が偏っている場合 D) JOINキーが異なるデータ型の場合

正解: B 解説: Broadcast JoinはSparkがドライバーから全executorに小さいテーブルのコピーを送り、シャッフルなしでJOINを実行する最適化。spark.sql.autoBroadcastJoinThreshold(デフォルト10MB)以下で自動適用。


問68: Amazon Kinesis Data Streams の「Enhanced Fan-Out」と通常の GetRecords() の違いは?

A) 違いはない B) Enhanced Fan-Out: 各コンシューマーに専用2MB/秒、HTTP/2プッシュ型。通常: 全コンシューマーで共有2MB/秒、ポーリング型 C) Enhanced Fan-Out: 古いデータのみ対応 D) Enhanced Fan-Out: プロデューサー側の機能

正解: B 解説: 通常のGetRecords()は全コンシューマーでシャードあたり2MB/秒を共有し、ポーリングが必要。Enhanced Fan-Outは各登録コンシューマーに専用2MB/秒帯域を提供し、HTTP/2によるプッシュ配信で低レイテンシ(~70ms vs ~200ms)。


問69: AWS Glue の Job Bookmark 機能の目的は?

A) ジョブのスケジューリング B) 以前に処理したデータをトラッキングし、増分処理(新データのみ処理)を実現 C) クエリのキャッシュ D) ジョブのバージョン管理

正解: B 解説: Job Bookmarkは前回のジョブ実行で処理済みのデータの状態を記録し、次回実行時に新しいデータのみ処理する機能。S3の増分ロードやJDBC増分抽出に使用。--job-bookmark-option=job-bookmark-enableで有効化。


問70: データウェアハウスとデータレイクの主な違いは?

A) データウェアハウスは大規模、データレイクは小規模 B) データウェアハウス: スキーマ定義済み構造化データのSQLアクセス, データレイク: 全形式のRAWデータを格納し後からスキーマを定義 C) データレイクはリアルタイム専用 D) どちらも同じ技術の異なる名称

正解: B 解説: DWH(Schema-on-Write): ETL後の整形済みデータをSQL用に最適化。Data Lake(Schema-on-Read): RAWデータをS3に格納し、分析時にスキーマを適用。現代はLake House(両方の統合)が主流。


問71: S3 Intelligent-Tieringを使用すべきケースは?

A) アクセスパターンが予測可能な場合 B) アクセスパターンが予測不能なデータで、自動的に適切なストレージ層に移動してコスト最適化 C) 1回だけアクセスするデータ D) 常に頻繁にアクセスするデータ

正解: B 解説: Intelligent-Tieringはアクセスパターンをモニタリングし、Frequent Access、Infrequent Access、Archive Instant Access、Archive Access、Deep Archive Accessの5層間を自動移動してコストを最適化する。


問72: Amazon Timestream での時系列データクエリで最も効率的な方法は?

A) フルスキャン B) タイムスタンプ範囲フィルタ(measure_name + time条件)でマグネティックストアアクセスを回避 C) ランダムアクセスのみ対応 D) 全データをメモリにロード

正解: B 解説: Timestreamは最新データ(メモリストア)と古いデータ(マグネティックストア=S3)の2層構造。クエリに時間範囲条件(WHERE time BETWEEN ...)を含め、最近のデータにはメモリストアのみアクセスすることでレイテンシを最小化。


問73: Glue ETLでPandas DataFrameを使用する場合の注意事項は?

A) Glueは完全にPandasをサポート B) PandasはシングルノードのためGlueの分散処理の恩恵がなく、大データセットにはPySpark DynamicFrameを使用すべき C) PandasはGlueで使用不可 D) PandasとPySparkは機能的に同一

正解: B 解説: GlueはSparkクラスター上で動作するため、PandasのDataFrameはドライバーノードのメモリに限定される。大規模データにはPySpark DynamicFrame/DataFrameを使用し、分散処理の恩恵を受ける。Pandas on Spark(Koalas)は代替手段。


問74: Amazon Athena の「Federated Query」でクエリできるデータソースは?

A) S3のみ B) S3に加え、RDS/Aurora/DynamoDB/Redshift/CloudWatch等のカスタムLambdaコネクター対応データソース C) Redshiftのみ D) DynamoDBのみ

正解: B 解説: Athena Federated QueryはLambdaデータソースコネクターを通じてS3以外のデータソース(RDS MySQL/PostgreSQL、DynamoDB、ElastiCache、CloudWatch Logs等)を直接クエリし、クロスソースのJOINが可能。


問75: Redshift の「Result Caching」機能の特徴は?

A) 全クエリが自動キャッシュされる B) 同じクエリ+同じユーザー+データ変更なしの場合、キャッシュから返し実行コストゼロ C) 外部クエリのみキャッシュ対象 D) キャッシュは1分後に消える

正解: B 解説: Redshift Result CachingはSET enable_result_cache_for_session=on(デフォルト)で有効。同一クエリ文字列・同一ユーザーでキャッシュ期間(最大24時間)内にテーブルが変更されていない場合、キャッシュから即座に返却。


問76: AWS Clean Rooms の主な用途は?

A) データの削除と整理 B) 複数企業間でデータを共有せずに安全に分析・照合する「クリーンルーム」環境の提供 C) S3バケットのクリーニング D) ETLジョブの管理

正解: B 解説: AWS Clean Roomsは医療機関間の患者データ照合、広告主とメディア企業の重複オーディエンス分析など、実データを共有せずにプライバシー保護した共同分析を可能にするサービス。差分プライバシーとクエリ制限をサポート。


問77: Spark の「Data Skew」(データの偏り)問題を解決する方法は?

A) シャッフルを増やす B) Salt Key(ランダムプレフィックスをキーに追加)またはBroadcast JOINで偏ったパーティションを分散 C) データを削減 D) 解決不可能

正解: B 解説: 特定のキー値に大量のデータが偏ると、一部のPartitionが処理に時間がかかる(Skew)。対策: (1)Salt Key: キーにランダムサフィックスを追加して均等分散 (2)Broadcast Join: 小テーブルを全ノードに複製 (3)Salted Join: 両手法の組み合わせ。


問78: Amazon OpenSearch Serverless と OpenSearch Service の主な違いは?

A) 機能は同じ B) Serverless: インデックスキャパシティの自動スケーリング・管理不要, Service: ノード数管理が必要だが詳細制御可能 C) Serverlessはログ分析専用 D) ServiceはServerlessより常に安価

正解: B 解説: OpenSearch Serverlessはクラスター管理不要で検索/インデックスキャパシティが自動スケール、OCU(OpenSearch Compute Unit)課金。予測困難なワークロードに適する。サービスは細かいシャード/ノード設定が可能。


問79: AWS Glue の Streaming ETL の特徴は?

A) バッチ処理のみ対応 B) Kinesis/KafkaをソースとしてSpark Structured Streamingで継続的なETL処理が可能 C) リアルタイム処理は不可能 D) Lambda関数のみストリーミング対応

正解: B 解説: Glue Streaming ETLはKinesis Data Streams、Kafka、MSKをソースにしてSparkの構造化ストリーミングで継続的なETL処理を実現。マイクロバッチ間隔を設定し、Glue Data Catalogのスキーマ変換もサポート。


問80: Amazon Redshiftでテーブルのエンコード(圧縮)を最適化するには?

A) 圧縮はパフォーマンスに影響しない B) ANALYZE COMPRESSIONコマンドで推奨エンコードを確認し、ENCODE AUTOで自動最適化 C) 常にRAW(圧縮なし)に設定 D) エンコードの変更は不可能

正解: B 解説: ANALYZE COMPRESSION table_nameで各列の推奨圧縮アルゴリズムを表示。ENCODE AUTO(デフォルト)を設定するとRedshiftが自動的に最適なエンコードを選択・変更する。圧縮によりスキャン量が削減され性能向上。


問81: Kinesis Data Streams でシャードの「ホットシャード」問題を防ぐ方法は?

A) シャード数を増やす B) パーティションキーのカーディナリティを高くする(ユーザーIDなど、多数のユニーク値) C) ランダムなパーティションキーを使用 D) シャードを結合

正解: B 解説: ホットシャードはパーティションキーのカーディナリティが低い(例: 国コード)と特定シャードに集中する問題。高カーディナリティなキー(UUID、ユーザーID)を使用するか、ランダムソルト(key + random(1,N))を付加してシャードを分散。


問82: AWS Glue でDelta Lakeテーブルを処理するためには?

A) Glueは標準でDelta Lakeをサポート B) --datalake-formats deltaパラメータとDelta Lakeライブラリの設定が必要 C) Delta LakeはAWS非対応 D) EMRのみDelta Lakeに対応

正解: B 解説: Glue 4.0以降は--datalake-formats deltaジョブパラメータを設定することでDelta Lakeのネイティブサポートが有効化される。DeltaTable.forPath(spark, s3_path)でDelta Lakeテーブルをread/write/merge可能。


問83: データレイクのパーティション設計のベストプラクティスは?

A) 全カラムでパーティション B) クエリパターン(WHERE句)に合わせたパーティションキー選択 (例: year/month/day, region, category) C) パーティションなし D) ランダムなパーティション

正解: B 解説: パーティションキーはクエリのWHERE句で頻繁にフィルタするカラムを選択。日時ベース(year/month/day)が一般的。カーディナリティが高すぎる列(ユーザーID等)はパーティション爆発を招く。Hive形式col=valueを採用。


問84: Amazon Redshift Spectrum の External Tableのパフォーマンス最適化は?

A) パーティションなしのFullスキャン B) Parquet+パーティション+Predicate Pushdownで不要データをSpectrumレイヤーでフィルタ C) 全データをRedshiftにロード D) CSVのままで最適

正解: B 解説: Spectrum最適化: (1)ParquetでSpectrumレイヤーのスキャン量を削減 (2)パーティションでファイル数を削減 (3)Predicate PushdownでS3レイヤーでのフィルタリングを活用(WHERE条件をSpectrumノードで実行)。


問85: DAS-C01 試験でのデータパイプライン設計問題のアプローチ方法は?

A) 常に最も高価なサービスを選択 B) データの流れ(収集→保存→処理→分析→可視化)に沿って各段階の最適サービスを特定 C) シンプルな解決策を避けて複雑な設計を選択 D) 全てSageMakerで解決

正解: B 解説: DAS試験のシナリオ問題は: (1)要件分析(リアルタイム/バッチ?構造化/非構造化?) (2)各レイヤーのAWSサービス選択 (3)コスト・スケーラビリティ・セキュリティのトレードオフ評価 の順で考える。


付録: DAS-C01 高度チェックリスト

【DAS-C01 上級者向け確認事項】

■ Redshift 詳細
  ✓ DISTKEY/SORTKEY選択の判断基準
  ✓ WLMキューの設定とConcurrency Scaling
  ✓ Spectrum の Pushdown最適化
  ✓ RA3インスタンスとManagedストレージ
  ✓ Materialized View と Result Caching
  ✓ VACUUM/ANALYZE の適切なタイミング

■ Kinesis エコシステム
  ✓ KDS: シャード計算式、Enhanced Fan-Out、On-Demand
  ✓ Firehose: バッファリング設定、フォーマット変換、Lambda変換
  ✓ Analytics (Flink): Window種類, SQL/Java/Python API
  ✓ KDS vs MSK の使い分け基準

■ データ処理
  ✓ Glue: Job Bookmark, Grouping Files, DQ, Streaming
  ✓ EMR: Spot活用, Auto Scaling, EMR Serverless
  ✓ Spark: Adaptive Query Execution, Broadcast Join, Skew対策
  ✓ Delta Lake/Iceberg: ACID, タイムトラベル

■ データレイク
  ✓ Lake Formation: TBAC, 行フィルタ, 列アクセス制御
  ✓ S3: ライフサイクル, Intelligent-Tiering, S3 Select
  ✓ Athena: パーティションプロジェクション, CTAS, Federated Query
  ✓ メダリオンアーキテクチャ(Bronze/Silver/Gold)

■ セキュリティ
  ✓ KMS CMKとS3暗号化の組み合わせ
  ✓ VPCエンドポイント (Gateway: S3, Interface: Glue等)
  ✓ Lake Formation クロスアカウント共有
  ✓ Macie によるPII検出
  ✓ データメッシュとデータガバナンス

■ コスト最適化
  ✓ Athena: Parquet+パーティション+列選択
  ✓ EMR: Spot Instance + Auto Scaling
  ✓ S3: ライフサイクルポリシー
  ✓ Redshift: Pause/Resume, Concurrency Scaling
  ✓ Glue: Auto Scaling, Job Bookmark

【試験戦略】
1. データの特性を先に特定: 構造化/非構造化、リアルタイム/バッチ、規模
2. 要件の優先順位: コスト/パフォーマンス/セキュリティのトレードオフ
3. マネージドサービスを優先: Firehose > KDS + カスタム, Glue > 自前ETL
4. セキュリティ問題: Lake Formation + KMS + VPCエンドポイントの組み合わせ
5. コスト問題: サービス使用量課金とデータ量の両方を考慮

DAS-C01 (AWS Certified Data Analytics Specialty) 試験対策ガイド完成 作成日: 2026-04 対象試験: AWS Certified Data Analytics – Specialty (DAS-C01)


第7章: データウェアハウス高度設計

7.1 Redshift 高度なSQL・最適化

-- Redshift: ウィンドウ関数の活用
SELECT
    customer_id,
    order_date,
    amount,
    -- 累積合計
    SUM(amount) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_total,
    -- 移動平均(7日間)
    AVG(amount) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS moving_avg_7d,
    -- 前回比較
    amount - LAG(amount, 1, 0) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
    ) AS amount_delta,
    -- ランキング
    ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY amount DESC
    ) AS rank_by_amount,
    DENSE_RANK() OVER (
        ORDER BY amount DESC
    ) AS global_rank
FROM orders
WHERE order_date >= DATEADD(year, -1, GETDATE());

-- Redshift: SUPER型(半構造化データ)
CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data SUPER  -- JSON-likeデータ
) DISTKEY(event_id) SORTKEY(event_time);

-- SUPER型へのデータ挿入
INSERT INTO events SELECT
    1,
    GETDATE(),
    JSON_PARSE('{"user_id": 123, "action": "click", "properties": {"button": "buy", "price": 99.99}}');

-- SUPER型からのデータ抽出
SELECT
    event_id,
    event_data.user_id::INT AS user_id,
    event_data.action::VARCHAR AS action,
    event_data.properties.button::VARCHAR AS button,
    event_data.properties.price::DECIMAL(10,2) AS price
FROM events
WHERE event_data.action = 'click';

-- Redshift: データ共有(Datashare)
-- 生産者(Producer)側
CREATE DATASHARE sales_share;
ALTER DATASHARE sales_share ADD SCHEMA public;
ALTER DATASHARE sales_share ADD TABLE public.orders;
ALTER DATASHARE sales_share ADD TABLE public.customers;
GRANT USAGE ON DATASHARE sales_share TO ACCOUNT '444455556666' VIA NAMESPACE 'abc123def';

-- 消費者(Consumer)側
CREATE DATABASE sales_data FROM DATASHARE sales_share OF ACCOUNT '123456789012' NAMESPACE 'xyz789abc';

-- データ共有テーブルへのクエリ(読み取り専用)
SELECT * FROM sales_data.public.orders LIMIT 10;

-- Redshift: Redshift ML
CREATE MODEL sales_forecast
FROM (
    SELECT
        product_id,
        EXTRACT(YEAR FROM order_date) AS year,
        EXTRACT(MONTH FROM order_date) AS month,
        EXTRACT(DOW FROM order_date) AS day_of_week,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue
    FROM orders
    GROUP BY 1, 2, 3, 4
)
TARGET total_revenue
FUNCTION predict_revenue
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftMLRole'
SETTINGS (
    S3_BUCKET 'my-ml-bucket',
    MAX_RUNTIME 3600
);

-- モデル使用
SELECT
    product_id,
    2025 AS year,
    3 AS month,
    2 AS day_of_week,
    predict_revenue(product_id, 2025, 3, 2, 150) AS predicted_revenue
FROM products;

7.2 Amazon OpenSearch Service 高度な設定

import boto3
import json
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# OpenSearch クライアント設定
def create_opensearch_client(endpoint, region='ap-northeast-1'):
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        region,
        'es',
        session_token=credentials.token
    )
    
    client = OpenSearch(
        hosts=[{'host': endpoint, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    return client

# ISM (Index State Management) ポリシー
def create_ism_policy(client, policy_name):
    """ログインデックスのライフサイクル管理"""
    
    policy = {
        'policy': {
            'description': 'Hot-Warm-Cold tier management for logs',
            'default_state': 'hot',
            'states': [
                {
                    'name': 'hot',
                    'actions': [
                        {
                            'rollover': {
                                'min_size': '50gb',
                                'min_index_age': '1d',
                                'min_doc_count': 1000000
                            }
                        }
                    ],
                    'transitions': [
                        {
                            'state_name': 'warm',
                            'conditions': {
                                'min_index_age': '2d'
                            }
                        }
                    ]
                },
                {
                    'name': 'warm',
                    'actions': [
                        {
                            'warm_migration': {},  # Warm ノードに移動
                            'replica_count': {
                                'number_of_replicas': 0  # レプリカ削減でコスト最適化
                            }
                        }
                    ],
                    'transitions': [
                        {
                            'state_name': 'cold',
                            'conditions': {
                                'min_index_age': '30d'
                            }
                        }
                    ]
                },
                {
                    'name': 'cold',
                    'actions': [
                        {
                            'cold_migration': {},  # Cold ストレージに移動(UltraWarm後継)
                        }
                    ],
                    'transitions': [
                        {
                            'state_name': 'delete',
                            'conditions': {
                                'min_index_age': '365d'
                            }
                        }
                    ]
                },
                {
                    'name': 'delete',
                    'actions': [
                        {'delete': {}}
                    ],
                    'transitions': []
                }
            ]
        }
    }
    
    client.transport.perform_request(
        'PUT',
        f'/_plugins/_ism/policies/{policy_name}',
        body=policy
    )

# k-NN(ベクター検索)インデックス設定
def create_vector_index(client, index_name, dimension=1536):
    """埋め込みベクターの検索インデックス(RAG向け)"""
    
    index_body = {
        'settings': {
            'index': {
                'knn': True,
                'knn.algo_param.ef_search': 100,
                'number_of_shards': 5,
                'number_of_replicas': 1
            }
        },
        'mappings': {
            'properties': {
                'embedding': {
                    'type': 'knn_vector',
                    'dimension': dimension,
                    'method': {
                        'name': 'hnsw',
                        'space_type': 'cosinesimil',
                        'engine': 'faiss',
                        'parameters': {
                            'ef_construction': 128,
                            'm': 24
                        }
                    }
                },
                'text': {'type': 'text'},
                'metadata': {'type': 'object'}
            }
        }
    }
    
    client.indices.create(index=index_name, body=index_body)

def vector_search(client, index_name, query_vector, k=5):
    """k-NN ベクター検索(セマンティック検索)"""
    
    query = {
        'size': k,
        'query': {
            'knn': {
                'embedding': {
                    'vector': query_vector,
                    'k': k
                }
            }
        }
    }
    
    response = client.search(index=index_name, body=query)
    return response['hits']['hits']

def hybrid_search(client, index_name, query_text, query_vector, k=5):
    """ハイブリッド検索(BM25 + kNN)"""
    
    query = {
        'size': k,
        'query': {
            'hybrid': {
                'queries': [
                    {
                        'match': {
                            'text': {
                                'query': query_text,
                                'boost': 0.3
                            }
                        }
                    },
                    {
                        'knn': {
                            'embedding': {
                                'vector': query_vector,
                                'k': k,
                                'boost': 0.7
                            }
                        }
                    }
                ]
            }
        }
    }
    
    response = client.search(index=index_name, body=query)
    return response['hits']['hits']

7.3 Amazon QuickSight 高度な機能

import boto3
import json

qs_client = boto3.client('quicksight', region_name='ap-northeast-1')

# QuickSight データセット作成(SPICEモード)
def create_quicksight_dataset(account_id, dataset_name, data_source_arn):
    response = qs_client.create_data_set(
        AwsAccountId=account_id,
        DataSetId=f'{dataset_name}-dataset',
        Name=dataset_name,
        ImportMode='SPICE',  # データをSPICEにキャッシュ
        PhysicalTableMap={
            'SalesTable': {
                'RelationalTable': {
                    'DataSourceArn': data_source_arn,
                    'Schema': 'public',
                    'Name': 'orders',
                    'InputColumns': [
                        {'Name': 'order_id', 'Type': 'STRING'},
                        {'Name': 'customer_id', 'Type': 'STRING'},
                        {'Name': 'amount', 'Type': 'DECIMAL'},
                        {'Name': 'order_date', 'Type': 'DATETIME'},
                        {'Name': 'region', 'Type': 'STRING'}
                    ]
                }
            }
        },
        LogicalTableMap={
            'SalesLogical': {
                'Alias': 'Sales',
                'DataTransforms': [
                    {
                        'ProjectOperation': {
                            'ProjectedColumns': ['order_id', 'customer_id', 'amount', 'order_date', 'region']
                        }
                    },
                    {
                        'FilterOperation': {
                            'ConditionExpression': 'amount > 0'
                        }
                    }
                ],
                'Source': {
                    'PhysicalTableId': 'SalesTable'
                }
            }
        },
        # 行レベルセキュリティ
        RowLevelPermissionDataSet={
            'Namespace': 'default',
            'Arn': 'arn:aws:quicksight:ap-northeast-1:123456789012:dataset/rls-dataset',
            'PermissionPolicy': 'GRANT_ACCESS',
            'FormatVersion': 'VERSION_2'
        }
    )
    return response['DataSetId']

# 埋め込みURL生成(ダッシュボードの埋め込み)
def get_dashboard_embed_url(account_id, dashboard_id, user_arn):
    response = qs_client.generate_embed_url_for_registered_user(
        AwsAccountId=account_id,
        ExperienceConfiguration={
            'Dashboard': {
                'InitialDashboardId': dashboard_id
            }
        },
        UserArn=user_arn,
        SessionLifetimeInMinutes=60,
        AllowedDomains=['https://myapp.example.com']
    )
    return response['EmbedUrl']

# QuickSight Q(Q&A機能)のトピック作成
def create_quicksight_q_topic(account_id, topic_name, dataset_arns):
    response = qs_client.create_topic(
        AwsAccountId=account_id,
        TopicId=f'{topic_name}-topic',
        Topic={
            'Name': topic_name,
            'Description': '売上分析のQ&Aトピック',
            'DataSets': [{'DatasetArn': arn} for arn in dataset_arns],
            'UserExperienceVersion': 'NEW_READER_EXPERIENCE'
        }
    )
    return response['TopicId']

模擬試験 セット2(15問)

問題1 Amazon OpenSearch ServiceのUltraWarmと Cold Storageの違いは?

A) UltraWarmはリアルタイム、Cold Storageはバッチ専用 B) UltraWarm: S3バックのウォームノードで低コスト・フルクエリ機能。Cold Storage: S3にインデックスを移行して最小コスト・クエリ前にAttachが必要 C) Cold StorageはOpenSearch外部のS3ストレージ D) 違いはない

正解: B 解説: OpenSearch Storage Tiers: Hot(インスタンスストア/EBS): 最高パフォーマンス、コスト最高。UltraWarm(S3-backed nodes): ホットの約20分の1のコスト、全クエリ機能対応、インデックスを自動Attach/Detach。Cold Storage(S3): コスト最小、クエリ前に手動でAttach(数分〜数十分)、最低アクセス頻度データ向け。ISMポリシーで自動移行設定が推奨。


問題2 AWS Lake Formationの「Governed Tables」の特徴は?

A) すべてのテーブルタイプをサポートする B) ACID準拠のトランザクション、タイムトラベル、自動コンパクション機能を提供するLake Formation管理のテーブル(Apache Icebergベース) C) Governed TablesはIcebergと互換性がない D) Governed TablesはRedshiftSpectrumからのみアクセス可能

正解: B 解説: Lake Formation Governed Tables: Apache Icebergをベースにした管理テーブル形式。ACID: 複数オペレーションのアトミックコミット。タイムトラベル: SELECT … FOR SYSTEM_TIME AS OF。自動コンパクション: 小ファイルの自動結合。行/列レベルセキュリティとの統合。Storage Optimized Queries(SOQ)による最適化。Athena、EMR、Redshift Spectrumから利用可能。


問題3 Amazon Athenaのクエリコスト最適化のために「パーティション投影(Partition Projection)」を使用する理由は?

A) クエリのメモリ使用量を削減するため B) Glue Data CatalogへのAPI呼び出しなしでパーティションを動的に計算し、大規模パーティション(数千〜数百万)でのクエリ速度とコストを改善 C) Athenaのスループットを増加させるため D) クエリ結果をキャッシュするため

正解: B 解説: Partition Projection: テーブルプロパティにパーティション定義(範囲、値リスト等)を記述することで、Glue Catalogへのメタデータ問い合わせなしに動的パーティション計算を実現。SHOW PARTITIONSが不要になり、新パーティションの自動認識も可能。特に日時パーティション(年/月/日)や大量のパーティションを持つテーブルで効果的。MSCKの実行が不要。


問題4 Apache Kafka(MSK)の「Exactly-once Delivery」を実現するための設定は?

A) at-most-once配信(acks=0)で十分 B) プロデューサーのべき等性(enable.idempotence=true)と Kafka Transactionsを組み合わせて使用 C) コンシューマーのauto.commit.enable=falseのみで実現 D) KafkaはExactly-onceをサポートしない

正解: B 解説: Kafka Exactly-once: ①プロデューサーべき等性: enable.idempotence=true→重複書き込み防止。②Kafkaトランザクション: transactional.id設定→initTransactionsbeginTransaction→produce→commitTransactionでAtomicなマルチパーティション書き込み。③コンシューマー: isolation.level=read_committedでコミット済みデータのみ読み取り。MSKでFlink/Spark Streamingを使用する場合にも重要。


問題5 Amazon QuickSight の SPICE(Super-fast, Parallel, In-memory Calculation Engine)の制限として正しいのは?

A) SPICEはクエリのたびにデータソースに接続する B) SPICEのキャパシティには上限があり(追加購入可能)、データの鮮度はSPICEの更新スケジュールに依存する C) SPICEはSQL以外のデータソースを処理できない D) SPICEを使用すると常にコストが増加する

正解: B 解説: SPICE制限: ①容量上限(デフォルト1GB/ユーザー→追加購入で拡張)②データ鮮度: SPICE更新スケジュール(最短15分)→リアルタイムでない③一部のデータ型・クエリは直接クエリモードのみ対応。SPICE利点: データソースへの負荷ゼロ、クエリ高速化。直接クエリモード: 常に最新データ、容量制限なし、データソース負荷あり。


問題6 AWS Glue Crawlerが複数のS3パスをクローリングする際に単一テーブルとして認識させる方法は?

A) すべてのパスを1つのバケットに移動する B) Crawlerの設定で「グループ化」オプションを有効化し、共通のスキーマを持つパスを単一テーブルとして扱う C) 各パスに個別のCrawlerを作成する D) Athenaで複数テーブルをVIEWで結合する

正解: B 解説: Glue Crawlerのグループ化(Table Grouping): CrawlerにS3ターゲットを追加する際に「グループ化を有効にする」オプションを設定。共通のスキーマ、パターン、ファイル形式を持つ複数S3パスを自動的に単一テーブルに統合。例: s3://bucket/year=2024/month=01/, s3://bucket/year=2024/month=02/→単一テーブルとして登録(パーティション列として認識)。


問題7 Amazon Kinesis Data Streams のオンデマンドモードとプロビジョニングモードの選択基準は?

A) オンデマンドは常にコストが高い B) オンデマンド: トラフィックが予測不能・急激に変動する場合(自動スケール)。プロビジョニング: トラフィックが予測可能・安定している場合(コスト最適) C) プロビジョニングモードはスケーリング機能なし D) オンデマンドモードはシャード分割が不要

正解: B 解説: Kinesis Data Streams モード: オンデマンド(2021年追加): シャード管理不要、最大200MBps/200Mレコード書き込み/読み取り自動スケール、4GBデータ保持。コスト: PUT Payload Unit + データストレージ(シャード数に比例しない)。プロビジョニング: シャード手動管理(1シャード=1MB書き込み/2MB読み取り)、コストはシャード時間に比例。安定した低負荷ではプロビジョニングの方が安価。


問題8 AWS Step Functionsと Amazon MWAAのワークフロー管理の主な違いは?

A) MWAAはAWSサービスとの統合が不可能 B) Step Functions: サーバーレスで秒単位のイベント駆動ワークフロー向け(AWS SDKと直接統合)。MWAA(Managed Workflows for Apache Airflow): Python DAGで複雑なデータパイプラインのスケジュールとオーケストレーション向け C) MWAAは機械学習のみ対応 D) Step Functionsはスケジュール実行不可能

正解: B 解説: Step Functions vs MWAA(Apache Airflow): Step Functions: AWSネイティブ、JSON状態定義、低レイテンシーイベント駆動、SDK統合(300+サービス)、サーバーレス。MWAA: Python DAG、豊富なオペレーター(Spark、Hadoop、dbt等)、複雑なデータエンジニアリングパイプライン、スケジュール管理に優れる。大規模データパイプラインはMWAA、AWSサービス統合ワークフローはStep Functions。


問題9 Amazon EMRでのコスト最適化のベストプラクティスは?

A) 常に最大インスタンスタイプを使用する B) コアノードにオンデマンド(安定性確保)、タスクノードにSpotインスタンス(コスト削減)を組み合わせ、EMR Serverlessでアイドルコスト削減 C) EMRはコスト最適化不可能 D) 単一インスタンスタイプを使用する

正解: B 解説: EMR コスト最適化: ①コアノード(HDFS保存が必要): オンデマンドで安定性確保②タスクノード(処理のみ): Spot Instanceで60-90%コスト削減③Instance Fleet: 複数インスタンスタイプをSpot + ODで混在させて容量確保④EMR Serverless: クラスター管理不要、ジョブ実行時のみ課金(アイドルコストゼロ)⑤S3をHDFSの代わりに使用: クラスター停止間でのデータ永続化。


問題10 AWS Glue Visual ETLとGlue Studioの関係は?

A) 別々のサービス B) AWS Glue StudioはGlue Visual ETLのコンソールUIで、ビジュアルエディタでGlue ETLジョブを設計・デバッグ・監視するツール C) Glue Visual ETLはコードのみ D) Glue StudioはSageMaker専用

正解: B 解説: Glue Studio(Visual ETL): Glue ETLジョブをノードベースのビジュアルインターフェースで設計。ソース(S3、JDBC、Kinesis)→変換(フィルター、Join、集計、スキーマ変更)→ターゲット(S3、Redshift、JDBC)をドラッグ&ドロップで接続。自動でPySpark/Pythonコードを生成。カスタムコードの追加も可能。Glue Data Quality統合でDQDLルールを視覚的に設定可能。


問題11 Apache Hudi と Apache Icebergの主な違いは?

A) 完全に同一の機能を提供する B) Hudi: upsert/delete最適化(Copy-on-Write vs Merge-on-Read)、インクリメンタル処理。Iceberg: 豊富なスキーマ進化、クロスエンジン互換性(Spark/Trino/Hive等)、大規模パーティション管理 C) IcebergはS3でのみ動作する D) HudiはSparkからのみアクセス可能

正解: B 解説: Hudi: Record-level更新に最適化(CDC処理)。Copy-on-Write(バッチ向け)とMerge-on-Read(低レイテンシー更新)の2モード。Hudi Timeline(変更履歴)。EMRFS/DeltaStreamer統合。Iceberg: 隠しパーティション(クエリ書き換え不要)、スキーマ進化(列追加/削除/リネーム)、大規模テーブル(数百億ファイル)対応、エンジン間の互換性が高い。Delta Lake: Databricks主導、Z-Ordering、AUTO OPTIMIZE。


問題12 Amazon Kinesis Data Firehoseが「バッファリング」する理由は?

A) コスト削減のため B) 小さなバッチをまとめて書き込み先(S3/Redshift/OpenSearch)への書き込み効率を最大化するため(バッファサイズまたはバッファ時間のいずれか早い方でフラッシュ) C) Kinesis Data Streamsのシャードを削減するため D) データを暗号化するため

正解: B 解説: Firehoseバッファリング設定: バッファサイズ: 1MB〜128MB(S3の場合)。バッファ時間: 60秒〜900秒。条件: どちらか先に満たした方でフラッシュ(小さい方が優先)。例: 128MB/300秒設定で、5分経過または128MB蓄積で書き込み。S3のPutObject呼び出し回数を削減し、S3マルチパートアップロードへの移行も少なくなる(コスト最適化)。


問題13 AWS Database Migration Service(DMS)の「Change Data Capture(CDC)」使用時のソース要件は?

A) ソースDBがAWS RDSのみ B) MySQLはbinlog_format=ROW、PostgreSQLはwal_level=logical、OracleはSupplemental Loggingの有効化が必要 C) CDCはターゲットがDynamoDBの場合のみ動作 D) CDCはフルロードの後に自動的に有効化される

正解: B 解説: DMS CDC前提条件: MySQL/Aurora MySQL: binlog_format=ROWbinlog_retention_hours >= 24。PostgreSQL: wal_level=logical、レプリカIDの設定。Oracle: Supplemental Logging有効化(ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA)。SQL Server: MS-CDCまたはMS-Replicationの有効化。オンプレミスDBでも要件を満たせば対応可能。


問題14 Amazon Redshiftのコンカレンシースケーリング(Concurrency Scaling)の仕組みは?

A) 既存クラスターに追加ノードを追加する B) 同時クエリ数が増加した場合にRedhsiftが追加のクラスター容量を自動プロビジョニングし、クエリをルーティング。コンカレンシースケーリングの時間が1日最大1時間まで無料 C) Redshiftは自動スケーリングをサポートしない D) コンカレンシースケーリングはメインクラスターと別のデータを使用する

正解: B 解説: Concurrency Scaling: WLM(Workload Management)でキューが混雑した場合に追加のRedhsiftクラスターを自動起動(秒単位)してクエリをルーティング。同じデータ(S3 Managed Storage / RA3 node)を参照するため一貫性あり。コスト: アクティブな時間分のRedshiftクレジット(1日1時間分は無料クレジットが自動付与)。主にBIダッシュボードの同時ユーザーが急増する場合に有効。


問題15 Amazon Athena Federated Queryを使用してRDSにあるデータとS3のデータレイクを結合する場合の必要なコンポーネントは?

A) AthenaとRDSを直接接続できる B) RDS用のAthenaデータソースコネクター(Lambda関数)をデプロイし、AthenaがそのLambdaを経由してRDSにクエリ C) RDSのデータをS3にコピーしてからAthenaでクエリ D) AWS Glue ETLでデータを変換してからAthenaでクエリ

正解: B 解説: Athena Federated Query: データソースコネクター(Lambda関数)を使用してS3以外のデータソースをクエリ。コネクタータイプ: Amazon RDS/Aurora(MySQL、PostgreSQL)、DynamoDB、CloudWatch Logs、Elasticsearch等。AWSサーバーレスアプリケーションリポジトリからコネクターをデプロイ→データソースカタログに登録→標準SQLでS3データとJOIN可能。コネクターのLambdaは推論処理のみ(データ変換なし)。


DAS試験 最終チェックリスト(補足)

重要な比較と使い分け

  • [ ] Kinesis Data Streams vs MSK vs Firehose(ユースケース別選択)
  • [ ] EMR vs Athena vs Redshift Spectrum(コスト・パフォーマンス比較)
  • [ ] Glue ETL vs Glue DataBrew(ユーザー向け vs 開発者向け)
  • [ ] DMS vs DataSync vs Snowball(データ転送手法)
  • [ ] Iceberg vs Hudi vs Delta Lake(オープンテーブル形式比較)
  • [ ] QuickSight SPICE vs 直接クエリ(鮮度 vs パフォーマンス)
  • [ ] OpenSearch Hot vs UltraWarm vs Cold(コスト vs レイテンシー)

試験頻出トピック

  1. Kinesis シャード数計算(入力レート÷1MB/s、出力レート÷2MB/s)
  2. Firehoseのバッファリング設定(サイズ OR 時間)
  3. Glue DPU計算(処理データ量 ÷ DPUあたり処理能力)
  4. Athena コスト(スキャンデータ量 $5/TB、Parquet+圧縮で削減)
  5. Redshift DISTKEY/SORTKEYの選択基準