Python开发后端InfluxDB数据库测试接口
1、使用PyCharm创建一个Python项目wzClear
2、新建package包wzInfluxdb和wzConfig包,如上图所示,新建一个DB.json配置文件并添加influxdb配置信息,DB.json为统一配置文件
{"influxdbV1": {"url": "http://192.168.0.44:8086","username": "root","password": "root","database": "wzClean","measurement": "HourData"}
}
3、搭建influxdb_util.py配置文件
import json
import os
import pandas as pd
from influxdb import InfluxDBClientclass InfluxDBUtil:def __init__(self, config_path='./wzConfig/DB.json'):self.client = Noneself.measurement = Noneself._load_config(config_path)self._connect()def _load_config(self, path):with open(path, 'r', encoding='utf-8') as f:config = json.load(f)influx = config['influxdbV1']self.host, self.port = influx['url'].replace("http://", "").split(":")self.username = influx['username']self.password = influx['password']self.database = influx['database']self.measurement = influx['measurement']def _connect(self):try:self.client = InfluxDBClient(host=self.host.strip(),port=int(self.port.strip()),username=self.username,password=self.password,database=self.database)self.log.info("InfluxDB connected.")except Exception as e:self.log.error(f"InfluxDB connection error: {e}")def insert(self, data: list):try:df = pd.DataFrame(data)if df.empty:self.log.warning("No data to insert.")returndf["time"] = pd.to_datetime(df["colTime"], format="%Y-%m-%d %H:%M:%S").dt.strftime("%Y-%m-%dT%H:%M:%SZ")df["colValue"] = df["colValue"].astype(str)df["isBreak"] = df["isBreak"].astype(str)df["proID"] = df["proID"].astype(str)df["varID"] = df["varID"].astype(str)json_body = df.apply(lambda row: {"measurement": self.measurement,"tags": {"proID": row["proID"],"varID": row["varID"]},"time": row["time"],"fields": {"colValue": row["colValue"],"isBreak": row["isBreak"]}}, axis=1).tolist()if self.client.write_points(json_body):self.log.info(f"{len(json_body)} points inserted into InfluxDB.")else:self.log.error("Failed to insert data.")except Exception as e:self.log.error(f"Insert error: {e}")def drop_measurement(self, measurement_name):"""删除表:param measurement_name::return:"""try:self.client.query(f'DROP MEASUREMENT "{measurement_name}"')except Exception as e:print(f"Drop measurement error: {e}")def query(self, influxql: str):"""使用查询数据库信息:param influxql::return:"""try:result = self.client.query(influxql)points = list(result.get_points())return pointsexcept Exception as e:return []def close(self):if self.client:self.client.close()
4、接口测试,新建一个app.py,使用flask进行测试
from flask import Flask, request, jsonify, send_from_directory
from wzInfluxdb.influxdb_util import InfluxDBUtilapp = Flask(__name__,static_folder=ROOT_DIR + '/static',static_url_path='/static',template_folder=ROOT_DIR + '/templates')
@app.route('/searchObject', methods=['POST', 'GET'])
def searchObject():now = datetime.now()# 计算前一天 00:00:00 和 当天 00:00:00,转为 UTCstart_dt = (now - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0)end_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)# 转为 UTC ISO 格式start_time_utc = start_dt.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ')end_time_utc = end_dt.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ')# 构建 SQL 查询 单个对象ID的查询和绘制图像sql = f"SELECT amount,strTime FROM HourData WHERE equID = '111' AND varID = '30' AND time >= '{start_time_utc}' AND time < '{end_time_utc}' "influxdb = InfluxDBUtil()# data2 = influxdb.query_measurement(condition="equID='202012021658119701893f93cec9970'", limit=5)data2 = influxdb.query(sql)return jsonify({"result": data2 })except Exception as e:return jsonify({"error": str(e)}), 500