Java线程池多线程查询数据库提高查询效率
- 需求
 - 问题
 - 思路
 - 代码
 - 解析
 
需求
公司数据统计报表查询,几张大表关联查询,包含跨库查询,数据联查,数据过滤,数据统计。
问题
| 主表数据1 | 主表数据2 | … | 统计数据1 | 统计数据2 | … | 
|---|---|---|---|---|---|
| 数据 | 数据 | 数据 | 数据 | 
报表结构是主表数据+统计数据,只查主表数据速度很快,统计数据需要跨库联查,且表数据量大,拖慢了速度。
一开始的做法是,每次分页请求都先获取主表list,再foreach主表list,根据主表字段去查询统计数据并进行计算,再合并到主表进行返回,查询速度不理想。
分页查响应时间=1次主表查询时间+10次统计数据查询时间
思路
复杂的报表数据统计不应全部由DB层面去解决,而是SQL仅负责数据过滤,返回统计所需的字段,SQL尽量简单高效,service层拿到DB返回的结果集,由代码层面去进行较为复杂的数据合并与统计。
最总给到前端的是一个分页,那么优化的话是基于分页去进行,分页10条,线程池开启10个线程去并行查询,最总汇总返回给前端。
分页查响应时间=1次主表查询时间+1次统计数据查询时间(10条中最慢的一条)
代码
线程池工具类
package com.youxue.weliao.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @Author lhy
 * @Date 2022/5/13
 */
@Slf4j
public class ThreadUtil {
    private volatile static ThreadUtil threadUtil;
    private ThreadPoolExecutor executor;
    /**
     * 单例
     */
    private ThreadUtil() {
    }
    public static ThreadUtil getThreadUtilInstance() {
        if (null == threadUtil) {
            synchronized (ThreadUtil.class) {
                if (null == threadUtil) {
                    threadUtil = new ThreadUtil();
                }
            }
        }
        return threadUtil;
    }
    /**
     * 提交任务
     *
     * @param task
     */
    public Future<?> submit(Runnable task) {
        if (executor == null) {
            // 初始化线程池
            executor = initialize();
        }
        // 执行线程
        return executor.submit(task);
    }
    /**
     * 初始化线程池
     *
     * @return
     */
    private synchronized ThreadPoolExecutor initialize() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder()
                .setNameFormat("task-admin-getlist--%d")
                .build(),
            new ThreadPoolExecutor.CallerRunsPolicy());
        log.info("===================>ThreadUtil线程池初始化");
        return executor;
    }
    /**
     * 关闭线程池
     */
    public void shutdown() {
        if (executor != null) {
            executor.shutdown();
        }
    }
}
service层实际调用,这里我就不把全部业务代码放出来了,简化后如下
    @Override
    public IPage<ManageDto> getManageDto(IPage<ManageDto> page, String str, String end, List<Integer> taskAdminIds) {
    	// 主表查询
        IPage<ManageDto> manageDtos = baseMapper.getManageDto(page, str, end, taskAdminIds);
        if (manageDtos.getTotal() > 0) {
    		// 主表查询结果集
            List<ManageDto> manageDtoList = manageDtos.getRecords();
            // 异步线程Future集合
            List<Future<?>> futures = new ArrayList<>();
            for (ManageDto manageDto : manageDtoList) {
            	// 本页有几条数据便开启几条线程去进行统计数据查询
            	futures.add(ThreadUtil.getThreadUtilInstance().submit(() -> {
	        		// 统计数据查询
	        		List<ManageVo> manageVos = groupCyberArmyService.getManageVos(manageDto.getProcessAdminUserId(), manageDto.getTaskAdminId());
	        		// 业务逻辑运算后合并到主表Dto
            	}))
            }
            this.waitFinish(futures);
            manageDtos.setRecords(manageDtoList);
        	return manageDtos;
        }
        return null;
    }
    
    /**
     * 线程池内线程是否已全部执行结束
     * 
     * @param futures 异步线程Future集合
     */
    @SneakyThrows
    private void waitFinish(List<Future<?>> futures) {
        for (Future<?> future : futures) {
            future.get();
        }
    }
解析
future.get方法
线程池线程是异步提交的,但是返回分页结果是需要同步返回,Future的get是个阻塞方法。只有所有的任务全部完成,我们才能用get按照任务的提交顺序依次返回结果,调用future.get()方法查看线程池内所有方法是否已执行完成,达到线程异步提交,结果集同步返回的效果。
线程池工具类
单例就不多解释了,这里用的是DCL单例,线程池根据自身需求配置核心线程等参数。
相关文章
暂无评论...
