数据写入
这篇讲 InfluxDB 的数据写入方式:Line Protocol 语法、HTTP API、各语言客户端以及批量写入的最佳实践。
Line Protocol:InfluxDB 的通用数据格式
所有数据以 Line Protocol 格式写入——一行一条记录:
measurement,tag_key=tag_value field_key=field_value timestamp规则:
measurement:必须,不转义逗号和空格tag set:可选,逗号分隔,都是字符串field set:必须,空格分隔,可以是字符串/整数/浮点/布尔timestamp:可选(不写则使用服务器时间)
# 基本示例
temperature,location=office,room=101 value=23.5 1705315200000000000
# 多个 field
cpu,host=server01 usage_user=65.2,usage_system=12.8,usage_idle=22.0
# 字符串 field(加双引号)
log,app=nginx message="connection refused"
# 布尔 field
status,service=api up=true
# 整数 field(加 i 后缀)
counter,type=request value=42iHTTP API 写入
InfluxDB 2.x 使用 REST API:
# 单条写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
-H "Authorization: Token your-api-token" \
-d 'cpu,host=server01 usage=42.5'
# 批量写入(换行分隔)
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
-H "Authorization: Token your-api-token" \
-d 'cpu,host=server01 usage=42.5
cpu,host=server02 usage=55.1
cpu,host=server03 usage=38.7'Python 客户端写入
安装:
pip install influxdb-clientfrom influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
# 连接
client = InfluxDBClient(
url="http://localhost:8086",
token="your-api-token",
org="my-org"
)
write_api = client.write_api(write_options=SYNCHRONOUS)
# 方式 1:使用 Point 对象
point = Point("cpu") \
.tag("host", "server01") \
.tag("region", "us-west") \
.field("usage", 87.5) \
.field("user", 62.3) \
.time(datetime.utcnow())
write_api.write(bucket="my-bucket", record=point)
# 方式 2:Line Protocol 字符串
record = 'cpu,host=server01,region=us-west usage=87.5,user=62.3'
write_api.write(bucket="my-bucket", record=record)
# 方式 3:字典
record = {
"measurement": "cpu",
"tags": {"host": "server01"},
"fields": {"usage": 87.5, "user": 62.3},
"time": datetime.utcnow()
}
write_api.write(bucket="my-bucket", record=record)
client.close()
```python
批量写入最重要的一条原则:攒够一批(如 5000 条或 5 秒)再 flush,不要来一条写一条。HTTP 请求的往返开销远大于数据写入本身。
## 批量写入最佳实践
```python
from influxdb_client import InfluxDBClient, Point, WriteOptions
import random
import time
# 使用异步批量写入
client = InfluxDBClient(
url="http://localhost:8086",
token="your-api-token",
org="my-org"
)
# 批量写入配置
write_api = client.write_api(write_options=WriteOptions(
batch_size=5000, # 每 5000 条 flush 一次
flush_interval=5_000, # 或每 5 秒 flush
jitter_interval=2_000, # 随机抖动避免惊群
retry_interval=5_000, # 失败重试间隔
))
# 模拟数据写入
for i in range(10000):
point = Point("sensor") \
.tag("device_id", f"sensor_{random.randint(1,100)}") \
.field("temperature", random.uniform(20.0, 35.0)) \
.field("humidity", random.uniform(40.0, 80.0))
write_api.write(bucket="my-bucket", record=point)
# 确保所有数据写入完毕
write_api.flush()
write_api.close()
client.close()Tag vs Field 的选择
这是时序数据库设计最关键的问题:
| Tag | Field | |
|---|---|---|
| 是否索引 | ✅ 索引 | ❌ 不索引 |
| 查询速度 | 快(WHERE tag = ?) | 慢(需扫描) |
| 基数要求 | 低(几十到几千) | 任意 |
| 可分组 | ✅ GROUP BY tag | ❌ |
| 典型值 | host、region、status | usage、response_time |
不要把高基数的值放 tag。例如 user_id、request_id、随机数——这会导致 series 数量爆炸,内存耗尽。tag 的基数(不同值的数量)控制在几万以内。
一句话小结
Line Protocol 格式:measurement,tags fields timestamp。写入用 HTTP API 或客户端库,批量 flush 是关键优化。Tag 放低基数的检索维度,Field 放高基数的数值——记住这条黄金法则。下一篇讲 数据查询。
最后更新于