当前位置: 首页 > news >正文

SpringBatch+Mysql+hanlp简版智能搜索

        资源条件有限,需要支持智搜的数据量也不大,上 ES 搜索有点大材小用了,只好写个简版的 MySQL 智搜:处理全文搜索,支持拼音搜索、中文分词、自定义分词断词、地图范围搜索、周边搜索、自定义多边形搜索。

      通过设置定时SpringBatch抽取需要进行检索的表,并将检索的标题,概要,内容等存入检索表中。通过检索表进行全文检索。

代码主要包:通过网盘分享的文件:mysql智搜.zip
链接: https://pan.baidu.com/s/1MMhmyVD8o56Grp1Aa5IiKQ 提取码: 0530

1、提供的接口

/**
 * REST endpoints of the global smart-search feature.
 *
 * <p>Every endpoint accepts a JSON request body, delegates to {@link SearchIndexService}
 * and wraps the service result with {@code Response.with(...)}.
 *
 * <p>NOTE: the base path {@code /smart/serach} is misspelled ("serach"), but it is the
 * published URL, so it is kept as-is for compatibility with existing clients.
 */
@RestController
@Tag(name = "智搜全局搜索API")
@RequestMapping(path = "/smart/serach")
public class SearchIndexController {

    @Resource
    private SearchIndexService searchIndexService;

    /**
     * Paged smart-search query.
     *
     * @param searchIndexParam search criteria
     * @return one page of matching index entries
     */
    @PostMapping("/smartSearch")
    @Operation(summary = "智搜数据查询")
    @Log(title = "一张图智搜数据查询", businessType = BusinessType.QUERY)
    public Response<Page<SearchIndexDTO>> getSearchIndexDataByPage(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexDataByPage(searchIndexParam));
    }

    /**
     * Paged query over the general search index.
     *
     * @param searchIndexParam search criteria
     * @return one page of matching index entries
     */
    @PostMapping("/getSearchIndexByPage")
    @Operation(summary = "普通智搜内容分页查询")
    @Log(title = "普通智搜内容分页查询", businessType = BusinessType.QUERY)
    public Response<Page<SearchIndexDTO>> getSearchIndexByPage(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexByPage(searchIndexParam));
    }

    /**
     * Loads the detail of a single index entry by its primary key.
     *
     * @param searchIndexParam request carrying the primary key of the index entry
     * @return the index entry detail
     */
    @PostMapping("/getSearchIndexById")
    @Operation(summary = "通过主键获取检索详细信息")
    // The log title previously duplicated the paged-query title (copy/paste slip); it now
    // matches this operation's summary so audit records identify the right endpoint.
    @Log(title = "通过主键获取检索详细信息", businessType = BusinessType.QUERY)
    public Response<SearchIndexDTO> getSearchIndexById(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexById(searchIndexParam));
    }

    /**
     * River-tracing query between two points.
     *
     * @param searchIndexParam start/end coordinates and an optional maximum distance
     * @return the string produced by the service for the requested river section
     */
    @PostMapping("/smartSearchRiver")
    @Operation(summary = "智搜溯源河流数据查询")
    @Log(title = "一张图智搜溯源河流数据查询", businessType = BusinessType.QUERY)
    public Response<String> getSmartSearchRiver(@RequestBody SearchPointParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSmartSearchRiver(searchIndexParam));
    }
}

2、service

/**
 * Service contract of the smart-search feature, on top of the generic
 * {@code IService<SearchIndex>} operations.
 */
public interface SearchIndexService extends IService<SearchIndex> {

