跳至内容

数据写入

这篇讲 InfluxDB 的数据写入方式:Line Protocol 语法、HTTP API、各语言客户端以及批量写入的最佳实践。

Line Protocol:InfluxDB 的通用数据格式

所有数据以 Line Protocol 格式写入——一行一条记录:

measurement,tag_key=tag_value field_key=field_value timestamp

规则:

  • measurement:必须,不转义逗号和空格
  • tag set:可选,逗号分隔,都是字符串
  • field set:必须,空格分隔,可以是字符串/整数/浮点/布尔
  • timestamp:可选(不写则使用服务器时间)
# 基本示例
temperature,location=office,room=101 value=23.5 1705315200000000000

# 多个 field
cpu,host=server01 usage_user=65.2,usage_system=12.8,usage_idle=22.0

# 字符串 field(加双引号)
log,app=nginx message="connection refused"

# 布尔 field
status,service=api up=true

# 整数 field(加 i 后缀)
counter,type=request value=42i

HTTP API 写入

InfluxDB 2.x 使用 REST API:

# 单条写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
  -H "Authorization: Token your-api-token" \
  -d 'cpu,host=server01 usage=42.5'

# 批量写入(换行分隔)
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
  -H "Authorization: Token your-api-token" \
  -d 'cpu,host=server01 usage=42.5
cpu,host=server02 usage=55.1
cpu,host=server03 usage=38.7'

Python 客户端写入

安装:

pip install influxdb-client
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime

# 连接
client = InfluxDBClient(
    url="http://localhost:8086",
    token="your-api-token",
    org="my-org"
)

write_api = client.write_api(write_options=SYNCHRONOUS)

# 方式 1:使用 Point 对象
point = Point("cpu") \
    .tag("host", "server01") \
    .tag("region", "us-west") \
    .field("usage", 87.5) \
    .field("user", 62.3) \
    .time(datetime.utcnow())

write_api.write(bucket="my-bucket", record=point)

# 方式 2:Line Protocol 字符串
record = 'cpu,host=server01,region=us-west usage=87.5,user=62.3'
write_api.write(bucket="my-bucket", record=record)

# 方式 3:字典
record = {
    "measurement": "cpu",
    "tags": {"host": "server01"},
    "fields": {"usage": 87.5, "user": 62.3},
    "time": datetime.utcnow()
}
write_api.write(bucket="my-bucket", record=record)

client.close()
```python

批量写入最重要的一条原则:攒够一批(如 5000 条或 5 秒)再 flush,不要来一条写一条。HTTP 请求的往返开销远大于数据写入本身。
## 批量写入最佳实践 ```python from influxdb_client import InfluxDBClient, Point, WriteOptions import random import time # 使用异步批量写入 client = InfluxDBClient( url="http://localhost:8086", token="your-api-token", org="my-org" ) # 批量写入配置 write_api = client.write_api(write_options=WriteOptions( batch_size=5000, # 每 5000 条 flush 一次 flush_interval=5_000, # 或每 5 秒 flush jitter_interval=2_000, # 随机抖动避免惊群 retry_interval=5_000, # 失败重试间隔 )) # 模拟数据写入 for i in range(10000): point = Point("sensor") \ .tag("device_id", f"sensor_{random.randint(1,100)}") \ .field("temperature", random.uniform(20.0, 35.0)) \ .field("humidity", random.uniform(40.0, 80.0)) write_api.write(bucket="my-bucket", record=point) # 确保所有数据写入完毕 write_api.flush() write_api.close() client.close()

Tag vs Field 的选择

这是时序数据库设计最关键的问题:

TagField
是否索引✅ 索引❌ 不索引
查询速度快(WHERE tag = ?慢(需扫描)
基数要求(几十到几千)任意
可分组GROUP BY tag
典型值host、region、statususage、response_time
不要把高基数的值放 tag。例如 user_id、request_id、随机数——这会导致 series 数量爆炸,内存耗尽。tag 的基数(不同值的数量)控制在几万以内。

一句话小结

Line Protocol 格式:measurement,tags fields timestamp。写入用 HTTP API 或客户端库,批量 flush 是关键优化。Tag 放低基数的检索维度,Field 放高基数的数值——记住这条黄金法则。下一篇讲 数据查询

最后更新于