跳至内容
保留策略与任务

保留策略与任务

这篇讲 InfluxDB 的数据生命周期管理:保留策略(Retention Policy)、自动下采样和定时任务。时序数据量增长极快——不管理会吃掉所有磁盘。

保留策略——数据自动过期

时序数据的特点是:越旧的数据价值越低。InfluxDB 用 retention period 自动清理过期数据。

在 InfluxDB 2.x 中,retention 配置在 bucket 级别:

# 创建有保留期的 bucket
influx bucket create \
  --name sensor_data \
  --org my-org \
  --retention 30d    # 30 天后自动删除

# 修改已有 bucket 的保留期
influx bucket update \
  --name sensor_data \
  --retention 90d    # 改为 90 天

# 无限保留
influx bucket update \
  --name sensor_data \
  --retention 0      # 0 = 永久保留

保留策略设计建议

数据类型建议保留期原因
原始高精度数据7-30 天空间有限,短期调试用
10 分钟聚合数据90-365 天日常监控和趋势分析
1 小时聚合数据1-3 年年度报告和容量规划
1 天聚合数据永久多年趋势对比
不要把所有数据都永久保留。以一个 100 台机器的集群为例,每秒采集一次 CPU 数据,每天产生 800 万条记录——一年就是 30 亿条。用下采样分层存储才是正解。

下采样(Downsampling)

下采样就是把高精度数据聚合成低精度数据,既保留了长期趋势又大幅节省空间。

创建下采样任务

InfluxDB 2.x 用 Task 实现定期下采样:

// 每 10 分钟执行一次:将 10 秒原始数据聚合成 10 分钟级数据
option task = {
    name: "cpu_downsample_10m",
    every: 10m,
}

from(bucket: "raw_data")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 1m, fn: mean)
    |> to(bucket: "downsampled_10m", org: "my-org")
```bash

创建 task:

```bash
influx task create \
  --org my-org \
  --name cpu_downsample \
  --file cpu_downsample.flux

分层下采样架构

原始数据(10s 精度)  →  Bucket: raw_data(保留 7 天)
    ↓ Task: 每 10 分钟
10 分钟聚合           →  Bucket: ds_10m(保留 90 天)
    ↓ Task: 每 1 小时
1 小时聚合            →  Bucket: ds_1h(保留 1 年)
    ↓ Task: 每天
1 天聚合              →  Bucket: ds_1d(永久保留)

创建分层 task 的完整脚本:

// raw → 10m downsampling
option task = { name: "downsample_cpu_10m", every: 10m }

from(bucket: "raw_data")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r._field == "usage")
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    |> to(bucket: "ds_10m", org: "my-org")


// 10m → 1h downsampling
option task = { name: "downsample_cpu_1h", every: 1h }

from(bucket: "ds_10m")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
    |> to(bucket: "ds_1h", org: "my-org")
```python

## 用 API 管理 Task

```python
from influxdb_client import InfluxDBClient

client = InfluxDBClient(
    url="http://localhost:8086",
    token="your-api-token",
    org="my-org"
)

tasks_api = client.tasks_api()

# 列出所有 task
tasks = tasks_api.find_tasks()
for task in tasks:
    print(f"Task: {task.name}, Status: {task.status}")

# 获取 task 的运行日志
logs = tasks_api.get_logs(task_id="task-id-here")
for log in logs:
    print(f"{log.run_id}: {log.message}")

client.close()

检查和通知

InfluxDB 2.x 内置了简单的告警机制:

// 创建检查(Check):CPU > 90% 触发通知
import "influxdata/influxdb/monitor"

option task = { name: "cpu_alert", every: 1m }

data = from(bucket: "raw_data")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
    |> aggregateWindow(every: 1m, fn: mean)

// 通知方式:HTTP endpoint / Slack / PagerDuty
monitor.check(
    data: data,
    messageFn: (r) => "CPU 使用率过高: ${string(v: r._value)}%",
    crit: (r) => r._value > 90.0,
)
```text

对于生产环境的告警,建议用 Grafana 的 Alerting 功能替代 InfluxDB 内置的 Check——Grafana 的告警规则更灵活,通知渠道更丰富(Slack/微信/钉钉/邮件)。
## 一句话小结 保留策略让旧数据自动过期——用 retention period 控制成本。下采样分层存储(原始→10分钟→1小时→1天)既保留趋势又节省空间。Task 是下采样的执行引擎,每 10 分钟或 1 小时定期运行。下一篇讲如何把 InfluxDB 数据接入 [Grafana 可视化](../05可视化与监控/)。
最后更新于