目次

Amazon Timestream 完全ガイド v2.0

初心者から実務者向けの包括的解説

Amazon Timestream は、IoT・運用メトリクス・アプリケーションテレメトリ向けのフルマネージド時系列データベースです。毎日数兆の時系列データポイントを保存・分析でき、時系列データに特化したクエリエンジン、自動階層化ストレージ、ネイティブな時系列分析関数を提供します。本ドキュメントは、Timestream の概念・アーキテクチャ・設計パターン・エコシステム・2025-2026 の最新動向を体系的に解説する包括的ガイドです。

ドキュメントの目的

本ガイドは以下を対象としています。

  • 初心者向け: Timestream とは何か、RDS / DynamoDB との違いを学びたい方
  • 開発者向け: メモリストア・マグネティックストア・Scheduled Query の実装
  • データアーキテクト向け: Multi-measure record・時系列クエリの最適化
  • SRE / インフラ向け: スケーリング・VPC エンドポイント・ディザスタリカバリ
  • 意思決定者向け: InfluxDB / TimescaleDB / Prometheus との比較・投資判断

2026 年の Timestream エコシステム

  • Timestream for InfluxDB: マネージド InfluxDB v2 互換、既存 InfluxDB ワークロード移行対応
  • Timestream for LiveAnalytics: メモリストア + マグネティックストアの自動階層化
  • Schedule Query 拡張: より複雑な定期実行分析・自動集計
  • Vector Search(プレビュー): ベクトル埋め込みの直接保存・検索
  • PartiQL サポート: SQL ライク構文での時系列クエリ
  • Grafana Deep Integration: リアルタイムダッシュボード・Alert 統合
  • AWS Glue との統合: Zero-ETL で Redshift と時系列データを同期

定義

AWS 公式による定義:

“Amazon Timestream is a fast, scalable, fully managed time series database designed for IoT, industrial telemetry, analytics, and monitoring applications.”

特徴:

  • 時系列特化: 時系列データに最適化されたクエリエンジン
  • 自動階層化: メモリストア → マグネティックストア の自動移行
  • スケール: 毎秒数百万レコード、ペタバイト規模対応
  • マネージド: インフラ管理不要、99.99% SLA

目次

  1. 概要
  2. Timestream が解決する課題
  3. 主な特徴
  4. Timestream for LiveAnalytics vs Timestream for InfluxDB
  5. アーキテクチャ
  6. データモデル
  7. ストレージ階層化
  8. クエリ言語と時系列関数
  9. Scheduled Queries
  10. マルチメジャーレコード
  11. 主要ユースケース
  12. 設定・操作の具体例
  13. CLI 操作
  14. SDK 実装例
  15. IaC (CloudFormation/Terraform)
  16. 類似サービス比較表
  17. ベストプラクティス
  18. トラブルシューティング
  19. セキュリティ・コンプライアンス
  20. パフォーマンスチューニング
  21. コスト最適化
  22. 2025-2026 最新動向
  23. 学習リソース・参考文献
  24. 実装例・チェックリスト
  25. まとめ

概要

初心者向けメモ: Timestream は「時系列データに特化した高速・低コスト DB」です。RDS は「複雑な JOIN が必要なリレーショナルデータ」、DynamoDB は「汎用 NoSQL」に向いていますが、Timestream は「タイムスタンプが主軸のデータ」(IoT センサー・メトリクス・ログ)に最適化されています。

Timestream は以下を実現します:

  • 毎秒数百万レコードのスケール: IoT デバイスから大量の時系列データを高速に取り込み
  • 自動ストレージ階層化: メモリストア(最新データ)+ マグネティックストア(中期) + S3(長期)を自動管理してコスト最適化
  • 時系列特有の分析関数: 補間・ウィンドウ集計・季節性分解をネイティブサポート
  • SQL 互換クエリ: 学習コスト低く、既存 SQL スキルで即利用可能
  • Grafana 統合: リアルタイムダッシュボード構築が標準的

Timestream が解決する課題

1. 時系列データの大規模保存と高速クエリの両立が困難

課題: RDS で毎秒 10,000 レコードの時系列データを保存・クエリしようとすると、インデックス設計・テーブルパーティショニング・古いデータの削除・アーカイブ管理が複雑かつ手動

Timestream の解決:

毎秒数百万レコード → メモリストアに自動保存
  ↓ (ユーザー定義ポリシーで自動移行)
