当前位置: 首页 > java >正文

神奇PG SQL数据库,配合简单代码就能巧妙实现复杂的功能

有一个物联网平台的设备故障告警功能需求,由于老旧系统的此项功能存在EVENT数据不准确误报,导致误报情况发生,在应该告警的时候没有发送告警数据或者是应该发送告警的时候却没有发送,希望通过我们改造解决误报情况,通过分析,对业务表(测点数据表),这张表大概有四百多台设备,每台设备有40个测点,比如温度,电流,电压,功率等。重点来了,LISTEN/NOTIFY功能,通过对这张表设置触发器(注意需要设置一定的条件),否则会导致严重的性能问题和带宽打满,因为这张表几乎每分钟都会在刷新,比如最新电流值,最后更新时间。如果每分钟有一万条数据在更新,订阅这个Channel的程序的处理能力不足的话,会导致Postgre SQL数据大量数据堆积,会导致本来的数据库功能被拖累,性能变慢。开始的第一版本就是没有设置条件,导致了性能问题,经过优化,设置了类似这样的第二版本触发器代码:

老版本:

create function notify_asset_update() returns triggerlanguage plpgsql
as
$$
BEGIN-- 仅推送更新事件(不维护状态)PERFORM pg_notify('asset_update',json_build_object('asset', NEW.assetnumber,'variablename', NEW.variablename,'updatetime', NEW.updatetime)::text);
RETURN NEW;
END;
$$;
CREATE TRIGGER asset_update_trigger
AFTER INSERT OR UPDATE OR DELETE ON photovoltaicel
FOR EACH ROW EXECUTE FUNCTION notify_asset_update();
alter function notify_asset_update() owner to postgres;

新版:

