List ToMap优化优化再优化到极致
请对以下代码进行优化!
看着是不是很简单?你是不是日常也会这么写?
当前使用的是java21,
//1-统计blockName出现的次数
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();stockList.parallelStream().forEach(st -> {String blockName = st.getBlock_name();if (map.containsKey(blockName)) {map.put(blockName, map.get(blockName) + 1);} else {map.put(blockName, 1);}});
//2-保存blockName出现的次数
stockList.parallelStream().forEach(st -> {String blockName = st.getBlock_name();st.setFinance_type(String.valueOf(map.get(blockName)));
});
优化一
ConcurrentHashMap
和 parallelStream
来统计 stockList
中每个 blockName
的出现次数,并将统计结果设置到每个 stock
对象的 finance_type
属性中。虽然代码功能正确,但存在以下优化空间:
-
重复遍历问题:代码使用了两个
parallelStream().forEach
分别处理计数和赋值,增加了开销。可以合并为一次遍历。 -
线程安全改进:
ConcurrentHashMap
的containsKey
和get
组合不是原子操作,可能导致并发问题。可以使用compute
或merge
方法来确保线程安全。 -
性能优化:
parallelStream
在小数据集上可能引入不必要的线程开销,建议根据数据规模选择是否并行。 -
代码简洁性:可以使用
Collectors.groupingBy
简化计数逻辑。
以下是优化后的代码:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 统计 blockName 出现次数
stockList.parallelStream().collect(Collectors.groupingBy(Stock::getBlock_name,Collectors.counting())).forEach((blockName, count) -> map.put(blockName, count.intValue()));// 设置 finance_type
stockList.parallelStream().forEach(st -> st.setFinance_type(String.valueOf(map.get(st.getBlock_name()))));
优化点说明:
-
单次统计:使用
Collectors.groupingBy
和Collectors.counting
一次性完成blockName
的计数,替代手动forEach
和put
操作。 -
线程安全:
collect
操作是线程安全的,生成的Map
结果直接写入ConcurrentHashMap
,避免了containsKey
和get
的非原子操作。 -
代码更简洁:
groupingBy
是函数式编程风格,逻辑更清晰,减少了手动维护map
的代码。 -
保留并行处理:继续使用
parallelStream
保持并发性能,适合大数据量场景。
进一步优化建议:
-
并行与串行选择:如果
stockList
数据量较小(例如几百条),parallelStream
的线程切换成本可能高于单线程处理,建议测试切换为stream()
的性能。 -
合并操作:如果
finance_type
只依赖于计数结果,可以进一步合并计数和赋值逻辑为单次遍历,但需要权衡代码可读性。例如:
Map<String, Long> countMap = stockList.parallelStream().collect(Collectors.groupingBy(Stock::getBlock_name,Collectors.counting()));
stockList.forEach(st -> st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name()))));
这里去掉了 ConcurrentHashMap
,因为计数后的 Map
只读不写,无需并发支持。
-
异常处理:确保
getBlock_name
不会返回null
,否则需添加空值检查以避免NullPointerException
。 -
性能测试:建议对优化后的代码进行性能测试,确认在你的实际数据规模下是否提升了效率。
注意事项:
-
如果
stockList
非常大,parallelStream
能有效利用多核 CPU,但需确保Stock
对象的getBlock_name
和setFinance_type
方法线程安全。 -
如果
finance_type
的值仅为字符串化的计数,考虑直接在Stock
类中维护计数逻辑,减少Map
的使用(视业务需求而定)。
优化二
针对上百万条数据的场景,性能和线程安全是关键。以下是针对你的需求(ConcurrentHashMap
统计 blockName
出现次数并设置 finance_type
)进一步优化的代码,重点提升性能和确保绝对线程安全,同时保持简洁性。
优化目标
-
性能最佳:减少遍历次数,优化并行处理,降低锁竞争。
-
线程安全:确保所有操作在高并发下无竞争或数据不一致。
-
适用大数量:针对百万级数据,充分利用多核 CPU,减少内存开销。
优化后的代码
ConcurrentMap<String, Long> collect1 = stockList.parallelStream().filter(st -> st.getBlock_name() != null).collect(Collectors.groupingByConcurrent(Stock::getBlock_name, Collectors.counting()));stockList.parallelStream().forEach(st -> {st.setFinance_type(String.valueOf(map.get(st.getBlock_name())));});
优化点详解
-
使用
**groupingByConcurrent**
:-
Collectors.groupingByConcurrent
专为并行流设计,内部使用ConcurrentHashMap
存储结果,确保计数过程线程安全。 -
相比普通
groupingBy
,它减少了合并阶段的开销,适合百万级数据的高并发场景。 -
直接生成计数结果,省去手动维护
ConcurrentHashMap
的步骤。
-
-
单次计数 + 单次赋值:
-
第一次
parallelStream
完成所有blockName
的计数,生成countMap
。 -
第二次
parallelStream
遍历stockList
,将计数结果设置到finance_type
。 -
虽然仍是两次遍历,但每一步都是高度并行的,且避免了在计数阶段对
ConcurrentHashMap
的竞争性写入。
-
-
线程安全保证:
-
groupingByConcurrent
确保计数过程无竞争。 -
countMap.get
是只读操作,ConcurrentHashMap
的读操作天然线程安全,无需额外同步。 -
假设
Stock.setFinance_type
是简单的字段赋值(无复杂逻辑),则线程安全无问题。如果setFinance_type
包含复杂逻辑,需确保其线程安全。
-
-
性能优化:
-
并行流:
parallelStream
充分利用多核 CPU,适合百万级数据。Java 21 的 Fork/Join 框架会根据 CPU 核心数动态分配任务。 -
避免锁竞争:
groupingByConcurrent
内部优化了ConcurrentHashMap
的分段锁,减少并发写入的瓶颈。 -
内存效率:
countMap
只存储每个blockName
的计数,内存占用可控。
-
终极优化
针对你的需求(百万级数据、所有 finance_type
基于最终计数、性能最佳且线程安全),且考虑到系统支持异步框架(如 Java 21 的虚拟线程或 Reactor),以下是使用 CompletableFuture
和虚拟线程优化的最终代码方案。相比之前的 groupingByConcurrent
方案,异步框架可以进一步提升吞吐量,尤其在高并发场景下。代码确保 finance_type
基于最终计数值
最终优化代码(基于 CompletableFuture 和虚拟线程)
·
//使用虚拟线程执行器Executor virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();//异步计数CompletableFuture<ConcurrentMap<String, Long>> concurrentMapCompletableFuture = CompletableFuture.supplyAsync(() -> stockList.parallelStream().filter(st -> st.getBlock_name() != null).collect(Collectors.groupingByConcurrent(Stock::getBlock_name,Collectors.counting())), virtualThreadExecutor);//异步赋值CompletableFuture<Void> voidCompletableFuture = concurrentMapCompletableFuture.thenAcceptAsync(countMap -> stockList.parallelStream().filter(st -> st.getBlock_name() != null).forEach(st -> st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name())))), virtualThreadExecutor);//wait for allvoidCompletableFuture.join();
优化说明
-
虚拟线程(Java 21):
-
使用
Executors.newVirtualThreadPerTaskExecutor()
创建虚拟线程执行器。虚拟线程是 Java 21 的轻量级线程,创建和切换成本极低,适合高并发任务。 -
虚拟线程允许为每个任务分配独立线程,最大化并发性,特别适合 I/O 密集或阻塞操作较少的场景(如你的计数和赋值逻辑)。
-
-
CompletableFuture 异步处理:
-
将计数和赋值分为两个异步阶段:
-
countFuture
:异步执行groupingByConcurrent
,生成最终计数Map
。 -
assignFuture
:在计数完成后异步执行finance_type
赋值。
-
-
thenAcceptAsync
确保赋值阶段等待计数完成,保证finance_type
基于最终计数值。 -
异步执行减少主线程阻塞,提高吞吐量。
-
-
线程安全:
-
groupingByConcurrent
使用ConcurrentHashMap
,计数过程线程安全。 -
赋值阶段只读
countMap
,无竞争,Stock.setFinance_type
假设为简单字段赋值(线程安全)。 -
虚拟线程隔离任务,降低线程切换开销。
-
-
性能优化:
-
并行流 + 虚拟线程:
parallelStream
结合虚拟线程充分利用多核 CPU,适合百万级数据。 -
异步流水线:
CompletableFuture
将计数和赋值解耦,允许系统在等待计数完成时调度其他任务。 -
低开销:虚拟线程的轻量级特性减少上下文切换成本,相比传统线程池更高效。
-
-
空值处理:
-
添加
filter(st -> st.getBlock_name() != null)
防止blockName
为null
导致异常。
-
为什么优于之前的方案
-
相比
**groupingByConcurrent**
**(同步并行流)**:-
虚拟线程减少线程池管理的开销,适合高并发任务。
-
CompletableFuture
的异步执行允许更灵活的任务调度,可能在多任务环境中提升整体吞吐量。
-
-
相比
AtomicInteger
** 单次遍历**:-
确保
finance_type
基于最终计数值(如"44444"
),符合你的业务需求。 -
避免了
AtomicInteger
的 CAS 竞争问题,性能更稳定。
-
最后执行结果比较
怎么样,结果是不是出乎意料。
java8如何切换到java21?
已经都设置java21了,但是还报错
java: 找不到符号 符号: 方法 newVirtualThreadPerTaskExecutor() 位置: 类 java.util.concurrent
原因 没有完全设置
1-idea里设置。
2-pom.xml里再继续设置。