自動的にマグネティックストアに階層化
  → クエリはメモリ + マグネティックストアに透過的にアクセス
  → 古いデータは低コスト保存

2. 時系列クエリの実装が複雑

課題: 「過去 1 時間の 5 分ごとの平均値」「欠損値を線形補間で埋める」といった時系列特有の処理を RDS で実装すると複雑

Timestream の解決: 時系列関数をネイティブサポート

SELECT deviceId,
       BIN(time, 5m) AS window,
       AVG(temperature) AS avg_temp
FROM "iot"."sensor_data"
WHERE time BETWEEN ago(1h) AND now()
GROUP BY deviceId, BIN(time, 5m)

3. インフラ管理の負担

課題: Prometheus・InfluxDB・TimescaleDB をオンプレミス・EC2 で運用すると、HA・バックアップ・スケーリング・パッチ管理が必要

Timestream の解決: フルマネージド・サーバーレス・99.99% SLA


主な特徴

特徴 説明
時系列最適化 クエリエンジン・ストレージ・関数全てが時系列に最適化
自動階層化 メモリ → マグネティック → S3 を自動移行、コスト最適化
毎秒数百万レコード IoT・メトリクス規模のスケール対応
マルチメジャー 1 レコードに複数メトリクス(CPU, Memory, Disk)を格納
時系列関数 補間・ウィンドウ集計・季節分解をネイティブサポート
SQL 互換 既存 SQL スキルで利用可能
Scheduled Query 定期実行クエリで事前集計・コスト削減
Grafana / QuickSight ダッシュボード統合が標準
VPC エンドポイント プライベート接続対応
CloudTrail 監査ログ完全サポート

Timestream for LiveAnalytics vs Timestream for InfluxDB

観点 LiveAnalytics for InfluxDB
エンジン AWS 独自 SQL 互換 InfluxDB v2 互換
クエリ言語 SQL + 時系列関数 Flux / InfluxQL
メモリストア AWS 管理(MB~GB) InfluxDB メモリ
マグネティックストア S3 ベース(低コスト) オンプレミス or クラウド
既存InfluxDB移行 スキーマ変換必要 互換(接続するだけ)
学習曲線 SQL 標準(低) Flux/InfluxQL 習得必要
推奨用途 汎用時系列・IoT・メトリクス InfluxDB 既存ユーザー
コスト 書き込み量・クエリスキャン インスタンス時間

選択基準:

  • InfluxDB 既存ユーザー → Timestream for InfluxDB
  • 新規・SQL 習熟 → Timestream for LiveAnalytics

アーキテクチャ

graph TB
    IoT["IoT Core / Kinesis Data Streams / MSK"]
    TS["Amazon Timestream"]
    MS["メモリストア (InMemory)"]
    MGS["マグネティックストア (S3)"]
    
    Grafana["Grafana / QuickSight / Analytics"]
    Query["Query Engine"]
    
    IoT -->|Write API| TS
    TS -->|Auto Tiering| MS
    MS -->|Auto Migration Policy| MGS
    
    Query -->|Transparent Access| MS
    Query -->|Auto Redirect| MGS
    Query -->|Results| Grafana
    
    Lambda["Lambda Code Hook"]
    Scheduled["Scheduled Query"]
    
    TS -->|Trigger| Lambda
    TS -->|Pre-aggregation| Scheduled
    
    SNS["Amazon SNS / CloudWatch"]
    Scheduled -->|Alert| SNS

データモデル

テーブル・ディメンション・メジャー

Database: my_app
  └── Table: server_metrics
      ├── ディメンション(タグ・メタデータ)
      │   ├── region: us-east-1 (変化しない)
      │   ├── instanceId: i-abc123
      │   └── service: web-api
      │
      ├── メジャー(計測値)
      │   ├── cpu_utilization: 75.3
      │   ├── memory_used_gb: 12.8
      │   └── disk_read_iops: 350
      │
      └── タイムスタンプ: 2026-04-27T10:30:00Z

マルチメジャーレコード(推奨)

1 回の書き込みで複数メトリクスを格納:

{
  "DatabaseName": "my_app",
  "TableName": "server_metrics",
  "Records": [
    {
      "Time": "2026-04-27T10:30:00Z",
      "TimeUnit": "SECONDS",
      "Dimensions": [
        {"Name": "region", "Value": "us-east-1"},
        {"Name": "instanceId", "Value": "i-abc123"}
      ],
      "MeasureName": "server_stats",
      "MeasureValues": [
        {"Name": "cpu_utilization", "Value": "75.3", "Type": "DOUBLE"},
        {"Name": "memory_used_gb", "Value": "12.8", "Type": "DOUBLE"},
        {"Name": "disk_read_iops", "Value": "350", "Type": "BIGINT"}
      ]
    }
  ]
}

