线程池配置实现多线程快速处理批量数据
把 ArrayList 替换成线程安全的集合类,例如 CopyOnWriteArrayList。
CopyOnWriteArrayList 在进行写操作时会创建一个新的数组副本,以此保证线程安全。
否则会报错:
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 11
表明在代码里有尝试访问数组中索引为 11 的元素,然而数组的长度小于 12(因为数组索引从 0 开始)。错误栈显示异常出现在 java.util.ArrayList.add 方法处,这可能意味着在往 ArrayList 里添加元素时,底层数组的索引越界了。
原因:多线程修改 ArrayList:ArrayList 并非线程安全的类。多个线程同时对 ArrayList 进行修改操作,就可能会造成数组越界异常。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CopyOnWriteArrayList;public class VendorAdminServiceImpl {private ExecutorService executorService = Executors.newFixedThreadPool(10);public void pageSearchList() {List<Future<?>> futures = new ArrayList<>();//统计执行时长StopWatch sw = StopWatch.createStarted();// 使用 CopyOnWriteArrayList 代替 ArrayListList<String> resultList = new CopyOnWriteArrayList<>();// 模拟任务List<String> dataList = new ArrayList<>();for (int i = 0; i < 100; i++) {dataList.add("Data " + i);}AtomicInteger counter = new AtomicInteger(0);dataList.forEach(data -> {futures.add(executorService.submit(() -> {// 模拟处理逻辑resultList.add(data);counter.incrementAndGet();}));});try {// 阻塞当前线程,直到更新操作完成// 等待所有更新操作完成for (Future<?> future : futures) {try {future.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();throw new RuntimeException("页面数据查询失败", e);}}} catch (Exception e) {e.printStackTrace();} finally {// 关闭线程池executorService.shutdown();}sw.stop();log.info("数据组装耗时:{}", sw.getTime());System.out.println("Total processed: " + counter.get());System.out.println("Result list size: " + resultList.size());//可以return resultList;}public static void main(String[] args) {VendorAdminServiceImpl service = new VendorAdminServiceImpl();service.pageSearchList();}
}