CREATE OR REPLACE FUNCTION notify_asset_update()
RETURNS trigger AS $$
BEGIN-- 仅在更新操作且 variablename 为 solarOperationState 时推送通知IF TG_OP = 'UPDATE' AND NEW.variablename = 'solarOperationState' THENPERFORM pg_notify('asset_update',json_build_object('asset', NEW.assetnumber,'variablename', NEW.variablename,'updatetime', NEW.updatetime)::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;-- 创建触发器
CREATE TRIGGER asset_update_triggerAFTER UPDATE ON photovoltaicelFOR EACH ROWWHEN (NEW.variablename = 'solarOperationState')EXECUTE FUNCTION notify_asset_update();-- 修改函数所有者
ALTER FUNCTION notify_asset_update() OWNER TO postgres;

代码可以分为两部分:创建函数,把函数创建到触发器上(绑定表)。

下面说到了应用程序了,把数据的表按照关心的逻辑线把数据初始化到应用服务器的内存里,

private void loadInitialData() { String sql = "SELECT assetnumber, variablename, MAX(updatetime) as max_time " + "FROM photovoltaicel where updatetime is not null " + "GROUP BY assetnumber, variablename"; try (Connection conn = getConnection(); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) { while (rs.next()) { long asset = rs.getLong("assetnumber"); String varName = rs.getString("variablename"); Instant time = rs.getTimestamp("max_time").toInstant(); cache.computeIfAbsent(asset, k -> new ConcurrentHashMap<>()) .put(varName, time); } } catch (SQLException e) { throw new RuntimeException(e); } // 初始化所有资产状态为离线(后续立即检测) cache.keySet().forEach(asset -> statusCache.put(asset, false)); }

需要注意的是,这里最好不要使用数据库连接池,因为数据库连接池不适应这个场景(不是Web接口,接口一般都是快速访问数据库,然后在很短时间内释放连接返回到连接池里),所以推荐使用原生的JDBCDriverManager来获得独立连接即可, 非要使用连接池,需要把空闲检测idleTimeout设置为0, 否则会被连接池回收导致程序错误。

 private void startListener() {new Thread(() -> {try {Connection listenConnection = getConnection();listenConnection.setAutoCommit(true); // 必须设置为 true,否则无法接收通知listenConnection.createStatement().execute("LISTEN "+deviceOnOffChannel);while (!Thread.interrupted()) {PGNotification[] notifs = listenConnection.unwrap(PGConnection.class).getNotifications(10000);//如果10000毫秒内没有新的通知,getNotifications()方法将返回一个空数组。if (notifs != null) {for (PGNotification n : notifs) {handleNotification(n.getParameter());}}}} catch (SQLException e) {throw new RuntimeException(e);}}).start();}

上面这段代码的核心逻辑是,只要PG数据库有数据更新,就会更新本地缓存,这样相当于把设备的测点表在内存里做了一份镜像。剩下就是定时去检测这份缓存了,比如30秒检测一次,如果一个设备的点表数据2分钟都没有更新,就标记成设备离线,可以发送Message给MQ中间件。或者直接调用SMS通道,阿里云的短信服务接口。推荐使用MQ暂存一下,好处多多,比如可以异步发送,或者做定制,或者免打扰。

private void startTimer() {ScheduledExecutorService scheduler1 = Executors.newSingleThreadScheduledExecutor();scheduler1.scheduleAtFixedRate(() -> {Instant cutoff = Instant.now().minus(2, ChronoUnit.MINUTES);cache.forEach((asset, varMap) -> {boolean allExpired = varMap.values().stream().allMatch(time -> time.isBefore(cutoff));Boolean wasOnline = statusCache.get(asset);if (allExpired && Boolean.TRUE.equals(wasOnline)) {statusCache.put(asset, false);sendNotification(asset, "OffLine");}});}, 0, 30, TimeUnit.SECONDS);
}// 在线状态即时检查
private void checkOnlineStatus(long asset) {ConcurrentHashMap<String, Instant> varMap = cache.get(asset);if (varMap == null) return;Instant cutoff = Instant.now().minus(2, ChronoUnit.MINUTES);boolean anyRecent = varMap.values().stream().allMatch(time -> time.isAfter(cutoff));Boolean wasOnline = statusCache.get(asset);if (anyRecent && Boolean.FALSE.equals(wasOnline)) {statusCache.put(asset, true);sendNotification(asset, "OnLine");}
}

http://www.xdnf.cn/news/1837.html

相关文章:

  • 专家系统的知识获取、检测与组织管理——基于《人工智能原理与方法》的深度解析
  • 别学了,打会王者吧
  • tcp 和http 网络知识
  • 七、web自动化测试03
  • 大模型时代的深度学习框架
  • C语言里位操作的应用
  • 前端让一个div的高度为屏幕的高度减去其他所有元素的高度(包括它们的margin和padding),并自适应。
  • Python笔记:VS2013编译Python-3.5.10
  • 芯岭技术XL32F003单片机 32位Cortex M0+ MCU简单介绍 性能优异
  • 面向智能家居安全的异常行为识别与应急联动关键技术研究与系统实现(源码+论文+部署讲解等)
  • 软考【网络工程师】2023年5月上午题答案解析
  • dedecms织梦arclist标签noflag属性过滤多个参数
  • 2025年GPLT团体程序设计天梯赛L1-L2
  • RPCRT4!NdrPointerUnmarshall函数之ADVAPI32!LsarQueryInformationPolicy函数调用的一个例子
  • 【ESP32-IDF笔记】20-配置以太网网络(W5500)
  • 杂项知识点
  • 基于python代码的通过爬虫方式实现快手发布视频(2025年4月)
  • 模式识别的局限和确认偏误消除偏见
  • LeetCode 每日一题 2799. 统计完全子数组的数目
  • 项目笔记2:post请求是什么,还有什么请求
  • Uni-App 多端电子合同开源项目介绍
  • 单精度浮点运算/定点运算下 MATLAB (VS) VIVADO
  • Excalidraw工具分享
  • 速成GO访问sql,个人笔记
  • CodeMeter Runtime 安装失败排查与解决指南
  • 蓝牙调试助手APP波形图版
  • 软件工程效率优化:一个分层解耦与熵减驱动的系统框架
  • java配置
  • mysql知识总结 索引篇
  • Flutter Dart中的类 对象