メリット:

  • 書き込み 1 回で複数メトリクス
  • ストレージ効率向上
  • 同一タイムスタンプの一貫性

ストレージ階層化

データ書き込み(Write API)
    ↓
メモリストア(InMemory)
    ← 最近 1 時間〜数時間
    ← 高速クエリ向け
    ← 高コスト(GB/時)
    
    ↓ (ユーザー定義ポリシー自動適用)
    
マグネティックストア(S3)
    ← 数日〜年単位
    ← 中コスト(GB/月)
    ← シーケンシャルアクセス最適化
    
    ↓ (ライフサイクルポリシー)
    
S3 深いアーカイブ / Glacier
    ← 年単位の長期保存
    ← 最低コスト(GB/年)
ストア 用途 コスト レイテンシ
メモリストア 直近データの超高速クエリ < 1ms
マグネティックストア 中期・分析クエリ 数秒
S3 アーカイブ 長期コンプライアンス保存 分〜時

クエリ言語と時系列関数

基本クエリ

-- 直近 1 時間の平均 CPU 使用率(5 分ごと)
SELECT instanceId,
       BIN(time, 5m) AS window,
       AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now()
  AND region = 'us-east-1'
GROUP BY instanceId, BIN(time, 5m)
ORDER BY window DESC;

-- 過去 7 日の日単位集計
SELECT DATE_TRUNC('day', time) AS day,
       region,
       MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
       AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
       MIN(CAST(cpu_utilization AS DOUBLE)) AS min_cpu
FROM "my_app"."server_metrics"
WHERE time > ago(7d)
GROUP BY DATE_TRUNC('day', time), region
ORDER BY day DESC;

時系列専用関数

関数 機能
BIN(time, interval) 時間をバケット化(5 分ごと等)
FILL(value) NULL 値を指定値で埋める
INTERPOLATE_LINEAR() 線形補間で欠損値補完
INTERPOLATE_LOCF() 直前値で補完(Last Observation Carried Forward)
ago(duration) 現在から過去の時間を計算
CREATE_TIME_SERIES() 時系列オブジェクト作成
SMOOTH() スムージング(移動平均等)
APPROX_PERCENTILE_CONT() 近似パーセンタイル

補間・平滑化例

-- 線形補間で欠損値を埋める
SELECT instanceId,
       time,
       INTERPOLATE_LINEAR(
         CREATE_TIME_SERIES(time, cpu_utilization),
         SEQUENCE(min(time) OVER (), max(time) OVER (), 1m)
       ) AS interpolated_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(24h) AND now()
GROUP BY instanceId;

-- 移動平均(5 分ウィンドウ)
SELECT instanceId,
       time,
       AVG(CAST(cpu_utilization AS DOUBLE)) OVER (
         PARTITION BY instanceId
         ORDER BY time
         ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
       ) AS moving_avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(12h) AND now()
ORDER BY time DESC;

Scheduled Queries

定期実行クエリで事前集計・ダッシュボード用プリアグリゲーション:

-- 定義:毎 5 分ごとに 5 分単位の平均値を別テーブルに書き込み
CREATE SCHEDULED_QUERY 'compute_5min_agg' AS
SELECT 
  BIN(time, 5m) AS binned_time,
  region,
  instanceId,
  AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
  MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
  COUNT(*) AS sample_count
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now()
GROUP BY BIN(time, 5m), region, instanceId
EXEC INTO "my_app"."server_metrics_5min_agg"
SCHEDULE 'rate(5 minutes)';

メリット:

  • ダッシュボード表示の高速化(事前計算済み)
  • クエリコスト削減(大規模テーブルをスキャンしない)
  • 自動アラート・通知(SNS 統合)

マルチメジャーレコード

シングルメジャー(非効率)

# 1 回の書き込みで 1 メトリクスのみ
for metric in ['cpu', 'memory', 'disk']:
    timestream.write_records(
        Records=[{
            'Time': str(int(time.time() * 1000)),
            'TimeUnit': 'MILLISECONDS',
            'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
            'MeasureName': metric,
            'MeasureValue': '75.3',
            'MeasureValueType': 'DOUBLE'
        }]
    )  # 3 回の API コール

