golang连接influxdb的orm操作
1. 安装influxdb-client-go包
go get github.com/influxdata/influxdb-client-go/v2
2. 连接influxdb
type InfluxDBCon struct {client influxdb2.Client // 客户端连接writeClient api.WriteAPI // 写对象queryClient api.QueryAPI // 读对象deleteAPI api.DeleteAPI // 删除org string // 组织bucket string // 桶
}// 建立连接
func NewInfluxDBCon(url, token, org, bucket string) *InfluxDBCon {client := influxdb2.NewClientWithOptions(url, token,influxdb2.DefaultOptions().SetBatchSize(20))return &InfluxDBCon{client: client,writeClient: client.WriteAPI(org, bucket),queryClient: client.QueryAPI(org),deleteAPI: client.DeleteAPI(),org: org,bucket: bucket,}
}// 关闭连接
func (c *InfluxDBCon) Close() {c.client.Close()
}func (c *InfluxDBCon) WriteClient() api.WriteAPI {return c.writeClient
}func (c *InfluxDBCon) QueryClient() api.QueryAPI {return c.queryClient
}
3. 增删读改
func (c *InfluxDBCon) WritePoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) {p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()
}func (c *InfluxDBCon) Insert(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {c.WritePoint(measurement, tags, fields, ts)return nil
}func (c *InfluxDBCon) DeletePoints(measurement string, start, stop time.Time, tagEq map[string]string, extraPredicate string) error {predicate := buildDeletePredicate(measurement, tagEq, extraPredicate)// 直接用名字版本,避免去查 domain 对象return c.deleteAPI.DeleteWithName(context.Background(), c.org, c.bucket, start, stop, predicate)
}// 注意:Delete 谓词只支持按 tag / _measurement(不支持 field 值)
// 参考官方说明。:contentReference[oaicite:1]{index=1}
func buildDeletePredicate(measurement string, tagEq map[string]string, extra string) string {parts := []string{fmt.Sprintf(`_measurement="%s"`, escapePredicate(measurement))}for k, v := range tagEq {parts = append(parts, fmt.Sprintf(`%s="%s"`, k, escapePredicate(v)))}if s := strings.TrimSpace(extra); s != "" {parts = append(parts, fmt.Sprintf("(%s)", s))}return strings.Join(parts, " and ")
}func escapePredicate(s string) string {s = strings.ReplaceAll(s, `\`, `\\`)s = strings.ReplaceAll(s, `"`, `\"`)return s
}func (c *InfluxDBCon) QueryOne(flux string) (map[string]interface{}, error) {res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return nil, err}defer res.Close()if res.Next() {record := res.Record()data := map[string]interface{}{"time": record.Time(),"value": record.Value(),}for k, v := range record.Values() {data[k] = v}return data, nil}return nil, fmt.Errorf("no record found")
}func (c *InfluxDBCon) Update(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {// 用极窄时间窗 + 完整 tags 来删除旧点(避免误删)start := ts.Add(-1 * time.Second)stop := ts.Add(1 * time.Second)if err := c.DeletePoints(measurement, start, stop, tags, ""); err != nil {return err}// 再写入新点p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()return nil
}// ==== 统计功能 ====func (c *InfluxDBCon) Count(measurement string, start time.Duration) (int64, error) {flux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -%s)|> filter(fn: (r) => r._measurement == "%s")|> count()`, c.bucket, start.String(), measurement)res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return 0, err}defer res.Close()if res.Next() {if v, ok := res.Record().Value().(int64); ok {return v, nil}}return 0, nil
}
4. 单元测试
// 初始化测试客户端
func initTestClient() *InfluxDBCon {token := "PjpqAXwbXwizAFieHJekR2tZUf3djVMuAcFpS1mbSjyi80NRWG11hqlc4ZPDs0SLh7oIxEZnufWLzZueERVHkw=="url := "http://192.168.20.211:8086"org := "operate"bucket := "mybuk"return NewInfluxDBCon(url, token, org, bucket)
}func TestInfluxDBCon(t *testing.T) {client := initTestClient()defer client.Close()measurement := "system_test"tags := map[string]string{"id": "rack_1","vendor": "AWS",}fields := map[string]interface{}{"temperature": rand.Float64() * 80,"disk_free": rand.Float64() * 1000,}now := time.Now()// 1. Inserterr := client.Insert(measurement, tags, fields, now)if err != nil {t.Fatalf("insert error: %v", err)}t.Log("insert ok")time.Sleep(1 * time.Second) // 写入的数据,落盘需要时间// 2. QueryOneflux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -1h)|> filter(fn: (r) => r._measurement == "%s")|> limit(n:1)`, client.bucket, measurement)record, err := client.QueryOne(flux)if err != nil {t.Fatalf("query one error: %v", err)}t.Logf("query one result: %+v", record)// 3. Updatefields["temperature"] = 66.6err = client.Update(measurement, tags, fields, now)if err != nil {t.Fatalf("update error: %v", err)}t.Log("update ok")// 4. Countcount, err := client.Count(measurement, time.Hour)if err != nil {t.Fatalf("count error: %v", err)}t.Logf("count result: %d", count)// 8. Deleteerr = client.DeletePoints(measurement,now.Add(-1*time.Minute),now.Add(1*time.Minute),map[string]string{"id": "rack_1", "vendor": "AWS"},"",)if err != nil {t.Fatalf("delete error: %v", err)}t.Log("delete ok")
}