目次
- 初心者から実務者向けの包括的解説
- 概要
- Timestream が解決する課題
- 主な特徴
- Timestream for LiveAnalytics vs Timestream for InfluxDB
- アーキテクチャ
- データモデル
- ストレージ階層化
- クエリ言語と時系列関数
- Scheduled Queries
- マルチメジャーレコード
- 主要ユースケース
- 設定・操作の具体例
- CLI 操作
- SDK 実装例
- IaC (CloudFormation/Terraform)
- 類似サービス比較表
- ベストプラクティス
- トラブルシューティング
- セキュリティ・コンプライアンス
- パフォーマンスチューニング
- コスト最適化
- 2025-2026 最新動向
- 学習リソース・参考文献
- 実装例・チェックリスト
- まとめ
Amazon Timestream 完全ガイド v2.0
初心者から実務者向けの包括的解説
Amazon Timestream は、IoT・運用メトリクス・アプリケーションテレメトリ向けのフルマネージド時系列データベースです。毎日数兆の時系列データポイントを保存・分析でき、時系列データに特化したクエリエンジン、自動階層化ストレージ、ネイティブな時系列分析関数を提供します。本ドキュメントは、Timestream の概念・アーキテクチャ・設計パターン・エコシステム・2025-2026 の最新動向を体系的に解説する包括的ガイドです。
ドキュメントの目的
本ガイドは以下を対象としています。
- 初心者向け: Timestream とは何か、RDS / DynamoDB との違いを学びたい方
- 開発者向け: メモリストア・マグネティックストア・Scheduled Query の実装
- データアーキテクト向け: Multi-measure record・時系列クエリの最適化
- SRE / インフラ向け: スケーリング・VPC エンドポイント・ディザスタリカバリ
- 意思決定者向け: InfluxDB / TimescaleDB / Prometheus との比較・投資判断
2026 年の Timestream エコシステム
- Timestream for InfluxDB: マネージド InfluxDB v2 互換、既存 InfluxDB ワークロード移行対応
- Timestream for LiveAnalytics: メモリストア + マグネティックストアの自動階層化
- Schedule Query 拡張: より複雑な定期実行分析・自動集計
- Vector Search(プレビュー): ベクトル埋め込みの直接保存・検索
- PartiQL サポート: SQL ライク構文での時系列クエリ
- Grafana Deep Integration: リアルタイムダッシュボード・Alert 統合
- AWS Glue との統合: Zero-ETL で Redshift と時系列データを同期
定義
AWS 公式による定義:
“Amazon Timestream is a fast, scalable, fully managed time series database designed for IoT, industrial telemetry, analytics, and monitoring applications.”
特徴:
- 時系列特化: 時系列データに最適化されたクエリエンジン
- 自動階層化: メモリストア → マグネティックストア の自動移行
- スケール: 毎秒数百万レコード、ペタバイト規模対応
- マネージド: インフラ管理不要、99.99% SLA
目次
- 概要
- Timestream が解決する課題
- 主な特徴
- Timestream for LiveAnalytics vs Timestream for InfluxDB
- アーキテクチャ
- データモデル
- ストレージ階層化
- クエリ言語と時系列関数
- Scheduled Queries
- マルチメジャーレコード
- 主要ユースケース
- 設定・操作の具体例
- CLI 操作
- SDK 実装例
- IaC (CloudFormation/Terraform)
- 類似サービス比較表
- ベストプラクティス
- トラブルシューティング
- セキュリティ・コンプライアンス
- パフォーマンスチューニング
- コスト最適化
- 2025-2026 最新動向
- 学習リソース・参考文献
- 実装例・チェックリスト
- まとめ
概要
初心者向けメモ: Timestream は「時系列データに特化した高速・低コスト DB」です。RDS は「複雑な JOIN が必要なリレーショナルデータ」、DynamoDB は「汎用 NoSQL」に向いていますが、Timestream は「タイムスタンプが主軸のデータ」(IoT センサー・メトリクス・ログ)に最適化されています。
Timestream は以下を実現します:
- 毎秒数百万レコードのスケール: IoT デバイスから大量の時系列データを高速に取り込み
- 自動ストレージ階層化: メモリストア(最新データ)+ マグネティックストア(中期) + S3(長期)を自動管理してコスト最適化
- 時系列特有の分析関数: 補間・ウィンドウ集計・季節性分解をネイティブサポート
- SQL 互換クエリ: 学習コスト低く、既存 SQL スキルで即利用可能
- Grafana 統合: リアルタイムダッシュボード構築が標準的
Timestream が解決する課題
1. 時系列データの大規模保存と高速クエリの両立が困難
課題: RDS で毎秒 10,000 レコードの時系列データを保存・クエリしようとすると、インデックス設計・テーブルパーティショニング・古いデータの削除・アーカイブ管理が複雑かつ手動
Timestream の解決:
毎秒数百万レコード → メモリストアに自動保存
↓ (ユーザー定義ポリシーで自動移行)
自動的にマグネティックストアに階層化
→ クエリはメモリ + マグネティックストアに透過的にアクセス
→ 古いデータは低コスト保存
2. 時系列クエリの実装が複雑
課題: 「過去 1 時間の 5 分ごとの平均値」「欠損値を線形補間で埋める」といった時系列特有の処理を RDS で実装すると複雑
Timestream の解決: 時系列関数をネイティブサポート
SELECT deviceId,
BIN(time, 5m) AS window,
AVG(temperature) AS avg_temp
FROM "iot"."sensor_data"
WHERE time BETWEEN ago(1h) AND now()
GROUP BY deviceId, BIN(time, 5m)
3. インフラ管理の負担
課題: Prometheus・InfluxDB・TimescaleDB をオンプレミス・EC2 で運用すると、HA・バックアップ・スケーリング・パッチ管理が必要
Timestream の解決: フルマネージド・サーバーレス・99.99% SLA
主な特徴
| 特徴 | 説明 |
|---|---|
| 時系列最適化 | クエリエンジン・ストレージ・関数全てが時系列に最適化 |
| 自動階層化 | メモリ → マグネティック → S3 を自動移行、コスト最適化 |
| 毎秒数百万レコード | IoT・メトリクス規模のスケール対応 |
| マルチメジャー | 1 レコードに複数メトリクス(CPU, Memory, Disk)を格納 |
| 時系列関数 | 補間・ウィンドウ集計・季節分解をネイティブサポート |
| SQL 互換 | 既存 SQL スキルで利用可能 |
| Scheduled Query | 定期実行クエリで事前集計・コスト削減 |
| Grafana / QuickSight | ダッシュボード統合が標準 |
| VPC エンドポイント | プライベート接続対応 |
| CloudTrail | 監査ログ完全サポート |
Timestream for LiveAnalytics vs Timestream for InfluxDB
| 観点 | LiveAnalytics | for InfluxDB |
|---|---|---|
| エンジン | AWS 独自 SQL 互換 | InfluxDB v2 互換 |
| クエリ言語 | SQL + 時系列関数 | Flux / InfluxQL |
| メモリストア | AWS 管理(MB~GB) | InfluxDB メモリ |
| マグネティックストア | S3 ベース(低コスト) | オンプレミス or クラウド |
| 既存InfluxDB移行 | スキーマ変換必要 | 互換(接続するだけ) |
| 学習曲線 | SQL 標準(低) | Flux/InfluxQL 習得必要 |
| 推奨用途 | 汎用時系列・IoT・メトリクス | InfluxDB 既存ユーザー |
| コスト | 書き込み量・クエリスキャン | インスタンス時間 |
選択基準:
- InfluxDB 既存ユーザー → Timestream for InfluxDB
- 新規・SQL 習熟 → Timestream for LiveAnalytics
アーキテクチャ
graph TB
IoT["IoT Core / Kinesis Data Streams / MSK"]
TS["Amazon Timestream"]
MS["メモリストア (InMemory)"]
MGS["マグネティックストア (S3)"]
Grafana["Grafana / QuickSight / Analytics"]
Query["Query Engine"]
IoT -->|Write API| TS
TS -->|Auto Tiering| MS
MS -->|Auto Migration Policy| MGS
Query -->|Transparent Access| MS
Query -->|Auto Redirect| MGS
Query -->|Results| Grafana
Lambda["Lambda Code Hook"]
Scheduled["Scheduled Query"]
TS -->|Trigger| Lambda
TS -->|Pre-aggregation| Scheduled
SNS["Amazon SNS / CloudWatch"]
Scheduled -->|Alert| SNS
データモデル
テーブル・ディメンション・メジャー
Database: my_app
└── Table: server_metrics
├── ディメンション(タグ・メタデータ)
│ ├── region: us-east-1 (変化しない)
│ ├── instanceId: i-abc123
│ └── service: web-api
│
├── メジャー(計測値)
│ ├── cpu_utilization: 75.3
│ ├── memory_used_gb: 12.8
│ └── disk_read_iops: 350
│
└── タイムスタンプ: 2026-04-27T10:30:00Z
マルチメジャーレコード(推奨)
1 回の書き込みで複数メトリクスを格納:
{
"DatabaseName": "my_app",
"TableName": "server_metrics",
"Records": [
{
"Time": "2026-04-27T10:30:00Z",
"TimeUnit": "SECONDS",
"Dimensions": [
{"Name": "region", "Value": "us-east-1"},
{"Name": "instanceId", "Value": "i-abc123"}
],
"MeasureName": "server_stats",
"MeasureValues": [
{"Name": "cpu_utilization", "Value": "75.3", "Type": "DOUBLE"},
{"Name": "memory_used_gb", "Value": "12.8", "Type": "DOUBLE"},
{"Name": "disk_read_iops", "Value": "350", "Type": "BIGINT"}
]
}
]
}
メリット:
- 書き込み 1 回で複数メトリクス
- ストレージ効率向上
- 同一タイムスタンプの一貫性
ストレージ階層化
データ書き込み(Write API)
↓
メモリストア(InMemory)
← 最近 1 時間〜数時間
← 高速クエリ向け
← 高コスト(GB/時)
↓ (ユーザー定義ポリシー自動適用)
マグネティックストア(S3)
← 数日〜年単位
← 中コスト(GB/月)
← シーケンシャルアクセス最適化
↓ (ライフサイクルポリシー)
S3 深いアーカイブ / Glacier
← 年単位の長期保存
← 最低コスト(GB/年)
| ストア | 用途 | コスト | レイテンシ |
|---|---|---|---|
| メモリストア | 直近データの超高速クエリ | 高 | < 1ms |
| マグネティックストア | 中期・分析クエリ | 中 | 数秒 |
| S3 アーカイブ | 長期コンプライアンス保存 | 低 | 分〜時 |
クエリ言語と時系列関数
基本クエリ
-- 直近 1 時間の平均 CPU 使用率(5 分ごと)
SELECT instanceId,
BIN(time, 5m) AS window,
AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now()
AND region = 'us-east-1'
GROUP BY instanceId, BIN(time, 5m)
ORDER BY window DESC;
-- 過去 7 日の日単位集計
SELECT DATE_TRUNC('day', time) AS day,
region,
MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
MIN(CAST(cpu_utilization AS DOUBLE)) AS min_cpu
FROM "my_app"."server_metrics"
WHERE time > ago(7d)
GROUP BY DATE_TRUNC('day', time), region
ORDER BY day DESC;
時系列専用関数
| 関数 | 機能 |
|---|---|
BIN(time, interval) |
時間をバケット化(5 分ごと等) |
FILL(value) |
NULL 値を指定値で埋める |
INTERPOLATE_LINEAR() |
線形補間で欠損値補完 |
INTERPOLATE_LOCF() |
直前値で補完(Last Observation Carried Forward) |
ago(duration) |
現在から過去の時間を計算 |
CREATE_TIME_SERIES() |
時系列オブジェクト作成 |
SMOOTH() |
スムージング(移動平均等) |
APPROX_PERCENTILE_CONT() |
近似パーセンタイル |
補間・平滑化例
-- 線形補間で欠損値を埋める
SELECT instanceId,
time,
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(time, cpu_utilization),
SEQUENCE(min(time) OVER (), max(time) OVER (), 1m)
) AS interpolated_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(24h) AND now()
GROUP BY instanceId;
-- 移動平均(5 分ウィンドウ)
SELECT instanceId,
time,
AVG(CAST(cpu_utilization AS DOUBLE)) OVER (
PARTITION BY instanceId
ORDER BY time
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) AS moving_avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(12h) AND now()
ORDER BY time DESC;
Scheduled Queries
定期実行クエリで事前集計・ダッシュボード用プリアグリゲーション:
-- 定義:毎 5 分ごとに 5 分単位の平均値を別テーブルに書き込み
CREATE SCHEDULED_QUERY 'compute_5min_agg' AS
SELECT
BIN(time, 5m) AS binned_time,
region,
instanceId,
AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
COUNT(*) AS sample_count
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now()
GROUP BY BIN(time, 5m), region, instanceId
EXEC INTO "my_app"."server_metrics_5min_agg"
SCHEDULE 'rate(5 minutes)';
メリット:
- ダッシュボード表示の高速化(事前計算済み)
- クエリコスト削減(大規模テーブルをスキャンしない)
- 自動アラート・通知(SNS 統合)
マルチメジャーレコード
シングルメジャー(非効率)
# 1 回の書き込みで 1 メトリクスのみ
for metric in ['cpu', 'memory', 'disk']:
timestream.write_records(
Records=[{
'Time': str(int(time.time() * 1000)),
'TimeUnit': 'MILLISECONDS',
'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
'MeasureName': metric,
'MeasureValue': '75.3',
'MeasureValueType': 'DOUBLE'
}]
) # 3 回の API コール
マルチメジャー(推奨)
# 1 回の書き込みで複数メトリクス
timestream.write_records(
Records=[{
'Time': str(int(time.time() * 1000)),
'TimeUnit': 'MILLISECONDS',
'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
'MeasureName': 'server_metrics',
'MeasureValues': [
{'Name': 'cpu_utilization', 'Value': '75.3', 'Type': 'DOUBLE'},
{'Name': 'memory_used_gb', 'Value': '12.8', 'Type': 'DOUBLE'},
{'Name': 'disk_read_iops', 'Value': '350', 'Type': 'BIGINT'}
]
}]
) # 1 回の API コール、コスト削減
主要ユースケース
-
IoT センサーモニタリング
- 工場の 10,000 台のセンサーから毎秒 100 万データポイント取得
- Grafana で異常検知ダッシュボード表示
- Scheduled Query で 1 時間ごとの集計・SNS 通知
-
アプリケーション メトリクス
- EC2・Lambda のレイテンシー・エラー率・スループット
- 13 ヶ月保持してキャパシティプランニング
- QuickSight で BI ダッシュボード
-
金融市場データ
- 株価・為替レート・先物の時系列保存
- 1 分・5 分・1 時間の OHLC(始値・高値・安値・終値)を自動計算
- Scheduled Query で価格アラート
-
スマートメーター
- 電力会社が数百万世帯の電力消費データを 7 年間保持
- 機械学習で需要予測・コスト最適化
- SageMaker と連携
-
ネットワーク・セキュリティ
- VPC Flow Logs・WAF ログ・GuardDuty アラート
- 時系列で侵入検知・トレンド分析
-
医療・IoT デバイス
- ウェアラブルから心拍・酸素飽和度を毎秒取得
- HIPAA 準拠で長期保存・研究利用
-
SaaS メトリクス
- API レスポンスタイム・アクティブユーザー数・課金メトリクス
- オペレーション・ダッシュボード
-
気象・環境モニタリング
- 気象ステーション・大気品質・水質データ
- 数十年の歴史データで気候変動分析
-
ゲーム分析
- プレイヤーの時系列アクティビティ
- リアルタイムランキング・ミッション進捗
-
製造業 - 予防保全
- 機械の振動・温度・圧力データ
- 異常検知で故障予測・ダウンタイム削減
設定・操作の具体例
AWS Management Console での作成
-
Timestream コンソール → Create Database
Database name: my_iot_app KMS Key: (AWS managed or Customer managed) -
Create Table
Table name: sensor_data Memory store retention: 1 hour Magnetic store retention: 365 days (1 year) Magnetic store S3: s3://my-bucket/timestream/ -
Table Schema
Dimensions: region, instance_id, sensor_type Time attribute: time (TIMESTAMP) Retention: Memory 1h, Magnetic 365d
CLI 操作
データベース・テーブル作成
# Database 作成
aws timestream-write create-database \
--database-name my_iot_app \
--region us-east-1
# Table 作成
aws timestream-write create-table \
--database-name my_iot_app \
--table-name sensor_data \
--retention-properties \
MemoryStoreRetentionPeriodInHours=24,\
MagneticStoreRetentionPeriodInDays=365
データ書き込み
# JSON ファイルから書き込み
aws timestream-write write-records \
--database-name my_iot_app \
--table-name sensor_data \
--records '[
{
"Time": "1667303200000",
"TimeUnit": "MILLISECONDS",
"Dimensions": [
{"Name": "region", "Value": "us-east-1"},
{"Name": "instance_id", "Value": "i-abc123"}
],
"MeasureName": "cpu_utilization",
"MeasureValue": "75.3",
"MeasureValueType": "DOUBLE"
}
]'
クエリ実行
# クエリ実行
aws timestream-query query \
--query-string "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10"
# 結果取得
aws timestream-query get-query-execution-status \
--query-id <query-id>
Scheduled Query 作成
aws timestream-write create-scheduled-query \
--name daily-aggregation \
--schedule-expression "rate(1 day)" \
--query-string "SELECT \
DATE_TRUNC('day', time) as day, \
region, \
AVG(CAST(cpu_utilization AS DOUBLE)) as avg_cpu \
FROM \"my_iot_app\".\"sensor_data\" \
WHERE time > ago(1d) \
GROUP BY DATE_TRUNC('day', time), region" \
--scheduled-query-execution-role-arn arn:aws:iam::123456789012:role/TimestreamRole \
--target-configuration '{"TimestreamConfiguration": {"DatabaseName": "my_iot_app", "TableName": "sensor_data_daily"}}'
SDK 実装例
Python (Boto3)
import boto3
import json
from datetime import datetime
import time
client = boto3.client('timestream-write', region_name='us-east-1')
query_client = boto3.client('timestream-query', region_name='us-east-1')
# データ書き込み(マルチメジャー)
def write_sensor_data(device_id, temperature, humidity, pressure):
current_time = str(int(time.time() * 1000))
records = [{
'Time': current_time,
'TimeUnit': 'MILLISECONDS',
'Dimensions': [
{'Name': 'region', 'Value': 'us-east-1'},
{'Name': 'device_id', 'Value': device_id},
{'Name': 'sensor_type', 'Value': 'environmental'}
],
'MeasureName': 'environmental_metrics',
'MeasureValues': [
{'Name': 'temperature_celsius', 'Value': str(temperature), 'Type': 'DOUBLE'},
{'Name': 'humidity_percent', 'Value': str(humidity), 'Type': 'DOUBLE'},
{'Name': 'pressure_mbar', 'Value': str(pressure), 'Type': 'DOUBLE'}
]
}]
response = client.write_records(
DatabaseName='my_iot_app',
TableName='sensor_data',
Records=records
)
print(f"Wrote {response['RecordsIngested']['Total']} records")
return response
# クエリ実行
def query_sensor_stats(hours=1):
query_string = f"""
SELECT
device_id,
BIN(time, 5m) as window,
AVG(CAST(temperature_celsius AS DOUBLE)) as avg_temp,
MAX(CAST(temperature_celsius AS DOUBLE)) as max_temp,
MIN(CAST(temperature_celsius AS DOUBLE)) as min_temp,
COUNT(*) as sample_count
FROM "my_iot_app"."sensor_data"
WHERE time > ago({hours}h)
GROUP BY device_id, BIN(time, 5m)
ORDER BY window DESC
"""
response = query_client.query(QueryString=query_string)
columns = [col['Name'] for col in response['ColumnInfo']]
print(f"Columns: {columns}")
for row in response['Rows']:
print([cell.get('ScalarValue') for cell in row['Data']])
return response
# 使用例
if __name__ == '__main__':
# データ書き込み
write_sensor_data('sensor-001', temperature=23.5, humidity=45.2, pressure=1013.25)
write_sensor_data('sensor-002', temperature=22.8, humidity=48.1, pressure=1013.30)
# クエリ実行
query_sensor_stats(hours=1)
Java
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;
import software.amazon.awssdk.services.timestreamwrite.model.*;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamquery.model.*;
public class TimestreamExample {
public static void writeRecords() {
TimestreamWriteClient client = TimestreamWriteClient.builder().build();
long time = System.currentTimeMillis();
String timeStr = Long.toString(time);
Record record = Record.builder()
.time(timeStr)
.timeUnit(TimeUnit.MILLISECONDS)
.dimensions(
Dimension.builder().name("region").value("us-east-1").build(),
Dimension.builder().name("instance_id").value("i-abc123").build()
)
.measureName("server_metrics")
.measureValues(
MeasureValue.builder().name("cpu_utilization").value("75.3").type(MeasureValueType.DOUBLE).build(),
MeasureValue.builder().name("memory_used_gb").value("12.8").type(MeasureValueType.DOUBLE).build()
)
.build();
WriteRecordsRequest writeRequest = WriteRecordsRequest.builder()
.databaseName("my_iot_app")
.tableName("sensor_data")
.records(record)
.build();
WriteRecordsResponse response = client.writeRecords(writeRequest);
System.out.println("Records written: " + response.recordsIngested().total());
client.close();
}
public static void queryRecords() {
TimestreamQueryClient queryClient = TimestreamQueryClient.builder().build();
String queryString = "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10";
QueryRequest queryRequest = QueryRequest.builder()
.queryString(queryString)
.build();
QueryResponse response = queryClient.query(queryRequest);
response.rows().forEach(row -> {
row.data().forEach(datum -> System.out.print(datum.scalarValue() + " "));
System.out.println();
});
queryClient.close();
}
}
Node.js
const AWS = require('aws-sdk');
const writeClient = new AWS.TimestreamWrite({ region: 'us-east-1' });
const queryClient = new AWS.TimestreamQuery({ region: 'us-east-1' });
// データ書き込み
async function writeSensorData(deviceId, temperature, humidity) {
const time = Date.now().toString();
const params = {
DatabaseName: 'my_iot_app',
TableName: 'sensor_data',
Records: [{
Time: time,
TimeUnit: 'MILLISECONDS',
Dimensions: [
{ Name: 'region', Value: 'us-east-1' },
{ Name: 'device_id', Value: deviceId }
],
MeasureName: 'environment_metrics',
MeasureValues: [
{ Name: 'temperature', Value: temperature.toString(), Type: 'DOUBLE' },
{ Name: 'humidity', Value: humidity.toString(), Type: 'DOUBLE' }
]
}]
};
try {
const response = await writeClient.writeRecords(params).promise();
console.log('Records written:', response.RecordsIngested.Total);
} catch (error) {
console.error('Error writing records:', error);
}
}
// クエリ実行
async function querySensorData() {
const params = {
QueryString: `
SELECT device_id,
BIN(time, 5m) as window,
AVG(CAST(temperature AS DOUBLE)) as avg_temp
FROM "my_iot_app"."sensor_data"
WHERE time > ago(1h)
GROUP BY device_id, BIN(time, 5m)
`
};
try {
const response = await queryClient.query(params).promise();
console.log('Query result:', response.Rows);
} catch (error) {
console.error('Error querying:', error);
}
}
// 実行
(async () => {
await writeSensorData('sensor-001', 23.5, 45.2);
await querySensorData();
})();
IaC (CloudFormation/Terraform)
CloudFormation
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Timestream Database and Table'
Resources:
TimestreamDatabase:
Type: AWS::Timestream::Database
Properties:
DatabaseName: my_iot_app
Tags:
- Key: Environment
Value: Production
TimestreamTable:
Type: AWS::Timestream::Table
Properties:
DatabaseName: !Ref TimestreamDatabase
TableName: sensor_data
RetentionProperties:
MemoryStoreRetentionPeriodInHours: 24
MagneticStoreRetentionPeriodInDays: 365
MagneticStoreWriteProperties:
EnableMagneticStoreWrites: true
Tags:
- Key: Environment
Value: Production
TimestreamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: TimestreamAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- timestream:WriteRecords
- timestream:SelectValues
Resource: !Sub 'arn:aws:timestream:${AWS::Region}:${AWS::AccountId}:database/${TimestreamDatabase}/table/${TimestreamTable}'
Outputs:
DatabaseName:
Value: !Ref TimestreamDatabase
TableName:
Value: !Ref TimestreamTable
RoleArn:
Value: !GetAtt TimestreamRole.Arn
Terraform
provider "aws" {
region = "us-east-1"
}
resource "aws_timestreamwrite_database" "my_iot_app" {
database_name = "my_iot_app"
tags = {
Environment = "Production"
}
}
resource "aws_timestreamwrite_table" "sensor_data" {
database_name = aws_timestreamwrite_database.my_iot_app.database_name
table_name = "sensor_data"
retention_properties {
memory_store_retention_period_in_hours = 24
magnetic_store_retention_period_in_days = 365
}
magnetic_store_write_properties {
enable_magnetic_store_writes = true
}
tags = {
Environment = "Production"
}
}
resource "aws_iam_role" "timestream_role" {
name = "timestream-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}]
})
}
resource "aws_iam_role_policy" "timestream_policy" {
name = "timestream-policy"
role = aws_iam_role.timestream_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = [
"timestream:WriteRecords",
"timestream:SelectValues"
]
Resource = "arn:aws:timestream:us-east-1:*:database/${aws_timestreamwrite_database.my_iot_app.database_name}/table/${aws_timestreamwrite_table.sensor_data.table_name}"
}]
})
}
output "database_name" {
value = aws_timestreamwrite_database.my_iot_app.database_name
}
output "table_name" {
value = aws_timestreamwrite_table.sensor_data.table_name
}
類似サービス比較表
| 比較軸 | Timestream | InfluxDB(OSS) | TimescaleDB | Prometheus | ClickHouse |
|---|---|---|---|---|---|
| 型 | マネージド時系列 DB | OSS 時系列 DB | PostgreSQL 拡張 | メトリクス収集 | OLAP 列指向 |
| インフラ管理 | AWS マネージド | オンプレ / 自己管理 | オンプレ / 自己管理 | オンプレ / 自己管理 | オンプレ / 自己管理 |
| スケール | 毎秒百万レコード | 毎秒万レコード | 毎秒十万レコード | 数千〜万レコード | 毎秒数百万イベント |
| クエリ言語 | SQL 互換 | InfluxQL / Flux | SQL (PostgreSQL) | PromQL | SQL |
| ストレージ階層化 | 自動(メモリ→磁気) | 手動 | 手動 | メモリのみ | S3 / HDFS 統合 |
| 学習曲線 | 低(SQL標準) | 中(InfluxQL習得) | 低(SQL拡張) | 中(PromQL) | 低(SQL標準) |
| 関数 | 時系列専用(補間等) | InfluxQL 固有 | 時系列 / ウィンドウ | 集計関数 | SQL 標準 |
| 推奨用途 | IoT・クラウドネイティブ | IoT・オンプレ | クラウド関連データ | Prometheus エコシステム | 大規模分析 |
| コスト | 従量制(書き込み・クエリ) | 初期 + 運用 | 初期 + 運用 | 初期 + 運用 | 初期 + 運用 |
| HA/DR | 99.99% SLA | 構築必要 | 構築必要 | 構築必要 | 構築必要 |
| サーバーレス | ○ | × | × | × | × |
| Grafana 統合 | ネイティブ | ネイティブ | 可能 | ネイティブ | 可能 |
ベストプラクティス
✅ 推奨パターン
-
マルチメジャーレコード利用
- 1 回の書き込みで複数メトリクス
- 書き込みコスト削減・ストレージ効率向上
-
適切なディメンション設計
✅ region, instance_id, service, environment ❌ timestamp, user_id(高カーディナリティ)カーディナリティが高すぎるとメモリストア効率低下
-
Scheduled Query でプリアグリゲーション
- ダッシュボード用の事前集計
- クエリスキャン量削減 → コスト削減
-
VPC エンドポイント利用
- プライベート接続で安全性向上
- CloudTrail ログ有効化
-
パーティション戦略
-- ディメンションを活用したパーティショニング WHERE region = 'us-east-1' AND time > ago(7d) -
メモリストア / マグネティックストア の適切な分離
- ホット(直近 1 時間)→ メモリ
- ウォーム(最近 1 週間)→ マグネティック
- コールド(過去 1 年以上)→ S3
❌ アンチパターン
-
シングルメジャーレコード の乱用
- 毎回 API コール
- 書き込みコスト増加
-
高カーディナリティディメンション
❌ user_id, request_id(数百万種類) -
スケジュール化されたアグリゲーションなし
- ダッシュボードが毎回フルスキャン
- コスト増・レイテンシ増
-
IAM ポリシーが過剰に広い
❌ "Action": "timestream:*" ✅ "Action": ["timestream:WriteRecords", "timestream:SelectValues"] -
レテンション期間の不適切な設定
- マグネティック無しで容量超過
- S3 にアーカイブしない
トラブルシューティング
| 症状 | 原因 | 対策 |
|---|---|---|
| ValidationException | スキーマ・データ型不一致 | MeasureValue が DOUBLE なのに STRING 送信 → 型確認 |
| ResourceNotFoundException | テーブル・DB が存在しない | arn:aws:timestream: 確認、リージョン確認 |
| ThrottlingException | 書き込み / クエリレート超過 | Scheduled Query で集計、バッチ処理活用 |
| AccessDeniedException | IAM ロール権限不足 | Policy に WriteRecords / SelectValues 追加 |
| Query timeout | クエリが大きすぎる / スキャンが多い | WHERE で時間範囲・ディメンション フィルター追加 |
| Memory store full | 書き込み量が多く、ポリシー遅延 | マグネティックストア移行時間確認 |
| レスポンスが遅い | メモリストア外のデータをスキャン | メモリストア保持期間を延長 |
セキュリティ・コンプライアンス
| 軸 | 内容 |
|---|---|
| IAM | リソース単位のきめ細かい権限制御 |
| KMS | 保存時暗号化(AWS Managed Key / CMK 選択可) |
| TLS / HTTPS | 転送中の暗号化(デフォルト) |
| VPC Endpoint | PrivateLink 対応、インターネット露出なし |
| CloudTrail | API 操作の監査ログ |
| Retention | コンプライアンス保持期間設定・自動削除可能 |
| 監査 | CloudWatch Logs に詳細ログ出力 |
パフォーマンスチューニング
書き込みの最適化
# バッチ書き込み(複数レコード)
def batch_write_records(records, batch_size=100):
for i in range(0, len(records), batch_size):
batch = records[i:i+batch_size]
client.write_records(
DatabaseName='my_app',
TableName='sensor_data',
Records=batch
)
クエリの最適化
-- ❌ 非効率:全テーブルスキャン
SELECT * FROM "my_app"."sensor_data"
-- ✅ 効率的:時間範囲フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE time > ago(1h)
-- ✅ さらに効率的:ディメンション + 時間フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE region = 'us-east-1'
AND time > ago(1h)
-- ✅ 最高効率:Scheduled Query で事前集計
SELECT device_id, BIN(time, 5m), AVG(temperature)
FROM "my_app"."sensor_data_5min_agg" -- 既に集計済み
WHERE time > ago(1h)
コスト最適化
1. マルチメジャーレコード利用
- シングル × 3 メトリクス → マルチ × 1 レコード
- 書き込みコスト 1/3
2. Scheduled Query でプリアグリゲーション
- ダッシュボード毎回フルスキャン → 事前集計
- クエリコスト 50-90% 削減
3. 適切なレテンション設定
- メモリストア: 24 時間(ホットデータ)
- マグネティック: 365 日(ウォームデータ)
- → 古いデータは S3 ライフサイクルで移行
4. フィルターの活用
WHERE region = 'us-east-1' AND time > ago(7d)
不要なデータはスキャン前に除外
2025-2026 最新動向
Timestream for InfluxDB の進化
- InfluxDB v3 互試対応(2026 年上半期予定)
- パフォーマンス向上・マルチテナント対応
Vector Search サポート
- 埋め込みベクトルの直接保存・検索
- 時系列 + ベクトル検索の統合
Zero-ETL 統合
- Redshift との自動同期
- Glue なしでのデータレプリケーション
AI/ML 統合の深化
- SageMaker Forecast との直接統合
- Bedrock で異常検知・予測
マルチリージョン対応
- クロスリージョン レプリケーション
- グローバルアプリケーション対応
学習リソース・参考文献
公式リソース
- Amazon Timestream Developer Guide
- Amazon Timestream for InfluxDB
- Timestream API Reference
- Timestream Pricing
- AWS Timestream Workshop
ベンダー・OSS リソース
- InfluxDB Documentation
- TimescaleDB Documentation
- Prometheus Documentation
- ClickHouse Documentation
- Grafana Timestream Plugin
コミュニティ・ブログ
- AWS Database Blog
- Time Series Data Management
- IoT on AWS Blog
実装例・チェックリスト
チェックリスト
- [ ] データレート(毎秒レコード数)を測定
- [ ] ディメンション・カーディナリティを設計
- [ ] マルチメジャーレコード利用か確認
- [ ] メモリストア / マグネティックストア 保持期間設定
- [ ] Scheduled Query でプリアグリゲーション計画
- [ ] VPC エンドポイント設定
- [ ] IAM ロール・最小権限確認
- [ ] CloudTrail 監査ログ有効化
- [ ] Grafana / QuickSight 接続確認
- [ ] コスト見積もり・予算アラート設定
実装チェック
# Timestream 接続確認
import boto3
client = boto3.client('timestream-write')
# DB リスト確認
response = client.list_databases()
print("Databases:", [db['DatabaseName'] for db in response['Databases']])
# テーブル確認
response = client.list_tables(DatabaseName='my_app')
print("Tables:", [t['TableName'] for t in response['Tables']])
# 書き込みテスト
import time
response = client.write_records(
DatabaseName='my_app',
TableName='sensor_data',
Records=[{
'Time': str(int(time.time() * 1000)),
'TimeUnit': 'MILLISECONDS',
'Dimensions': [{'Name': 'test', 'Value': 'true'}],
'MeasureName': 'test_metric',
'MeasureValue': '1.0',
'MeasureValueType': 'DOUBLE'
}]
)
print("Write success:", response['RecordsIngested']['Total'] > 0)
まとめ
Amazon Timestream は 「時系列データ専用の高速・低コスト・マネージド DB」 です。IoT センサーデータ・サーバーメトリクス・アプリテレメトリなど、タイムスタンプが主軸のデータに特化しており、以下の特徴を提供します:
- 自動ストレージ階層化 でコスト最適化
- SQL 互換クエリ で学習コスト低減
- 時系列専用関数 で分析を高速化
- Scheduled Query でダッシュボード最適化
- Grafana 統合 でリアルタイムダッシュボード
- 99.99% SLA でエンタープライズ品質
RDS / DynamoDB では対応困難な 毎秒百万レコード規模 のデータを、低コスト・低運用負荷で実現します。Timestream for LiveAnalytics(AWS ネイティブ)か Timestream for InfluxDB(InfluxDB 既存ユーザー向け)から選択できます。
最終更新:2026-04-26 バージョン:v2.0