保留策略与任务
这篇讲 InfluxDB 的数据生命周期管理:保留策略(Retention Policy)、自动下采样和定时任务。时序数据量增长极快——不管理会吃掉所有磁盘。
保留策略——数据自动过期
时序数据的特点是:越旧的数据价值越低。InfluxDB 用 retention period 自动清理过期数据。
在 InfluxDB 2.x 中,retention 配置在 bucket 级别:
# 创建有保留期的 bucket
influx bucket create \
--name sensor_data \
--org my-org \
--retention 30d # 30 天后自动删除
# 修改已有 bucket 的保留期
influx bucket update \
--name sensor_data \
--retention 90d # 改为 90 天
# 无限保留
influx bucket update \
--name sensor_data \
--retention 0 # 0 = 永久保留保留策略设计建议
| 数据类型 | 建议保留期 | 原因 |
|---|---|---|
| 原始高精度数据 | 7-30 天 | 空间有限,短期调试用 |
| 10 分钟聚合数据 | 90-365 天 | 日常监控和趋势分析 |
| 1 小时聚合数据 | 1-3 年 | 年度报告和容量规划 |
| 1 天聚合数据 | 永久 | 多年趋势对比 |
不要把所有数据都永久保留。以一个 100 台机器的集群为例,每秒采集一次 CPU 数据,每天产生 800 万条记录——一年就是 30 亿条。用下采样分层存储才是正解。
下采样(Downsampling)
下采样就是把高精度数据聚合成低精度数据,既保留了长期趋势又大幅节省空间。
创建下采样任务
InfluxDB 2.x 用 Task 实现定期下采样:
// 每 10 分钟执行一次:将 10 秒原始数据聚合成 10 分钟级数据
option task = {
name: "cpu_downsample_10m",
every: 10m,
}
from(bucket: "raw_data")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 1m, fn: mean)
|> to(bucket: "downsampled_10m", org: "my-org")
```bash
创建 task:
```bash
influx task create \
--org my-org \
--name cpu_downsample \
--file cpu_downsample.flux分层下采样架构
原始数据(10s 精度) → Bucket: raw_data(保留 7 天)
↓ Task: 每 10 分钟
10 分钟聚合 → Bucket: ds_10m(保留 90 天)
↓ Task: 每 1 小时
1 小时聚合 → Bucket: ds_1h(保留 1 年)
↓ Task: 每天
1 天聚合 → Bucket: ds_1d(永久保留)创建分层 task 的完整脚本:
// raw → 10m downsampling
option task = { name: "downsample_cpu_10m", every: 10m }
from(bucket: "raw_data")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
|> to(bucket: "ds_10m", org: "my-org")
// 10m → 1h downsampling
option task = { name: "downsample_cpu_1h", every: 1h }
from(bucket: "ds_10m")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
|> to(bucket: "ds_1h", org: "my-org")
```python
## 用 API 管理 Task
```python
from influxdb_client import InfluxDBClient
client = InfluxDBClient(
url="http://localhost:8086",
token="your-api-token",
org="my-org"
)
tasks_api = client.tasks_api()
# 列出所有 task
tasks = tasks_api.find_tasks()
for task in tasks:
print(f"Task: {task.name}, Status: {task.status}")
# 获取 task 的运行日志
logs = tasks_api.get_logs(task_id="task-id-here")
for log in logs:
print(f"{log.run_id}: {log.message}")
client.close()检查和通知
InfluxDB 2.x 内置了简单的告警机制:
// 创建检查(Check):CPU > 90% 触发通知
import "influxdata/influxdb/monitor"
option task = { name: "cpu_alert", every: 1m }
data = from(bucket: "raw_data")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
|> aggregateWindow(every: 1m, fn: mean)
// 通知方式:HTTP endpoint / Slack / PagerDuty
monitor.check(
data: data,
messageFn: (r) => "CPU 使用率过高: ${string(v: r._value)}%",
crit: (r) => r._value > 90.0,
)
```text
对于生产环境的告警,建议用 Grafana 的 Alerting 功能替代 InfluxDB 内置的 Check——Grafana 的告警规则更灵活,通知渠道更丰富(Slack/微信/钉钉/邮件)。
## 一句话小结
保留策略让旧数据自动过期——用 retention period 控制成本。下采样分层存储(原始→10分钟→1小时→1天)既保留趋势又节省空间。Task 是下采样的执行引擎,每 10 分钟或 1 小时定期运行。下一篇讲如何把 InfluxDB 数据接入 [Grafana 可视化](../05可视化与监控/)。最后更新于