マルチメジャー(推奨)

# 1 回の書き込みで複数メトリクス
timestream.write_records(
    Records=[{
        'Time': str(int(time.time() * 1000)),
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
        'MeasureName': 'server_metrics',
        'MeasureValues': [
            {'Name': 'cpu_utilization', 'Value': '75.3', 'Type': 'DOUBLE'},
            {'Name': 'memory_used_gb', 'Value': '12.8', 'Type': 'DOUBLE'},
            {'Name': 'disk_read_iops', 'Value': '350', 'Type': 'BIGINT'}
        ]
    }]
)  # 1 回の API コール、コスト削減

主要ユースケース

  1. IoT センサーモニタリング

    • 工場の 10,000 台のセンサーから毎秒 100 万データポイント取得
    • Grafana で異常検知ダッシュボード表示
    • Scheduled Query で 1 時間ごとの集計・SNS 通知
  2. アプリケーション メトリクス

    • EC2・Lambda のレイテンシー・エラー率・スループット
    • 13 ヶ月保持してキャパシティプランニング
    • QuickSight で BI ダッシュボード
  3. 金融市場データ

    • 株価・為替レート・先物の時系列保存
    • 1 分・5 分・1 時間の OHLC(始値・高値・安値・終値)を自動計算
    • Scheduled Query で価格アラート
  4. スマートメーター

    • 電力会社が数百万世帯の電力消費データを 7 年間保持
    • 機械学習で需要予測・コスト最適化
    • SageMaker と連携
  5. ネットワーク・セキュリティ

    • VPC Flow Logs・WAF ログ・GuardDuty アラート
    • 時系列で侵入検知・トレンド分析
  6. 医療・IoT デバイス

    • ウェアラブルから心拍・酸素飽和度を毎秒取得
    • HIPAA 準拠で長期保存・研究利用
  7. SaaS メトリクス

    • API レスポンスタイム・アクティブユーザー数・課金メトリクス
    • オペレーション・ダッシュボード
  8. 気象・環境モニタリング

    • 気象ステーション・大気品質・水質データ
    • 数十年の歴史データで気候変動分析
  9. ゲーム分析

    • プレイヤーの時系列アクティビティ
    • リアルタイムランキング・ミッション進捗
  10. 製造業 - 予防保全

    • 機械の振動・温度・圧力データ
    • 異常検知で故障予測・ダウンタイム削減

設定・操作の具体例

AWS Management Console での作成

  1. Timestream コンソール → Create Database

    Database name: my_iot_app
    KMS Key: (AWS managed or Customer managed)
    
  2. Create Table

    Table name: sensor_data
    Memory store retention: 1 hour
    Magnetic store retention: 365 days (1 year)
    Magnetic store S3: s3://my-bucket/timestream/
    
  3. Table Schema

    Dimensions: region, instance_id, sensor_type
    Time attribute: time (TIMESTAMP)
    Retention: Memory 1h, Magnetic 365d
    

CLI 操作

データベース・テーブル作成

# Database 作成
aws timestream-write create-database \
  --database-name my_iot_app \
  --region us-east-1

# Table 作成
aws timestream-write create-table \
  --database-name my_iot_app \
  --table-name sensor_data \
  --retention-properties \
    MemoryStoreRetentionPeriodInHours=24,\
    MagneticStoreRetentionPeriodInDays=365

データ書き込み

# JSON ファイルから書き込み
aws timestream-write write-records \
  --database-name my_iot_app \
  --table-name sensor_data \
  --records '[
    {
      "Time": "1667303200000",
      "TimeUnit": "MILLISECONDS",
      "Dimensions": [
        {"Name": "region", "Value": "us-east-1"},
        {"Name": "instance_id", "Value": "i-abc123"}
      ],
      "MeasureName": "cpu_utilization",
      "MeasureValue": "75.3",
      "MeasureValueType": "DOUBLE"
    }
  ]'

クエリ実行

# クエリ実行
aws timestream-query query \
  --query-string "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10"

# 結果取得
aws timestream-query get-query-execution-status \
  --query-id <query-id>

Scheduled Query 作成