    /**
     * Paged query over the search index.
     *
     * @param searchIndexParamDTO search criteria
     * @return one page of matching index entries
     */
    Page<SearchIndexDTO> getSearchIndexByPage(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * Loads one index entry by primary key.
     *
     * @param searchIndexParamDTO request carrying the primary key
     * @return the index entry detail
     */
    SearchIndexDTO getSearchIndexById(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * Paged smart-search query.
     *
     * @param searchIndexParamDTO search criteria
     * @return one page of matching index entries
     */
    Page<SearchIndexDTO> getSearchIndexDataByPage(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * River-tracing query between two points.
     *
     * @param searchIndexParam start/end coordinates and an optional maximum distance
     * @return the resulting river section as a string
     */
    String getSmartSearchRiver(SearchPointParamDTO searchIndexParam);
}
@Service
@Slf4j
@RequiredArgsConstructor
@DS("master")
public class SearchIndexServiceImpl extends ServiceImpl<SearchIndexMapper, SearchIndex> implements SearchIndexService {@Resourceprivate SearchIndexMapper searchIndexMapper;@Resourceprivate DataSourceService dataSourceService;@Resourceprivate DynamicDataSourcesService dynamicDataSourcesService;@Overridepublic Page<SearchIndexDTO> getSearchIndexByPage(SearchIndexParamDTO searchIndexParamDTO) {// 处理拼音查询if (searchIndexParamDTO.getSearchContent() != null) {boolean isPingyin = KeyWordUtils.isContainsPinyin(searchIndexParamDTO.getSearchContent());if (isPingyin) {searchIndexParamDTO.setSearchContent(KeyWordUtils.getPinyin(searchIndexParamDTO.getSearchContent(), CommonConstant.SPACE));} else {searchIndexParamDTO.setSearchContent(HanLpAdvancedUtil.extractKeywords(searchIndexParamDTO.getSearchContent()));}}// 处理自定义区域搜索// 从GeoJSON中提取多边形坐标if (ObjectUtil.isNotEmpty(searchIndexParamDTO.getGeoJson())) {List<List<Double>> polygonCoordinates = GeoJsonUtil.extractPolygonCoordinates(searchIndexParamDTO.getGeoJson());// 转换为WKT格式String polygonWKT = GeoJsonUtil.convertGeoJsonToWKT(polygonCoordinates);searchIndexParamDTO.setPolygonWKT(polygonWKT);}return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);}@Overridepublic SearchIndexDTO getSearchIndexById(SearchIndexParamDTO searchIndexParamDTO) {// 处理拼音查询if (StrUtil.isEmpty(searchIndexParamDTO.getPkId())) {throw new CoreException(ErrorCodeEnum.SYS_ERROR, "查询参数缺失");}// 查找指定ID数据SearchIndex searchIndex = searchIndexMapper.selectById(searchIndexParamDTO.getPkId());// 构造来源数据查询sqlString sql = "select * from " + searchIndex.getEntityTable() + " where  " + searchIndex.getEntityKeyColumn() + " = ?";// 获取表结构信息DataSourceDTO dataSourceDTO = dataSourceService.findById(searchIndex.getEntitySourceId());List<Column> columnList  = dynamicDataSourcesService.getDbColumnList(dataSourceDTO, searchIndex.getEntityTable());List<Object> jdbcParamValues = new 
ArrayList<>();jdbcParamValues.add(searchIndex.getEntityId());List<Map<String, Object>> listInfo = dynamicDataSourcesService.executeListSql(dataSourceDTO, sql, jdbcParamValues);SearchIndexDTO searchIndexDTO = new SearchIndexDTO();BeanUtilCopy.copyProperties(searchIndex, searchIndexDTO);searchIndexDTO.setColumnList(columnList);searchIndexDTO.setDetailInfo(CollUtil.isNotEmpty(listInfo)? listInfo.get(0) : null);return searchIndexDTO;}@Overridepublic Page<SearchIndexDTO> getSearchIndexDataByPage(SearchIndexParamDTO searchIndexParamDTO) {// 处理拼音查询if (searchIndexParamDTO.getSearchContent() != null) {boolean isPingyin = KeyWordUtils.isContainsPinyin(searchIndexParamDTO.getSearchContent());if (isPingyin) {searchIndexParamDTO.setSearchContent(KeyWordUtils.getPinyin(HanLpAdvancedUtil.extractKeywords(searchIndexParamDTO.getSearchContent()), CommonConstant.SPACE));} else {searchIndexParamDTO.setSearchContent(HanLpAdvancedUtil.segmentToString(searchIndexParamDTO.getSearchContent(), Boolean.TRUE));}}// 处理自定义区域搜索// 从GeoJSON中提取多边形坐标if (ObjectUtil.isNotEmpty(searchIndexParamDTO.getGeoJson())) {List<List<Double>> polygonCoordinates = GeoJsonUtil.extractPolygonCoordinates(searchIndexParamDTO.getGeoJson());// 转换为WKT格式String polygonWKT = GeoJsonUtil.convertGeoJsonToWKT(polygonCoordinates);searchIndexParamDTO.setPolygonWKT(polygonWKT);}if (CollUtil.isEmpty(searchIndexParamDTO.getEntityTypes())) {List<String> entityTypes = Arrays.asList("3","4","5","6","7");searchIndexParamDTO.setEntityTypes(entityTypes);}return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);}/*** 获取河流子集合** @param searchIndexParam* @return*/@Overridepublic String getSmartSearchRiver(SearchPointParamDTO searchIndexParam) {if (searchIndexParam.getMaxDistanceKm() == null) {searchIndexParam.setMaxDistanceKm(5D);}return GeoJsonUtil.extractLineSubset(searchIndexParam.getLongitudeStart(), searchIndexParam.getLatitudeStart(), searchIndexParam.getLongitudeEnd(), 
searchIndexParam.getLatitudeEnd(), searchIndexParam.getMaxDistanceKm());//return JSON.toJSONString(LineStringSubsetExtractor.extractSubsetBetweenPoints(searchIndexParam.getLongitudeStart(), searchIndexParam.getLatitudeStart(), searchIndexParam.getLongitudeEnd(), searchIndexParam.getLatitudeEnd(), searchIndexParam.getMaxDistanceKm()));}
}

3、数据库表设计

数据库表
-- Configuration table: one row per source table that feeds the global search index.
-- The batch job reads the rows with deleted = 0 and update_flag = '1'.
DROP TABLE IF EXISTS `ads_search_index_table`;
CREATE TABLE `ads_search_index_table`  (
  `pk_id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `entity_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类',
  `entity_type_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类名称',
  `entity_key_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键字段',
  `entity_table` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '实体对应的表名',
  `entity_table_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名中文',
  `entity_source_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源,目前只支持当前库',
  `title_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题抽入字段,逗号分割',
  `sumary_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要信息字段,逗号分割',
  `content_column` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '内容content需要抽入的字段英文逗号分割',
  `keywords_default` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词默认值',
  `keywords_column` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词keywords要抽入的字段英文逗号分割',
  `weight_default` int NULL DEFAULT NULL COMMENT '默认权重',
  `update_flag` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '1' COMMENT '是否更新1是0否',
  `lng_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度字段',
  `lat_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度字段',
  `delete_column` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '逻辑删除字段',
  `region_code_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码',
  `file_url_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '文件地址字段',
  `deploy_time_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据发布时间字段',
  `source_from` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源',
  `revision` int NULL DEFAULT 0 COMMENT '乐观锁',
  `created_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `created_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `updated_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人',
  `updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  `deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除',
  PRIMARY KEY (`pk_id`) USING BTREE,
  FULLTEXT INDEX `ft_search`(`title_column`, `keywords_column`) WITH PARSER `ngram`
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表数据来源信息' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of ads_search_index_table
-- ----------------------------
INSERT INTO `ads_search_index_table` VALUES (8, '8', '新闻', 'pk_id', 'test_environmental_news', '新闻表', '1', 'news_title', 'news_sumary', 'news_content', '新闻', 'news_title,news_content', 80, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 19, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (9, '9', '数据', 'PK_ID', 'ads_t_app_interface', '某接口表', '1', 'interface_name', 'interface_name', 'interface_url', '某接口', 'interface_name', 60, '0', NULL, NULL, '', NULL, NULL, 'created_time', '某信息中心', 5, NULL, NULL, 'xxxx@qq.com', '2025-05-20 17:08:28', 0);
INSERT INTO `ads_search_index_table` VALUES (10, '10', '文件', 'pk_id', 'test_environmental_news', '新闻表', '1', 'news_title', 'news_sumary', 'news_content', '文件', 'news_title,news_content', 60, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 6, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (11, '11', '地图', 'id', 'test_dust_emission_source', '扬尘源', '1', 'project_name', 'project_name', 'project_name', '地图', 'project_name', 60, '0', 'longitude', 'latitude', '', 'region_code', '', 'create_time', '某某单位', 5, NULL, NULL, 'xxx@qq.com', '2025-05-20 17:08:28', 0);
INSERT INTO `ads_search_index_table` VALUES (12, '12', '应用', 'app_id', 'ads_t_sys_app', '应用', '1', 'name', 'name', 'description', '应用', 'name', 60, '0', '', '', '', '', '', 'created_time', '某某单位', 7, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (13, '13', '公告', 'pk_id', 'test_environmental_announcement', '公告表', '1', 'title', 'summary', 'content', '公告', 'title,content', 60, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 5, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);

-- Global search index: one row per indexed source record, written by the batch job.
-- `source_from` is varchar(100) here (the original dump had varchar(50)) so that it can
-- hold any value copied from ads_search_index_table.source_from, which is varchar(100).
DROP TABLE IF EXISTS `ads_search_index`;
CREATE TABLE `ads_search_index`  (
  `pk_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键',
  `entity_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类',
  `entity_type_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类名称',
  `entity_key_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '主键字段',
  `entity_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '对应实体表的主键',
  `entity_table` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名',
  `entity_table_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表中文名',
  `entity_source_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源',
  `title` varchar(300) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题',
  `sumary` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容',
  `keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '关键词或标签',
  `pingyin_keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '拼音分词',
  `weight` int NULL DEFAULT NULL COMMENT '权重或排序',
  `lng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度',
  `lat` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度',
  `region_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码',
  `region_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划名称',
  `file_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '文件地址逗号分割',
  `deploy_time` datetime NULL DEFAULT NULL COMMENT '数据来源发布时间',
  `source_from` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源',
  `revision` int NULL DEFAULT 0 COMMENT '乐观锁',
  `created_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `created_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `updated_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人',
  `updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  `deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除',
  `location_point` point NULL,
  PRIMARY KEY (`pk_id`) USING BTREE,
  INDEX `idx_search_index`(`entity_type`, `entity_table`, `deleted`) USING BTREE,
  FULLTEXT INDEX `ft_search`(`title`, `content`, `keywords`, `pingyin_keywords`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表' ROW_FORMAT = DYNAMIC;

4、SpringBatch抽取数据

依赖

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.3.10</version>
</dependency>
<dependency>
    <groupId>com.belerweb</groupId>
    <artifactId>pinyin4j</artifactId>
    <version>2.5.1</version>
</dependency>
<!--地图-->
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-shapefile</artifactId>
    <version>${geotools.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-main</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-geojson</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-swing</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-epsg-hsql</artifactId>
    <version>${geotools.version}</version> <!-- 与项目中其他GeoTools版本一致 -->
</dependency>
<!-- JTS几何库 -->
<dependency>
    <groupId>org.locationtech.jts</groupId>
    <artifactId>jts-core</artifactId>
    <version>1.18.1</version>
</dependency>
<!-- Reactor Netty -->
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty-http</artifactId>
    <version>1.0.39</version>
</dependency>
<!-- IK 分词器-->
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>
<dependency>
    <groupId>com.hankcs</groupId>
    <artifactId>hanlp</artifactId>
    <version>portable-1.8.4</version>
</dependency>
分词hanlp.properties配置
# HanLP 根路径
#root=D:/tools/hanlp/
root=/data/hanlp/
# 核心词典
CoreDictionaryPath=data/dictionary/CoreNatureDictionary.txt
# 自定义词典路径
CustomDictionaryPath=data/dictionary/custom/CustomDictionary.txt;data/dictionary/custom/hanyucidianDic.txt;data/dictionary/custom/siteDic.txt
# 断词(停用词)词典路径
CoreStopWordDictionaryPath=data/dictionary/stopwords.txt
# 分词线程数
SegmentThreadNumber=4
# 是否显示词性标注信息
ShowTermNature=false
# 日志级别
HanLPLogLevel=WARN
SpringBatch配置
application.yml

spring:
  batch:
    job:
      enabled: false   # 启动时不启动job
    jdbc:
      initialize-schema: always
    # 是否定时任务执行
    scheduler:
      enabled: true
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

// 定时任务
@Component
public class ScheduledBatchTask {/** 创建索引任务*/@Resource@Qualifier("searchIndexCreateTaskJob")private Job searchIndexCreateTaskJob;@ResourceJobLauncher jobLauncher;@Value("${spring.batch.scheduler.enabled:false}")private boolean schedulerEnabled;/*** 每日03点执行批量更新*/@Scheduled(cron = "0 0 3 * * ?")public void searchIndexCreateTask() throws Exception {if (schedulerEnabled) {JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();JobExecution run = jobLauncher.run(searchIndexCreateTaskJob, jobParameters);run.getId();}}}// 1. 主配置类调整(按表分区)
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;// 线程池配置(核心线程数=表数量)@Bean("batchTaskExecutor")public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setThreadNamePrefix("table-processor-");return executor;}/*** 更新搜索索引任务** @return*/@Bean("searchIndexCreateTaskJob")public Job searchIndexCreateTaskJob(@Qualifier("deleteOldDataStep") Step deleteOldDataStep,@Qualifier("masterSearchStep") Step masterStep) {return jobBuilderFactory.get("searchIndexCreateTaskJob").start(deleteOldDataStep).next(masterStep).listener(new BatchSearchJobListener()).build();}@Beanpublic Step deleteOldDataStep(@Qualifier("deleteTasklet") Tasklet deleteTasklet) {return stepBuilderFactory.get("deleteOldDataStep").tasklet(deleteTasklet).build();}/*** 索引表更新主任务步骤** @return*/@Bean("masterSearchStep")public Step masterStep(@Qualifier("updateSearchIndexData") Step updateSearchIndexData,@Qualifier("multiIndexTablePartitioner") MultiIndexTablePartitioner multiIndexTablePartitioner) {return stepBuilderFactory.get("masterSearchStep").partitioner(updateSearchIndexData.getName(), multiIndexTablePartitioner) // 分区器按表名分区一个表一个分区.step(updateSearchIndexData).gridSize(10) // 按表分区了 并发数一般设置为核心数.taskExecutor(batchSearchTaskExecutor()).build();}// 索引线程池配置(核心线程数=表数量)@Bean("batchSearchTaskExecutor")public TaskExecutor batchSearchTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setThreadNamePrefix("search-processor-");return executor;}/*** 处理分页数据更新步骤* @return*/@Bean("updateSearchIndexData")public Step updateSearchIndexData(@Qualifier("searchTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,@Qualifier("batchSearchIndexWriter") 
BatchSearchIndexWriter batchSearchIndexWriter,@Qualifier("searchIndexProcessor") SearchIndexProcessor searchIndexProcessor) {return stepBuilderFactory.get("updateSearchIndexData").<Map<String, Object>, Map<String, Object>>chunk(100).reader(myBatisPagingItemReader).processor(searchIndexProcessor).writer(batchSearchIndexWriter).faultTolerant().skipPolicy(new AlwaysSkipItemSkipPolicy()).build();}/*** 分页获取需要处理的智搜数据的表数据* @return*/@Bean("searchTableReader")@StepScopepublic MyBatisPagingItemReader<Map<String, Object>> searchTableReader(@Value("#{stepExecutionContext['entityType']}") String entityType, //分类@Value("#{stepExecutionContext['entityTypeName']}") String entityTypeName, //分类@Value("#{stepExecutionContext['entityKeyColumn']}") String entityKeyColumn,// 主键字段@Value("#{stepExecutionContext['entityTable']}") String entityTable,// 表名@Value("#{stepExecutionContext['entityTableName']}") String entityTableName,// 表名@Value("#{stepExecutionContext['entitySourceId']}") String entitySourceId,// 表数据来源@Value("#{stepExecutionContext['titleColumn']}") String titleColumn, // 标题字段@Value("#{stepExecutionContext['sumaryColumn']}") String sumaryColumn, // 摘要字段@Value("#{stepExecutionContext['deployTimeColumn']}") String deployTimeColumn, // 发布时间字段@Value("#{stepExecutionContext['contentColumn']}") String contentColumn, // 内容字段@Value("#{stepExecutionContext['sourceFrom']}") String sourceFrom, // 数据来源@Value("#{stepExecutionContext['keywordsDefault']}") String keywordsDefault, // 默认关键词@Value("#{stepExecutionContext['keywordsColumn']}") String keywordsColumn, // 关键词字段@Value("#{stepExecutionContext['fileUrlColumn']}") String fileUrlColumn, // 关键词字段@Value("#{stepExecutionContext['weightDefault']}") String weightDefault, // 默认权重@Value("#{stepExecutionContext['deleteColumn']}") String deleteColumn, // 逻辑删除字段@Value("#{stepExecutionContext['lngColumn']}") String lngColumn, // 逻辑删除字段@Value("#{stepExecutionContext['latColumn']}") String latColumn, // 
逻辑删除字段@Value("#{stepExecutionContext['regionCodeColumn']}") String regionCodeColumn, // 新区区划字段@Value("#{stepExecutionContext['updateFlag']}") String updateFlag // 是否需要更新) {MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();reader.setSqlSessionFactory(sqlSessionFactory);reader.setQueryId("com.bigdatacd.panorama.system.mapper.SearchIndexTableMapper.selectSearchIndexTableDataByPage");Map<String,Object> param = new HashMap<>();param.put(CommonConstant.ENTITY_TYPE,entityType);param.put(CommonConstant.ENTITY_TYPE_NAME,entityTypeName);param.put(CommonConstant.ENTITY_KEY_COLUMN,entityKeyColumn);param.put(CommonConstant.ENTITY_TABLE,entityTable);param.put(CommonConstant.ENTITY_TABLE_NAME,entityTableName);param.put(CommonConstant.SOURCE_FROM,sourceFrom);param.put(CommonConstant.DEPLOY_TIME_COLUMN,deployTimeColumn);param.put(CommonConstant.FILE_URL_COLUMN,fileUrlColumn);param.put(CommonConstant.SUMARY_COLUMN,sumaryColumn);param.put(CommonConstant.ENTITY_SOURCE_ID,entitySourceId);param.put(CommonConstant.TITLE_COLUMN,titleColumn);param.put(CommonConstant.CONTET_COLUMN,contentColumn);param.put(CommonConstant.KEYWORDS_DEFAULT,keywordsDefault);param.put(CommonConstant.KEYWORDS_COLUMN,keywordsColumn);param.put(CommonConstant.WEIGHT_DEFAULT,weightDefault);param.put(CommonConstant.UPDATE_FLAG,updateFlag);param.put(CommonConstant.DELETE_COLUMN,deleteColumn);param.put(CommonConstant.LNG_COLUMN,lngColumn);param.put(CommonConstant.LAT_COLUMN,latColumn);param.put(CommonConstant.REGION_CODE_COLUMN,regionCodeColumn);reader.setParameterValues(param);reader.setPageSize(1000);return reader;}}// 过程监听
@Component
public class BatchSearchJobListener implements JobExecutionListener {private long beingTime;private long endTime;@Overridepublic void beforeJob(JobExecution jobExecution) {beingTime = System.currentTimeMillis();System.out.println(jobExecution.getJobInstance().getJobName() + "  beforeJob...... " + beingTime);}@Overridepublic void afterJob(JobExecution jobExecution) {endTime = System.currentTimeMillis();System.out.println(jobExecution.getJobInstance().getJobName() + "一共耗耗时:【" + (endTime - beingTime) + "】毫秒");}}// 构造智搜数据
@Component("batchSearchIndexWriter")
@StepScope
public class BatchSearchIndexWriter implements ItemWriter<Map<String, Object>> {@Autowiredprivate NamedParameterJdbcTemplate jdbcTemplate;@Overridepublic void write(List<? extends Map<String, Object>> items) {//如果需要更新数据则删除后重新插入if (CollUtil.isNotEmpty(items)) {try {StringBuilder insertSql = new StringBuilder();// 构造插入语句insertSql.append("insert into ads_search_index(").append("pk_id,entity_type,entity_type_name,entity_key_column,entity_id,entity_source_id,entity_table,entity_table_name,title,sumary,file_url,source_from,deploy_time, ").append("content,keywords,pingyin_keywords,weight,lng,lat,region_code,region_name,created_time,updated_time)").append(" values ").append("(:pkId,:entityType,:entityTypeName,:entityKeyColumn,:entityId,:entitySourceId,:entityTable,:entityTableName,:title,:sumary,:fileUrl,:sourceFrom,:deployTime, ").append(":content,:keywords,:pingyinKeywords,:weight,:lng,:lat,:regionCode,:regionName,:createdTime,:updatedTime)");jdbcTemplate.batchUpdate(insertSql.toString(), items.stream().map(item -> new MapSqlParameterSource().addValue(CommonConstant.PK_ID, UUID.randomUUID().toString().replaceAll("-", CommonConstant.NULL_STR).toUpperCase()).addValue(CommonConstant.ENTITY_TYPE, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TYPE))? item.get(CommonConstant.ENTITY_TYPE) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_TYPE_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TYPE_NAME))? item.get(CommonConstant.ENTITY_TYPE_NAME) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_KEY_COLUMN, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_KEY_COLUMN))? item.get(CommonConstant.ENTITY_KEY_COLUMN) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_ID, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_ID))? item.get(CommonConstant.ENTITY_ID) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_SOURCE_ID, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_SOURCE_ID))? 
item.get(CommonConstant.ENTITY_SOURCE_ID) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_TABLE, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TABLE))? item.get(CommonConstant.ENTITY_TABLE) : CommonConstant.NULL_STR).addValue(CommonConstant.ENTITY_TABLE_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TABLE_NAME))? item.get(CommonConstant.ENTITY_TABLE_NAME) : CommonConstant.NULL_STR).addValue(CommonConstant.TITLE, ObjectUtil.isNotEmpty(item.get(CommonConstant.TITLE))? item.get(CommonConstant.TITLE) : CommonConstant.NULL_STR).addValue(CommonConstant.SUMARY, ObjectUtil.isNotEmpty(item.get(CommonConstant.SUMARY))? item.get(CommonConstant.SUMARY) : CommonConstant.NULL_STR).addValue(CommonConstant.FILE_URL, ObjectUtil.isNotEmpty(item.get(CommonConstant.FILE_URL))? item.get(CommonConstant.FILE_URL) : CommonConstant.NULL_STR).addValue(CommonConstant.SOURCE_FROM, ObjectUtil.isNotEmpty(item.get(CommonConstant.SOURCE_FROM))? item.get(CommonConstant.SOURCE_FROM) : CommonConstant.NULL_STR).addValue(CommonConstant.DEPLOY_TIME, ObjectUtil.isNotEmpty(item.get(CommonConstant.DEPLOY_TIME))? item.get(CommonConstant.DEPLOY_TIME) : null).addValue(CommonConstant.CONTENT, ObjectUtil.isNotEmpty(item.get(CommonConstant.CONTENT))? item.get(CommonConstant.CONTENT) : CommonConstant.NULL_STR).addValue(CommonConstant.KEYWORDS, ObjectUtil.isNotEmpty(item.get(CommonConstant.KEYWORDS))? item.get(CommonConstant.KEYWORDS): CommonConstant.NULL_STR).addValue(CommonConstant.PINGYIN_KEYWORDS, ObjectUtil.isNotEmpty(item.get(CommonConstant.PINGYIN_KEYWORDS))? item.get(CommonConstant.PINGYIN_KEYWORDS) : CommonConstant.NULL_STR).addValue(CommonConstant.WEIGHT, ObjectUtil.isNotEmpty(item.get(CommonConstant.WEIGHT_DEFAULT))? item.get(CommonConstant.WEIGHT_DEFAULT) : 100)// 判读经纬的范围.addValue(CommonConstant.LNG, ObjectUtil.isNotEmpty(item.get(CommonConstant.LNG))&& LatLonValidator.isLongitudeValid(item.get(CommonConstant.LNG).toString())? 
item.get(CommonConstant.LNG) : CommonConstant.ZERO).addValue(CommonConstant.LAT, ObjectUtil.isNotEmpty(item.get(CommonConstant.LAT)) && LatLonValidator.isLatitudeValid(item.get(CommonConstant.LAT).toString())? item.get(CommonConstant.LAT) : CommonConstant.ZERO).addValue(CommonConstant.REGION_CODE, ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_CODE))? item.get(CommonConstant.REGION_CODE) : CommonConstant.NULL_STR).addValue(CommonConstant.REGION_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_NAME))? item.get(CommonConstant.REGION_NAME) : CommonConstant.NULL_STR).addValue(CommonConstant.CREATED_TIME, DateUtil.date()).addValue(CommonConstant.UPDATED_TIME, DateUtil.date())).toArray(SqlParameterSource[]::new));} catch (Exception e) {e.printStackTrace();throw new CoreException(ErrorCodeEnum.SYS_ERROR, e.getMessage());}}}
}
// 简单处理数据同步问题直接删除再添加数据量不大
@Component("deleteTasklet")
@StepScope
public class DeleteTasklet implements Tasklet {@Autowiredprivate NamedParameterJdbcTemplate jdbcTemplate;@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT ");sqlBuilder.append("pk_id as pkId,");sqlBuilder.append("entity_type as entityType,");sqlBuilder.append("entity_type_name as entityTypeName,");sqlBuilder.append("entity_key_column as entityKeyColumn,");sqlBuilder.append("entity_table as entityTable,");sqlBuilder.append("entity_source_id as entitySourceId,");sqlBuilder.append("title_column as titleColumn,");sqlBuilder.append("content_column as contentColumn,");sqlBuilder.append("keywords_default as keywordsDefault,");sqlBuilder.append("keywords_column as keywordsColumn,");sqlBuilder.append("update_flag as updateFlag,");sqlBuilder.append("delete_column as deleteColumn,");sqlBuilder.append("lng_column as lngColumn,");sqlBuilder.append("lat_column as latColumn,");sqlBuilder.append("region_code_column as regionCodeColumn,");sqlBuilder.append("weight_default as weightDefault");sqlBuilder.append(" FROM ads_search_index_table where deleted =0 and update_flag ='1' ");List<Map<String, Object>> tables = jdbcTemplate.queryForList(sqlBuilder.toString(), new MapSqlParameterSource());String delSql = "update ads_search_index set deleted = '1' where deleted = '0' and entity_table = :entityTable and entity_type = :entityType";for (int i = 0; i < tables.size(); i++) {// 删除智搜索引数据jdbcTemplate.update(delSql, new MapSqlParameterSource().addValue(CommonConstant.ENTITY_TABLE, tables.get(i).get(CommonConstant.ENTITY_TABLE)).addValue(CommonConstant.ENTITY_TYPE, tables.get(i).get(CommonConstant.ENTITY_TYPE)));}return RepeatStatus.FINISHED;}
}/*** 获取需要更新搜素索引的表*/
@Component
@Slf4j
public class MultiIndexTablePartitioner implements Partitioner {private final DataSource dataSource;public MultiIndexTablePartitioner(DataSource dataSource) {this.dataSource = dataSource;}@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT ");sqlBuilder.append("pk_id as pkId,");sqlBuilder.append("entity_type as entityType,");sqlBuilder.append("entity_type_name as entityTypeName,");sqlBuilder.append("entity_key_column as entityKeyColumn,");sqlBuilder.append("entity_table as entityTable,");sqlBuilder.append("entity_table_name as entityTableName,");sqlBuilder.append("entity_source_id as entitySourceId,");sqlBuilder.append("title_column as titleColumn,");sqlBuilder.append("sumary_column as sumaryColumn,");sqlBuilder.append("file_url_column as fileUrlColumn,");sqlBuilder.append("deploy_time_column as deployTimeColumn,");sqlBuilder.append("source_from as sourceFrom,");sqlBuilder.append("content_column as contentColumn,");sqlBuilder.append("keywords_default as keywordsDefault,");sqlBuilder.append("keywords_column as keywordsColumn,");sqlBuilder.append("update_flag as updateFlag,");sqlBuilder.append("delete_column as deleteColumn,");sqlBuilder.append("lng_column as lngColumn,");sqlBuilder.append("lat_column as latColumn,");sqlBuilder.append("region_code_column as regionCodeColumn,");sqlBuilder.append("weight_default as weightDefault");sqlBuilder.append(" FROM ads_search_index_table where deleted =0 and update_flag ='1' ");List<Map<String,Object>> tables = jdbcTemplate.queryForList(sqlBuilder.toString());// 删除智搜索引数据Map<String, ExecutionContext> partitions = new HashMap<>();for (int i = 0; i < tables.size(); i++) {ExecutionContext ctx = new ExecutionContext();// 将需要传递的参数放到上下文中,用于动态批量更新的sql用ctx.putString(CommonConstant.PK_ID, String.valueOf(tables.get(i).get(CommonConstant.PK_ID)));ctx.putString(CommonConstant.ENTITY_TYPE, 
String.valueOf(tables.get(i).get(CommonConstant.ENTITY_TYPE)));ctx.putString(CommonConstant.ENTITY_TYPE_NAME, String.valueOf(tables.get(i).get(CommonConstant.ENTITY_TYPE_NAME)));ctx.putString(CommonConstant.ENTITY_KEY_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.ENTITY_KEY_COLUMN)));ctx.putString(CommonConstant.ENTITY_TABLE, String.valueOf(tables.get(i).get(CommonConstant.ENTITY_TABLE)));ctx.putString(CommonConstant.ENTITY_TABLE_NAME, String.valueOf(tables.get(i).get(CommonConstant.ENTITY_TABLE_NAME)));ctx.putString(CommonConstant.ENTITY_SOURCE_ID, String.valueOf(tables.get(i).get(CommonConstant.ENTITY_SOURCE_ID)));ctx.putString(CommonConstant.TITLE_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.TITLE_COLUMN)));ctx.putString(CommonConstant.SUMARY_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.SUMARY_COLUMN)));ctx.putString(CommonConstant.FILE_URL_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.FILE_URL_COLUMN)));ctx.putString(CommonConstant.DEPLOY_TIME_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.DEPLOY_TIME_COLUMN)));ctx.putString(CommonConstant.SOURCE_FROM, String.valueOf(tables.get(i).get(CommonConstant.SOURCE_FROM)));ctx.putString(CommonConstant.CONTET_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.CONTET_COLUMN)));ctx.putString(CommonConstant.KEYWORDS_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.KEYWORDS_COLUMN)));ctx.putString(CommonConstant.KEYWORDS_DEFAULT, String.valueOf(tables.get(i).get(CommonConstant.KEYWORDS_DEFAULT)));ctx.putString(CommonConstant.WEIGHT_DEFAULT, String.valueOf(tables.get(i).get(CommonConstant.WEIGHT_DEFAULT)));ctx.putString(CommonConstant.UPDATE_FLAG, String.valueOf(tables.get(i).get(CommonConstant.UPDATE_FLAG)));ctx.putString(CommonConstant.DELETE_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.DELETE_COLUMN)));ctx.putString(CommonConstant.LNG_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.LNG_COLUMN)));ctx.putString(CommonConstant.LAT_COLUMN, 
String.valueOf(tables.get(i).get(CommonConstant.LAT_COLUMN)));ctx.putString(CommonConstant.REGION_CODE_COLUMN, String.valueOf(tables.get(i).get(CommonConstant.REGION_CODE_COLUMN)));partitions.put("partition" + i, ctx);}return partitions;}
}
// 处理索引数据分词
@Component("searchIndexProcessor")
@Builder
public class SearchIndexProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {@Autowiredprivate SysRegionService sysRegionService;@Overridepublic Map<String, Object> process(Map<String, Object> item) {// 标题内容关键字分词item.put(CommonConstant.TITLE, item.get(CommonConstant.TITLE).toString());item.put(CommonConstant.SUMARY, ObjectUtil.isNotEmpty(item.get(CommonConstant.SUMARY))?item.get(CommonConstant.SUMARY).toString():CommonConstant.NULL_STR);item.put(CommonConstant.CONTENT, HanLpAdvancedUtil.segmentToString(item.get(CommonConstant.CONTENT).toString()));item.put(CommonConstant.KEYWORDS, HanLpAdvancedUtil.extractKeywords(item.get(CommonConstant.KEYWORDS).toString()));item.put(CommonConstant.PINGYIN_KEYWORDS, KeyWordUtils.getPinyin(item.get(CommonConstant.KEYWORDS).toString(), CommonConstant.SPACE));// 处理行政区划if (ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_CODE))) {// 不满12位的补全12位if (item.get(CommonConstant.REGION_CODE).toString().length() < CommonConstant.TWELVE_INT) {item.put(CommonConstant.REGION_CODE, StrUtil.padAfter(item.get(CommonConstant.REGION_CODE).toString(), CommonConstant.TWELVE_INT, CommonConstant.ZERO));}if (ObjectUtil.isNotEmpty(sysRegionService.getRegionNameByCode(item.get(CommonConstant.REGION_CODE).toString()))) {item.put(CommonConstant.REGION_NAME, sysRegionService.getRegionNameByCode(item.get(CommonConstant.REGION_CODE).toString()));} else {item.put(CommonConstant.REGION_CODE, CommonConstant.NULL_STR);item.put(CommonConstant.REGION_NAME, CommonConstant.NULL_STR);}}return item;}
}

效果图

http://www.xdnf.cn/news/733933.html

相关文章:

