今天主要是新增了后端的线程池。利用线程池来限制BI项目用户生成AI图表的频率。也就是异步化业务
异步化任务:
通过异步操作让需要等待时间长的请求可以异步去完成。比如智能生成图表,如果是同步要等Ai返回则需要等上好久,用户就不能去做其他事情了。此时需要异步化:当用户提交需求时,直接返回已经收到任务结果。但是任务是否正在做需要后端自行解决。
好处:
优化了用户的体验(不需要一直等待)。
对于频繁的请求会有限制。(如果没有限制可能会有的问题,那么如果同时来了10几个或者上百个任务,那么调用AI接口会导致第三方API平台直接拒绝你的访问)。
业务流程
用户发送异步请求->后端->保存到数据库->发送到队列->能处理直接处理返回结果->不能处理但能入队(没有超过队伍数量最大值)然后等待处理返回结果->不能入队其他方法(生成失败);
使用线程池来完成异步化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Configuration public class ThreadPoolExecutorConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(){ ThreadFactory threadFactory = new ThreadFactory() { private int count = 1; @Override public Thread newThread(@NotNull Runnable r) { Thread thread = new Thread(r); thread.setName("线程"+count); count++; return thread; } };
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4),threadFactory);
return threadPoolExecutor; } }
|

这里使用的时java自带的线程池。给他注册上@bean 并配置
ThreadPoolExecutor()中四个参数分别为:核心线程数量,最大线程数量,线程存活时间,时间单位,工作队列,消息工厂。还有一个拒绝策略的参数
核心线程数量:
相当于正式员工,有活了都是先给他们干。
最大线程数量:
相当于临时工,只有忙不过来的时候才招募(队列满了)
线程存活时间:
当用上了最大线程数量时等待请求数量减少到不需要这些临时工时就给他删去。也就是零时工多久没干活了就给它删了
工作队列:
队列存储着要等待的任务,所以要设置队列大小。
消息工厂:
负责控制每个线程的生成,这里自己新写了一个
拒绝策略:
如果队列满了被拒绝要进行的操作
流程:
当任务进入这个线程池 如果 此时核心数量还有空闲那么直接分配给核心。 如果核心数量满了则进入工作队列。如果工作也满了那么就去申请额外的线程保证总数量不超过最大线程数量。要是最大线程数量也满了那么就被拒绝。
一般情况下,任务分为 10 密集型和计算密集型两种。计算密集型:吃 CPU,比如音视频处理、图像处理、数学计算等,一般是设置 corePoolSize 为CPU 的核数+1(空余线程),可以让每个线程都能利用好 CPU 的每个核,而且线程之间不用频繁切换(减少打架、减少开销)
10 密集型:吃带宽/内存/硬盘的读写资源,corePoolSize 可以设置大一点,一般经验值是 2n 左右,但是建议以 10 的能力为主。
—by yupi
测试接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @RestController @RequestMapping("/queue") @Slf4j @Profile({"dev","local"}) public class QueueController {
@Resource private ThreadPoolExecutor threadPoolExecutor;
@GetMapping("/add") public void add(String name) { CompletableFuture.runAsync(() -> { log.info("任务执行中:" + name + ",执行人:" + Thread.currentThread().getName()); try { Thread.sleep(60000); } catch (InterruptedException e) { e.printStackTrace(); } }, threadPoolExecutor); }
@GetMapping("/get") public String get() { Map<String, Object> map = new HashMap<>(); int size = threadPoolExecutor.getQueue().size(); map.put("队列长度", size); long taskCount = threadPoolExecutor.getTaskCount(); map.put("任务总数", taskCount); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); map.put("已完成任务数", completedTaskCount); int activeCount = threadPoolExecutor.getActiveCount(); map.put("正在工作的线程数", activeCount); return JSONUtil.toJsonStr(map); } }
|

具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| @Override public DeepSeekResponseVO genChartByAiAsync(MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest, User loginUser) {
String name = genChartByAiRequest.getName(); String goal = genChartByAiRequest.getGoal(); String chartType = genChartByAiRequest.getChartType(); StringBuilder userInput = new StringBuilder();
String userGoal = goal; if(StringUtils.isNotBlank(chartType)){ userGoal+="请使用"+chartType; } userInput.append("分析需求:").append(userGoal).append("\n"); String csvData = ExcelUtils.excelToCsv(multipartFile); userInput.append("原始目标:").append(csvData).append("\n");
Chart chart = new Chart(); chart.setGoal(goal); chart.setName(name); chart.setChartType(chartType);
chart.setUserId(loginUser.getId()); chart.setStatus("wait");
boolean saveResult = this.save(chart); ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");
String chartDataTableSql = ChartTableUtils.ChartDataToTableSql(csvData,chart.getId()); ThrowUtils.throwIf(SqlUtils.validSortField(chartDataTableSql),ErrorCode.PARAMS_ERROR,"创建chartData表非法"); chartMapper.createChartDataTable(chartDataTableSql);
String chartDataValueSql = ChartTableUtils.ChartDataInsertTableSql(csvData,chart.getId()); ThrowUtils.throwIf(SqlUtils.validSortField(chartDataValueSql),ErrorCode.PARAMS_ERROR,"插入chartData数据非法"); chartMapper.insertChartData(chartDataValueSql);
CompletableFuture.runAsync(()->{ Chart updatingChart = new Chart(); updatingChart.setId(chart.getId()); updatingChart.setStatus("running"); boolean updateStatusResult = this.updateById(updatingChart); if(!updateStatusResult){ handleChartUpdateError(chart.getId(),"更新图表执行中状态失败"); } String result = deepSeekService.callDeepSeek(userInput.toString()); String[] split = result.split("【【【【【"); if(split.length<3){ handleChartUpdateError(chart.getId(),"AI 生成错误"); } String genChart = split[1].trim(); String genResult = split[2].trim(); Chart updateChartResult = new Chart(); updateChartResult.setId(chart.getId()); updateChartResult.setStatus("succeed"); updateChartResult.setGenChart(genChart); updateChartResult.setGenResult(genResult);
boolean updateResult = this.updateById(updateChartResult);
if(!updateResult){ handleChartUpdateError(chart.getId(),"更新图表成功状态失败"); } }); DeepSeekResponseVO deepSeekResponseVO = new DeepSeekResponseVO(); deepSeekResponseVO.setChartId(chart.getId());
return deepSeekResponseVO; }
public void handleChartUpdateError(long chartId,String execMessage){ Chart updateChart = new Chart(); updateChart.setId(chartId); updateChart.setExecMessage(execMessage); updateChart.setStatus("failed"); boolean result = this.updateById(updateChart); ThrowUtils.throwIf(!result,ErrorCode.SYSTEM_ERROR,"保存Chart执行消息失败"); if(!result){ log.error("更新图表失败 "+chartId+ " "+execMessage); } }
|

TODO:
\1. 当线程空闲时应该把被拒绝的请求完成。
\2. 超时控制,增加一个超时时间,如果超过这个时间则标记为失败。
3.反向压力:(通过AI服务当前任务队列数来控制核心数量)最大化利用资源反向压力 - 知乎