aws timestream-write create-scheduled-query \
  --name daily-aggregation \
  --schedule-expression "rate(1 day)" \
  --query-string "SELECT \
    DATE_TRUNC('day', time) as day, \
    region, \
    AVG(CAST(cpu_utilization AS DOUBLE)) as avg_cpu \
    FROM \"my_iot_app\".\"sensor_data\" \
    WHERE time > ago(1d) \
    GROUP BY DATE_TRUNC('day', time), region" \
  --scheduled-query-execution-role-arn arn:aws:iam::123456789012:role/TimestreamRole \
  --target-configuration '{"TimestreamConfiguration": {"DatabaseName": "my_iot_app", "TableName": "sensor_data_daily"}}'

SDK 実装例

Python (Boto3)

import boto3
import json
from datetime import datetime
import time

client = boto3.client('timestream-write', region_name='us-east-1')
query_client = boto3.client('timestream-query', region_name='us-east-1')

# データ書き込み(マルチメジャー)
def write_sensor_data(device_id, temperature, humidity, pressure):
    current_time = str(int(time.time() * 1000))
    
    records = [{
        'Time': current_time,
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [
            {'Name': 'region', 'Value': 'us-east-1'},
            {'Name': 'device_id', 'Value': device_id},
            {'Name': 'sensor_type', 'Value': 'environmental'}
        ],
        'MeasureName': 'environmental_metrics',
        'MeasureValues': [
            {'Name': 'temperature_celsius', 'Value': str(temperature), 'Type': 'DOUBLE'},
            {'Name': 'humidity_percent', 'Value': str(humidity), 'Type': 'DOUBLE'},
            {'Name': 'pressure_mbar', 'Value': str(pressure), 'Type': 'DOUBLE'}
        ]
    }]
    
    response = client.write_records(
        DatabaseName='my_iot_app',
        TableName='sensor_data',
        Records=records
    )
    print(f"Wrote {response['RecordsIngested']['Total']} records")
    return response

# クエリ実行
def query_sensor_stats(hours=1):
    query_string = f"""
    SELECT 
        device_id,
        BIN(time, 5m) as window,
        AVG(CAST(temperature_celsius AS DOUBLE)) as avg_temp,
        MAX(CAST(temperature_celsius AS DOUBLE)) as max_temp,
        MIN(CAST(temperature_celsius AS DOUBLE)) as min_temp,
        COUNT(*) as sample_count
    FROM "my_iot_app"."sensor_data"
    WHERE time > ago({hours}h)
    GROUP BY device_id, BIN(time, 5m)
    ORDER BY window DESC
    """
    
    response = query_client.query(QueryString=query_string)
    
    columns = [col['Name'] for col in response['ColumnInfo']]
    print(f"Columns: {columns}")
    
    for row in response['Rows']:
        print([cell.get('ScalarValue') for cell in row['Data']])
    
    return response

# 使用例
if __name__ == '__main__':
    # データ書き込み
    write_sensor_data('sensor-001', temperature=23.5, humidity=45.2, pressure=1013.25)
    write_sensor_data('sensor-002', temperature=22.8, humidity=48.1, pressure=1013.30)
    
    # クエリ実行
    query_sensor_stats(hours=1)

Java

import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;
import software.amazon.awssdk.services.timestreamwrite.model.*;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamquery.model.*;

public class TimestreamExample {
    public static void writeRecords() {
        TimestreamWriteClient client = TimestreamWriteClient.builder().build();
        
        long time = System.currentTimeMillis();
        String timeStr = Long.toString(time);
        
        Record record = Record.builder()
            .time(timeStr)
            .timeUnit(TimeUnit.MILLISECONDS)
            .dimensions(
                Dimension.builder().name("region").value("us-east-1").build(),
                Dimension.builder().name("instance_id").value("i-abc123").build()
            )
            .measureName("server_metrics")
            .measureValues(
                MeasureValue.builder().name("cpu_utilization").value("75.3").type(MeasureValueType.DOUBLE).build(),
                MeasureValue.builder().name("memory_used_gb").value("12.8").type(MeasureValueType.DOUBLE).build()
            )
            .build();
        
        WriteRecordsRequest writeRequest = WriteRecordsRequest.builder()
            .databaseName("my_iot_app")
            .tableName("sensor_data")
            .records(record)
            .build();
        
        WriteRecordsResponse response = client.writeRecords(writeRequest);
        System.out.println("Records written: " + response.recordsIngested().total());
        
        client.close();
    }
    
