当前位置: 首页 > backend >正文

golang连接influxdb的orm操作

1. 安装influxdb-client-go包

go get github.com/influxdata/influxdb-client-go/v2

2. 连接influxdb


type InfluxDBCon struct {client      influxdb2.Client // 客户端连接writeClient api.WriteAPI // 写对象queryClient api.QueryAPI // 读对象deleteAPI   api.DeleteAPI // 删除org         string // 组织bucket      string // 桶
}// 建立连接
func NewInfluxDBCon(url, token, org, bucket string) *InfluxDBCon {client := influxdb2.NewClientWithOptions(url, token,influxdb2.DefaultOptions().SetBatchSize(20))return &InfluxDBCon{client:      client,writeClient: client.WriteAPI(org, bucket),queryClient: client.QueryAPI(org),deleteAPI:   client.DeleteAPI(),org:         org,bucket:      bucket,}
}// 关闭连接
func (c *InfluxDBCon) Close() {c.client.Close()
}func (c *InfluxDBCon) WriteClient() api.WriteAPI {return c.writeClient
}func (c *InfluxDBCon) QueryClient() api.QueryAPI {return c.queryClient
}

3. 增删读改


func (c *InfluxDBCon) WritePoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) {p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()
}func (c *InfluxDBCon) Insert(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {c.WritePoint(measurement, tags, fields, ts)return nil
}func (c *InfluxDBCon) DeletePoints(measurement string, start, stop time.Time, tagEq map[string]string, extraPredicate string) error {predicate := buildDeletePredicate(measurement, tagEq, extraPredicate)// 直接用名字版本,避免去查 domain 对象return c.deleteAPI.DeleteWithName(context.Background(), c.org, c.bucket, start, stop, predicate)
}// 注意:Delete 谓词只支持按 tag / _measurement(不支持 field 值)
// 参考官方说明。:contentReference[oaicite:1]{index=1}
func buildDeletePredicate(measurement string, tagEq map[string]string, extra string) string {parts := []string{fmt.Sprintf(`_measurement="%s"`, escapePredicate(measurement))}for k, v := range tagEq {parts = append(parts, fmt.Sprintf(`%s="%s"`, k, escapePredicate(v)))}if s := strings.TrimSpace(extra); s != "" {parts = append(parts, fmt.Sprintf("(%s)", s))}return strings.Join(parts, " and ")
}func escapePredicate(s string) string {s = strings.ReplaceAll(s, `\`, `\\`)s = strings.ReplaceAll(s, `"`, `\"`)return s
}func (c *InfluxDBCon) QueryOne(flux string) (map[string]interface{}, error) {res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return nil, err}defer res.Close()if res.Next() {record := res.Record()data := map[string]interface{}{"time":  record.Time(),"value": record.Value(),}for k, v := range record.Values() {data[k] = v}return data, nil}return nil, fmt.Errorf("no record found")
}func (c *InfluxDBCon) Update(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {// 用极窄时间窗 + 完整 tags 来删除旧点(避免误删)start := ts.Add(-1 * time.Second)stop := ts.Add(1 * time.Second)if err := c.DeletePoints(measurement, start, stop, tags, ""); err != nil {return err}// 再写入新点p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()return nil
}// ==== 统计功能 ====func (c *InfluxDBCon) Count(measurement string, start time.Duration) (int64, error) {flux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -%s)|> filter(fn: (r) => r._measurement == "%s")|> count()`, c.bucket, start.String(), measurement)res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return 0, err}defer res.Close()if res.Next() {if v, ok := res.Record().Value().(int64); ok {return v, nil}}return 0, nil
}

4. 单元测试


// 初始化测试客户端
func initTestClient() *InfluxDBCon {token := "PjpqAXwbXwizAFieHJekR2tZUf3djVMuAcFpS1mbSjyi80NRWG11hqlc4ZPDs0SLh7oIxEZnufWLzZueERVHkw=="url := "http://192.168.20.211:8086"org := "operate"bucket := "mybuk"return NewInfluxDBCon(url, token, org, bucket)
}func TestInfluxDBCon(t *testing.T) {client := initTestClient()defer client.Close()measurement := "system_test"tags := map[string]string{"id":     "rack_1","vendor": "AWS",}fields := map[string]interface{}{"temperature": rand.Float64() * 80,"disk_free":   rand.Float64() * 1000,}now := time.Now()// 1. Inserterr := client.Insert(measurement, tags, fields, now)if err != nil {t.Fatalf("insert error: %v", err)}t.Log("insert ok")time.Sleep(1 * time.Second) // 写入的数据,落盘需要时间// 2. QueryOneflux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -1h)|> filter(fn: (r) => r._measurement == "%s")|> limit(n:1)`, client.bucket, measurement)record, err := client.QueryOne(flux)if err != nil {t.Fatalf("query one error: %v", err)}t.Logf("query one result: %+v", record)// 3. Updatefields["temperature"] = 66.6err = client.Update(measurement, tags, fields, now)if err != nil {t.Fatalf("update error: %v", err)}t.Log("update ok")// 4. Countcount, err := client.Count(measurement, time.Hour)if err != nil {t.Fatalf("count error: %v", err)}t.Logf("count result: %d", count)// 8. Deleteerr = client.DeletePoints(measurement,now.Add(-1*time.Minute),now.Add(1*time.Minute),map[string]string{"id": "rack_1", "vendor": "AWS"},"",)if err != nil {t.Fatalf("delete error: %v", err)}t.Log("delete ok")
}

http://www.xdnf.cn/news/20299.html

相关文章:

  • halcon-亚像素边缘提取教程
  • PyTorch 模型文件介绍
  • element-plus 表单校验-表单中包含多子组件表单的校验
  • (数据结构)哈希碰撞:线性探测法 vs 拉链法
  • 基于区块链的IoMT跨医院认证系统:Python实践分析
  • Flink中的事件时间、处理时间和摄入时间
  • Joplin-解决 Node.js 中 “digital envelope routines::unsupported“ 错误
  • 自旋锁/互斥锁 设备树 iic驱动总线 day66 67 68
  • 输入2.2V~16V 最高输出20V2.5A DCDC升压芯片MT3608L
  • 计算机网络:网络设备在OSI七层模型中的工作层次和传输协议
  • 鸿蒙 BLE 蓝牙智能设备固件升级之DFU升级方式(Nordic芯片)
  • macbook intel 打开cursor会闪退
  • MySQL集群高可用架构(MHA高可用架构)
  • Process Explorer进阶(第三章3.3):深入理解进程详情
  • [Windows] AdGuard.v7.21.5089.0 中文直装电脑版
  • cds序列转换为pepperl脚本详细解读及使用
  • Python多线程编程全面指南
  • web自动化测试
  • Elasticsearch优化从入门到精通
  • 线代:排列与逆序
  • 从机器学习的角度实现 excel 中趋势线:揭秘梯度下降过程
  • PageHelper的使用及底层原理
  • WordPress如何绑定多个域名 WordPress实现多域名访问
  • 新的打卡方式
  • GPIO介绍
  • java接口和抽象类有何区别
  • ICPC 2023 Nanjing R L 题 Elevator
  • 用Android studio运行海外极光推送engagelab安卓的SDK打apk安装包
  • Ribbon和LoadBalance-负载均衡
  • 从Java全栈到前端框架:一次真实面试的深度复盘