跳至内容

数据查询

这篇讲 InfluxDB 的数据查询:2.x 的 Flux 脚本语言基础和 1.x 兼容的 InfluxQL。如果你从 MySQL 转过来,先放下 SQL 思维——时序查询有自己的范式。

Flux 基础

Flux 是 InfluxDB 2.x 的函数式查询语言,用管道(|>)连接操作:

from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage")
  |> filter(fn: (r) => r.host == "server01")

基本查询模式

// 最近 1 小时的 CPU 数据
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")

// 指定时间范围
from(bucket: "my-bucket")
  |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-02T00:00:00Z)

// 过滤 tag 和 field
from(bucket: "my-bucket")
  |> range(start: -30m)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_user" or r._field == "usage_system")
  |> filter(fn: (r) => r.host == "server01" or r.host == "server02")

// 限制返回数量
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> limit(n: 10)
```text

## 聚合与下采样

```flux
// 平均值
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> mean()

// 按时间窗口聚合(每 5 分钟的平均值)
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> aggregateWindow(every: 5m, fn: mean)

// 多维度聚合:按 host 分组,每 10 分钟取最大值
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> aggregateWindow(every: 10m, fn: max)
  |> group(columns: ["host"])

// 多重计算
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> aggregateWindow(every: 5m, fn: (tables=<-, column="_value") =>
       tables
         |> mean()
         |> duplicate(column: "_value", as: "_value_mean")
  )

常用转换

// 排序
|> sort(columns: ["_time"], desc: true)

// 去重
|> unique(column: "host")

// 前 N 条
|> top(n: 5, columns: ["_value"])

// 计算变化率
|> derivative(unit: 1s, nonNegative: true)

// 移动平均
|> movingAverage(n: 5)

// 累计和
|> cumulativeSum()

// pivot:行转列
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```text

## 实用查询示例

```flux
// 1. 最近 15 分钟的 CPU 负载前 5
from(bucket: "my-bucket")
  |> range(start: -15m)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> group(columns: ["host", "_field"])
  |> mean()
  |> group()
  |> top(n: 5, columns: ["_value"])

// 2. 计算每个主机的 CPU 使用率变化
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> derivative(unit: 1s, nonNegative: true)

// 3. 告警查询:查找 CPU > 90% 的记录
from(bucket: "my-bucket")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> filter(fn: (r) => r._value > 90)

InfluxQL(1.x 兼容)

InfluxDB 2.x 仍兼容 InfluxQL(类 SQL 语法),通过 /query 端点:

-- 基本查询
SELECT usage FROM cpu WHERE host = 'server01' AND time > now() - 1h

-- 聚合
SELECT MEAN(usage) FROM cpu WHERE time > now() - 1h GROUP BY time(10m), host

-- 前 5
SELECT TOP(usage, 5) FROM cpu WHERE time > now() - 1h

-- 计算速率
SELECT DERIVATIVE(usage) FROM cpu WHERE time > now() - 1h GROUP BY host
新项目建议直接用 Flux——它是 InfluxDB 的未来方向,功能比 InfluxQL 强大(支持 join、自定义函数、多种数据源)。InfluxQL 主要用于兼容旧版本。

Flux vs SQL 对照

操作SQLFlux
指定数据源FROM tablefrom(bucket: "b")
时间范围WHERE time > ...|> range(start: -1h)
过滤WHERE ...|> filter(fn: (r) => ...)
分组GROUP BY|> group(columns: [...])
聚合SELECT AVG(x)|> mean()
排序ORDER BY|> sort()
限制LIMIT 10|> limit(n: 10)

一句话小结

Flux 是函数式查询语言,用管道(|>) 串联操作。range 定时间范围,filter 过滤条件,aggregateWindow 做时间窗口聚合。新项目用 Flux,老项目可继续用 InfluxQL。下一篇讲 保留策略与任务

最后更新于