    public static void queryRecords() {
        TimestreamQueryClient queryClient = TimestreamQueryClient.builder().build();
        
        String queryString = "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10";
        
        QueryRequest queryRequest = QueryRequest.builder()
            .queryString(queryString)
            .build();
        
        QueryResponse response = queryClient.query(queryRequest);
        
        response.rows().forEach(row -> {
            row.data().forEach(datum -> System.out.print(datum.scalarValue() + " "));
            System.out.println();
        });
        
        queryClient.close();
    }
}

Node.js

const AWS = require('aws-sdk');

const writeClient = new AWS.TimestreamWrite({ region: 'us-east-1' });
const queryClient = new AWS.TimestreamQuery({ region: 'us-east-1' });

// データ書き込み
async function writeSensorData(deviceId, temperature, humidity) {
    const time = Date.now().toString();
    
    const params = {
        DatabaseName: 'my_iot_app',
        TableName: 'sensor_data',
        Records: [{
            Time: time,
            TimeUnit: 'MILLISECONDS',
            Dimensions: [
                { Name: 'region', Value: 'us-east-1' },
                { Name: 'device_id', Value: deviceId }
            ],
            MeasureName: 'environment_metrics',
            MeasureValues: [
                { Name: 'temperature', Value: temperature.toString(), Type: 'DOUBLE' },
                { Name: 'humidity', Value: humidity.toString(), Type: 'DOUBLE' }
            ]
        }]
    };
    
    try {
        const response = await writeClient.writeRecords(params).promise();
        console.log('Records written:', response.RecordsIngested.Total);
    } catch (error) {
        console.error('Error writing records:', error);
    }
}

// クエリ実行
async function querySensorData() {
    const params = {
        QueryString: `
            SELECT device_id, 
                   BIN(time, 5m) as window,
                   AVG(CAST(temperature AS DOUBLE)) as avg_temp
            FROM "my_iot_app"."sensor_data"
            WHERE time > ago(1h)
            GROUP BY device_id, BIN(time, 5m)
        `
    };
    
    try {
        const response = await queryClient.query(params).promise();
        console.log('Query result:', response.Rows);
    } catch (error) {
        console.error('Error querying:', error);
    }
}

// 実行
(async () => {
    await writeSensorData('sensor-001', 23.5, 45.2);
    await querySensorData();
})();

IaC (CloudFormation/Terraform)

CloudFormation

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Timestream Database and Table'

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: my_iot_app
      Tags:
        - Key: Environment
          Value: Production

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: sensor_data
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 24
        MagneticStoreRetentionPeriodInDays: 365
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true
      Tags:
        - Key: Environment
          Value: Production

  TimestreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: TimestreamAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - timestream:WriteRecords
                  - timestream:SelectValues
                Resource: !Sub 'arn:aws:timestream:${AWS::Region}:${AWS::AccountId}:database/${TimestreamDatabase}/table/${TimestreamTable}'

Outputs:
  DatabaseName:
    Value: !Ref TimestreamDatabase
  TableName:
    Value: !Ref TimestreamTable
  RoleArn:
    Value: !GetAtt TimestreamRole.Arn

Terraform

provider "aws" {
  region = "us-east-1"
}

resource "aws_timestreamwrite_database" "my_iot_app" {
  database_name = "my_iot_app"

  tags = {
    Environment = "Production"
  }
}

resource "aws_timestreamwrite_table" "sensor_data" {
  database_name = aws_timestreamwrite_database.my_iot_app.database_name
  table_name    = "sensor_data"

  retention_properties {
    memory_store_retention_period_in_hours    = 24
    magnetic_store_retention_period_in_days   = 365
  }

  magnetic_store_write_properties {
    enable_magnetic_store_writes = true
  }

  tags = {
    Environment = "Production"
  }
}

resource "aws_iam_role" "timestream_role" {
  name = "timestream-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "timestream_policy" {
  name = "timestream-policy"
  role = aws_iam_role.timestream_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "timestream:WriteRecords",
        "timestream:SelectValues"
      ]
      Resource = "arn:aws:timestream:us-east-1:*:database/${aws_timestreamwrite_database.my_iot_app.database_name}/table/${aws_timestreamwrite_table.sensor_data.table_name}"
    }]
  })
}

output "database_name" {
  value = aws_timestreamwrite_database.my_iot_app.database_name
}

output "table_name" {
  value = aws_timestreamwrite_table.sensor_data.table_name
}

類似サービス比較表

