Flink实战项目——城市交通实时监控平台
文章目录
- Flink城市交通实时监控平台
- 1 项目整体介绍
- 1.1 项目架构
- 1.2 项目数据流
- 1.3 项目主要模块
- 2 数据采集
- logback日志配置
- 数据采集服务器接口
- 模拟实际交通数据
- 3 项目数据字典
- 3.1 卡口车辆采集数据
- 3.2 城市交通管理数据表
- 3.3 车辆轨迹数据表
- 4 实时卡口监控分析
- 4.1 flume采集日志数据写入kafka
- 4.2 实时车辆超速监控
- 4.3 实时卡口拥堵情况监控
- 4.4 实时最通畅的 TopN 卡口
- 5 智能实时报警
- 5.1 实时套牌分析
- 5.2 实时危险驾驶分析
- 5.3 违法王车辆轨迹跟踪
- 6 实时车辆布控
- 6.1 实时车辆分布情况
- 6.2 布隆过滤器(BloomFilter)
- 布隆过滤器的原理
- Trigger的作用
Flink城市交通实时监控平台
1 项目整体介绍
近几年来,随着国内经济的快速发展,高速公路建设步伐不断加快,全国机动车辆、驾驶员数量迅速增长,交通管理工作日益繁重,压力与日俱增。为了提高公安交通管理工作的科学化、现代化水平,缓解警力不足,加强和保障道路交通的安全、有序和畅通,减少道路交通违法和事故的发生,全国各地建设和使用了大量的“电子警察”、“高清卡口”、“固定式测速”、“区间测速”、“便携式测速”、“视频监控”、“预警系统”、“能见度天气监测系统”、“LED信息发布系统”等交通监控系统设备。尽管修建了大量的交通设施,增加了诸多前端监控设备,但交通拥挤阻塞、交通安全状况仍然十分严重。由于道路上交通监测设备种类和生产厂家繁多,目前还没有一个统一的数据采集和交换标准,无法对所有的设备、数据进行统一、高效的管理和应用,造成各种设备和管理软件混用的局面,给使用单位带来了很多不便,使得国家大量的基础建设投资未达到预期的效果。各交警支队的设备大都采用本地的数据库管理,交警总队无法看到各支队的监测设备及监测信息,严重影响到全省交通监测的宏观管理;目前网络状况为设备专网、互联网、公安网并存的复杂情况,需要充分考虑公安网的安全性,同时要保证数据的集中式管理;监控数据需要与“六合一”平台、全国机动车稽查布控系统等的数据对接,迫切需要一个全盘考虑面向交警交通行业的智慧交通管控指挥平台系统。
智慧交通管控指挥平台建成后,达到了以下效果目标:
- 交通监视和疏导:通过系统将监视区域内的现场图像传回指挥中心,使管理人员直接掌握车辆排队、堵塞、信号灯等交通状况,及时调整信号配时或通过其他手段来疏导交通,改变交通流的分布,以达到缓解交通堵塞的目的。
- 交通警卫:通过突发事件的跟踪,提高处置突发事件的能力。
- 建立公路事故、事件预警系统的指标体系及多类分析预警模型,实现对高速公路通行环境、交通运输对象、交通运输行为的综合分析和预警,建立真正意义上的分析及预警体系。
- 及时准确地掌握所监视路口、路段周围的车辆、行人的流量、交通治安情况等,为指挥人员提供迅速直观的信息从而对交通事故和交通堵塞做出准确判断并及时响应。
- 收集、处理各类公路网动静态交通安全信息,分析研判交通安全态势和事故隐患,并进行可视化展示和预警提示。
- 提供接口与其他平台信息共享和关联应用,基于各类动静态信息的大数据分析处理,实现交通违法信息的互联互通、源头监管等功能。
1.1 项目架构
本项目是与公安交通管理综合应用平台、机动车缉查布控系统等对接的,并且基于交通部门现有的数据平台上,进行的数据实时分析项目。
相关概念
- 卡口:道路上用于监控的某个点,可能是十字路口,也可能是高速出口等。
- 通道:每个卡口上有多个摄像头,每个摄像头有拍摄的方向,这些摄像头也叫通道。
- “违法王“车辆:该车辆违法未处理超过50次以上的车。
- 摄像头拍照识别:
- 一次拍照识别:经过卡口摄像头进行的识别,识别对象的车辆号牌信息、车辆号牌颜色信息等,基于车辆号牌和车辆颜色信息,能够实现基本的违法行为辨识、车辆黑白名单比对报警等功能。
- 二次拍照识别:可以通过时间差和距离自动计算出车辆的速度。
1.2 项目数据流
实时处理流程:Http请求 → 数据采集接口 → 数据目录 → flume监控目录(监控目录下的文件按照日期分区) → Kafka → Flink分析数据 → MySQL(给运营中心使用)。
1.3 项目主要模块
本项目的主要模块有三个方向:
- 实时卡口监控分析:依托卡口云管控平台达到降事故、保畅通、服务决策、引领实战的目的,最大限度指导交通管理工作。丰富了办案手段,提高了办案效率、节省警力资源,最终达到牵引警务模式的变革。利用摄像头拍摄的车辆数据来分析每个卡口车辆超速监控、卡口拥堵情况监控、每个区域卡口车流量TopN统计。
- 实时智能报警:该模块主要针对路口一些无法直接用单一摄像头拍摄违章的车辆,通过海量数据分析并实时智能报警。在一时间段内同时在 2 个区域出现的车辆记录则为可能为套牌车。这个模块包括:实时套牌分析,实时危险驾驶车辆分析。
- 智能车辆布控:该模块主要从整体上实时监控整个城市的车辆情况,并且对整个城市中出现“违法王”的车辆进行布控。主要功能包括:单一车辆轨迹跟踪布控,“违法王”轨迹跟踪布控,实时车辆分布分析,实时外地车分布分析。
2 数据采集
logback日志配置
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false"><!-- 定义日志文件的存储地址 --><property name="LOG_SYSTEM" value="logs/system"/><property name="LOG_COMMON" value="logs/common"/><!-- %m输出的信息, %p日志级别, %t线程名, %d日期, %c类的全名, %i索引 --><!-- appender是configuration的子节点,是负责写日志的组件 --><!-- 控制台输出 --><!-- ConsoleAppender把日志输出到控制台 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符--><pattern>%msg%n</pattern><!--<!– 控制台也要使用utf-8,不要使用gbk –>--><!--<charset>UTF-8</charset>--></encoder></appender><!-- 按照每天生成系统日志文件 --><appender name="SYSTEM_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><FileNamePattern>${LOG_SYSTEM}/system_%d{yyyy-MM-dd}_%i.log</FileNamePattern><MaxHistory>15</MaxHistory><maxFileSize>256MB</maxFileSize></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c - %msg%n</pattern></encoder></appender><!-- RollingFileAppender:滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 --><!-- 1.先按日期存日志,日期变了,将前一天的日志文件名重命名为xxx%日期%索引,新的日志仍然是xxx.log --><!-- 2.如果日期没有变化,但是当前日志文件的大小超过 256M 时,对当前日志进行分割 重名名 --><!-- 按照每天生成通用日志文件,单个文件超过256M则拆分 --><appender name="COMMON_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><!-- 活动文件的名字会根据fileNamePattern的值,每隔一段时间改变一次 --><FileNamePattern>${LOG_COMMON}/log_%d{yyyy-MM-dd}_%i.log</FileNamePattern><!-- 每产生一个日志文件,该日志文件的保存期限为15天 --><MaxHistory>15</MaxHistory><!-- maxFileSize:这是活动文件的大小,默认值是10MB,这里设置256M --><maxFileSize>256MB</maxFileSize></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!-- pattern节点,用来设置日志的输入格式 --><pattern>%msg%n</pattern><!--<!– 记录日志的编码 –>--><!--<charset>UTF-8</charset> <!– 此处设置字符集 –>--></encoder></appender><!-- 控制台日志输出级别 --><!-- 日志输出级别 --><root level="INFO"><!-- 启动name为“STDOUT”的日志级别,默认可以配置多个 --><appender-ref ref="STDOUT"/><appender-ref ref="SYSTEM_FILE"/></root><!-- 在哪些包下,日志输出的级别 --><!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 --><!-- 级别依次为【从高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE --><logger name="com.yw.datacollector.service.DataService" level="INFO" additivity="false"><appender-ref ref="STDOUT"/><appender-ref ref="COMMON_FILE"/></logger></configuration>
数据采集服务器接口
- DataController
@RestController
@RequestMapping("/api")
public class DataController {@Resourceprivate DataService dataService;@PostMapping("/sendData/{dataType}")public RestResponse<Void> collect(@PathVariable("dataType") String dataType,HttpServletRequest request) {dataService.process(dataType, request);return RestResponse.success();}
}
- DataService
@Slf4j
@Service
public class DataService {public void process(String dataType, HttpServletRequest request) {// 校验数据类型if (StringUtils.isBlank(dataType)) {throw new CustomException(RestResponseCode.LOG_TYPE_ERROR);}// 校验请求头中是否传入数据int contentLength = request.getContentLength();if (contentLength < 1) {throw new CustomException(RestResponseCode.REQUEST_CONTENT_INVALID);}// 从请求request中读取数据byte[] bytes = new byte[contentLength];try (BufferedInputStream bis = new BufferedInputStream(request.getInputStream())){// 最大尝试读取的次数int maxTryTimes = 100;int tryTimes = 0, totalReadLength = 0;while (totalReadLength < contentLength && tryTimes < maxTryTimes) {int readLength = bis.read(bytes, totalReadLength, contentLength - totalReadLength);if (readLength < 0) {throw new CustomException(RestResponseCode.BAD_NETWORK);}totalReadLength += readLength;if (totalReadLength == contentLength) {break;}tryTimes++;TimeUnit.MILLISECONDS.sleep(200);}if (totalReadLength < contentLength) {// 经过多处的读取,数据仍然没有读完throw new CustomException(RestResponseCode.BAD_NETWORK);}String jsonData = new String(bytes, StandardCharsets.UTF_8);log.info(jsonData);} catch (Exception e) {throw new CustomException(RestResponseCode.SYSTEM_ERROR, e.getMessage());}}
}
- Mock发送请求数据
public class SendDataToServer {public static void main(String[] args) throws Exception {String url = "http://localhost:8686/api/sendData/traffic_data";HttpClient client = HttpClients.createDefault();int i = 0;while (i < 20) {HttpPost post = new HttpPost(url);post.setHeader("Content-Type", "application/json");String data = "11,22,33,京P12345,57.2," + i;post.setEntity(new StringEntity(data, StandardCharsets.UTF_8));HttpResponse response = client.execute(post); //发送数据i++;TimeUnit.SECONDS.sleep(1);//响应的状态如果是200的话,获取服务器返回的内容if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {String result = EntityUtils.toString(response.getEntity(), "UTF-8");System.out.println(result);}}}
}
- 启动SpringBoot应用程序,并运行 SendDataToServer,可以看到设置的日志目录下生成的日志文件
模拟实际交通数据
public class CreateDatas {public static void main(String[] args) throws Exception {String url = "http://localhost:8686/api/sendData/traffic_data";HttpClient client = HttpClients.createDefault();String result = null;String[] locations = {"湘", "京", "京", "京", "京", "京", "京", "鲁", "皖", "鄂"};// 模拟生成一天的数据String day = "2025-06-21";// 初始化高斯分布的对象JDKRandomGenerator generator = new JDKRandomGenerator();generator.setSeed(10);GaussianRandomGenerator rg = new GaussianRandomGenerator(generator);Random r = new Random();// 模拟30000台车for (int i = 0; i < 30000; i++) {// 得到随机的车牌String car = locations[r.nextInt(locations.length)] + (char) (65 + r.nextInt(26)) + String.format("%05d", r.nextInt(100000));// 随机的小时String hour = String.format("%02d", r.nextInt(24));// 一天内,在一个城市里面,一辆车大概率经过30左右的卡口double v = rg.nextNormalizedDouble(); // 标准高斯分布的随机数据,大概率在-1 到 1int count = (int) (Math.abs(30 + 30 * v) + 1); // 一辆车一天内经过的卡口数量for (int j = 0; j < count; j++) {// 如果count超过30,为了数据更加真实,时间要1小时