  • matlab计算转子系统的固有频率、振型、不平衡响应
  • StringBuilder对象的操作
  • cocos creator资源管理器,资源动态加载和释放
  • 基于Qt封装数据库基本增删改查操作,支持多线程,并实现SQLite数据库单例访问
  • 【google 论文】Titans: Learning to Memorize at Test Time
  • 裂缝仪在线监测装置:工程安全领域的“实时守卫者”
  • DrissionPage WebPage模式:动态交互与高效爬取的完美平衡术
  • C# 将HTML文档、HTML字符串转换为图片
  • Window10+ 安装 go环境
  • 深入探索:基于 Nacos 的配置管理之动态配置与环境管理
  • Lifecycle原理
  • 低秩矩阵、奇异值矩阵和正交矩阵
  • 【FlashRAG】本地部署与demo运行(一)
  • ArcGIS应用指南:基于网格与OD成本矩阵的交通可达性分析
  • AI时代的园区网变革:“极简”行至最深处,以太彩光恰自来
  • 【C++】位图
  • 前端pointer-events属性
  • 显卡3080和4060哪个强 两款游戏性能对比
  • 重拾Scrapy框架
  • Clish中xml文件配置的使用方法
  • Spring Cloud Alibaba 学习 —— 简单了解常用技术栈
  • 【专题】神经网络期末复习资料(题库)
  • 二、Python提供了丰富的内置工具,无需额外安装即可使用
  • 6个月Python学习计划 Day 9 - 函数进阶用法
  • 【ROS2实体机械臂驱动】rokae xCoreSDK Python测试使用
  • 单卡4090部署Qwen3-32B-AWQ(4bit量化)-vllm
  • 网易 - 灵犀办公文档
  • const ‘不可变’到底是值不变还是地址不变
  • Python使用
  • C 语言中 * count++ 引发的错误与正确指针操作解析