比較軸 Timestream InfluxDB(OSS) TimescaleDB Prometheus ClickHouse
マネージド時系列 DB OSS 時系列 DB PostgreSQL 拡張 メトリクス収集 OLAP 列指向
インフラ管理 AWS マネージド オンプレ / 自己管理 オンプレ / 自己管理 オンプレ / 自己管理 オンプレ / 自己管理
スケール 毎秒百万レコード 毎秒万レコード 毎秒十万レコード 数千〜万レコード 毎秒数百万イベント
クエリ言語 SQL 互換 InfluxQL / Flux SQL (PostgreSQL) PromQL SQL
ストレージ階層化 自動(メモリ→磁気) 手動 手動 メモリのみ S3 / HDFS 統合
学習曲線 低(SQL標準) 中(InfluxQL習得) 低(SQL拡張) 中(PromQL) 低(SQL標準)
関数 時系列専用(補間等) InfluxQL 固有 時系列 / ウィンドウ 集計関数 SQL 標準
推奨用途 IoT・クラウドネイティブ IoT・オンプレ クラウド関連データ Prometheus エコシステム 大規模分析
コスト 従量制(書き込み・クエリ) 初期 + 運用 初期 + 運用 初期 + 運用 初期 + 運用
HA/DR 99.99% SLA 構築必要 構築必要 構築必要 構築必要
サーバーレス × × × ×
Grafana 統合 ネイティブ ネイティブ 可能 ネイティブ 可能

ベストプラクティス

✅ 推奨パターン

  1. マルチメジャーレコード利用

    • 1 回の書き込みで複数メトリクス
    • 書き込みコスト削減・ストレージ効率向上
  2. 適切なディメンション設計

    ✅ region, instance_id, service, environment
    ❌ timestamp, user_id(高カーディナリティ)
    

    カーディナリティが高すぎるとメモリストア効率低下

  3. Scheduled Query でプリアグリゲーション

    • ダッシュボード用の事前集計
    • クエリスキャン量削減 → コスト削減
  4. VPC エンドポイント利用

    • プライベート接続で安全性向上
    • CloudTrail ログ有効化
  5. パーティション戦略

    -- ディメンションを活用したパーティショニング
    WHERE region = 'us-east-1' AND time > ago(7d)
    
  6. メモリストア / マグネティックストア の適切な分離

    • ホット(直近 1 時間)→ メモリ
    • ウォーム(最近 1 週間)→ マグネティック
    • コールド(過去 1 年以上)→ S3

❌ アンチパターン

  1. シングルメジャーレコード の乱用

    • 毎回 API コール
    • 書き込みコスト増加
  2. 高カーディナリティディメンション

    ❌ user_id, request_id(数百万種類)
    
  3. スケジュール化されたアグリゲーションなし

    • ダッシュボードが毎回フルスキャン
    • コスト増・レイテンシ増
  4. IAM ポリシーが過剰に広い

    "Action": "timestream:*""Action": ["timestream:WriteRecords", "timestream:SelectValues"]
    
  5. レテンション期間の不適切な設定

    • マグネティック無しで容量超過
    • S3 にアーカイブしない

トラブルシューティング

症状 原因 対策
ValidationException スキーマ・データ型不一致 MeasureValue が DOUBLE なのに STRING 送信 → 型確認
ResourceNotFoundException テーブル・DB が存在しない arn:aws:timestream: 確認、リージョン確認
ThrottlingException 書き込み / クエリレート超過 Scheduled Query で集計、バッチ処理活用
AccessDeniedException IAM ロール権限不足 Policy に WriteRecords / SelectValues 追加
Query timeout クエリが大きすぎる / スキャンが多い WHERE で時間範囲・ディメンション フィルター追加
Memory store full 書き込み量が多く、ポリシー遅延 マグネティックストア移行時間確認
レスポンスが遅い メモリストア外のデータをスキャン メモリストア保持期間を延長

セキュリティ・コンプライアンス

内容
IAM リソース単位のきめ細かい権限制御
KMS 保存時暗号化(AWS Managed Key / CMK 選択可)
TLS / HTTPS 転送中の暗号化(デフォルト)
VPC Endpoint PrivateLink 対応、インターネット露出なし
CloudTrail API 操作の監査ログ
Retention コンプライアンス保持期間設定・自動削除可能
監査 CloudWatch Logs に詳細ログ出力

パフォーマンスチューニング

書き込みの最適化

# バッチ書き込み(複数レコード)
def batch_write_records(records, batch_size=100):
    for i in range(0, len(records), batch_size):
        batch = records[i:i+batch_size]
        client.write_records(
            DatabaseName='my_app',
            TableName='sensor_data',
            Records=batch
        )

クエリの最適化

-- ❌ 非効率:全テーブルスキャン
SELECT * FROM "my_app"."sensor_data"

-- ✅ 効率的:時間範囲フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE time > ago(1h)

-- ✅ さらに効率的:ディメンション + 時間フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE region = 'us-east-1'
  AND time > ago(1h)

-- ✅ 最高効率:Scheduled Query で事前集計
SELECT device_id, BIN(time, 5m), AVG(temperature)
FROM "my_app"."sensor_data_5min_agg"  -- 既に集計済み
WHERE time > ago(1h)

コスト最適化

1. マルチメジャーレコード利用

  • シングル × 3 メトリクス → マルチ × 1 レコード
  • 書き込みコスト 1/3

2. Scheduled Query でプリアグリゲーション

  • ダッシュボード毎回フルスキャン → 事前集計
  • クエリコスト 50-90% 削減

3. 適切なレテンション設定

  • メモリストア: 24 時間(ホットデータ)
  • マグネティック: 365 日(ウォームデータ)
  • → 古いデータは S3 ライフサイクルで移行

4. フィルターの活用

WHERE region = 'us-east-1' AND time > ago(7d)

不要なデータはスキャン前に除外


2025-2026 最新動向

Timestream for InfluxDB の進化

  • InfluxDB v3 互試対応(2026 年上半期予定)
  • パフォーマンス向上・マルチテナント対応

Vector Search サポート

  • 埋め込みベクトルの直接保存・検索
  • 時系列 + ベクトル検索の統合

Zero-ETL 統合

  • Redshift との自動同期
  • Glue なしでのデータレプリケーション

AI/ML 統合の深化

  • SageMaker Forecast との直接統合
  • Bedrock で異常検知・予測

マルチリージョン対応

  • クロスリージョン レプリケーション
  • グローバルアプリケーション対応

学習リソース・参考文献

公式リソース

ベンダー・OSS リソース

コミュニティ・ブログ

  • AWS Database Blog
  • Time Series Data Management
  • IoT on AWS Blog

実装例・チェックリスト

チェックリスト

  • [ ] データレート(毎秒レコード数)を測定
  • [ ] ディメンション・カーディナリティを設計
  • [ ] マルチメジャーレコード利用か確認
  • [ ] メモリストア / マグネティックストア 保持期間設定
  • [ ] Scheduled Query でプリアグリゲーション計画
  • [ ] VPC エンドポイント設定
  • [ ] IAM ロール・最小権限確認
  • [ ] CloudTrail 監査ログ有効化
  • [ ] Grafana / QuickSight 接続確認
  • [ ] コスト見積もり・予算アラート設定

実装チェック

# Timestream 接続確認
import boto3

client = boto3.client('timestream-write')

# DB リスト確認
response = client.list_databases()
print("Databases:", [db['DatabaseName'] for db in response['Databases']])

# テーブル確認
response = client.list_tables(DatabaseName='my_app')
print("Tables:", [t['TableName'] for t in response['Tables']])

# 書き込みテスト
import time
response = client.write_records(
    DatabaseName='my_app',
    TableName='sensor_data',
    Records=[{
        'Time': str(int(time.time() * 1000)),
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [{'Name': 'test', 'Value': 'true'}],
        'MeasureName': 'test_metric',
        'MeasureValue': '1.0',
        'MeasureValueType': 'DOUBLE'
    }]
)
print("Write success:", response['RecordsIngested']['Total'] > 0)

まとめ

Amazon Timestream は 「時系列データ専用の高速・低コスト・マネージド DB」 です。IoT センサーデータ・サーバーメトリクス・アプリテレメトリなど、タイムスタンプが主軸のデータに特化しており、以下の特徴を提供します:

  • 自動ストレージ階層化 でコスト最適化
  • SQL 互換クエリ で学習コスト低減
  • 時系列専用関数 で分析を高速化
  • Scheduled Query でダッシュボード最適化
  • Grafana 統合 でリアルタイムダッシュボード
  • 99.99% SLA でエンタープライズ品質

RDS / DynamoDB では対応困難な 毎秒百万レコード規模 のデータを、低コスト・低運用負荷で実現します。Timestream for LiveAnalytics(AWS ネイティブ)か Timestream for InfluxDB(InfluxDB 既存ユーザー向け)から選択できます。

最終更新:2026-04-26 バージョン:v2.0