今天主要是新增了后端的线程池。利用线程池来限制BI项目用户生成AI图表的频率。也就是异步化业务

异步化任务:

​ 通过异步操作让需要等待时间长的请求可以异步去完成。比如智能生成图表,如果是同步要等Ai返回则需要等上好久,用户就不能去做其他事情了。此时需要异步化:当用户提交需求时,直接返回已经收到任务结果。但是任务是否正在做需要后端自行解决。

好处:

​ 优化了用户的体验(不需要一直等待)。

​ 对于频繁的请求会有限制。(如果没有限制可能会有的问题,那么如果同时来了10几个或者上百个任务,那么调用AI接口会导致第三方API平台直接拒绝你的访问)。

业务流程

用户发送异步请求->后端->保存到数据库->发送到队列->能处理直接处理返回结果->不能处理但能入队(没有超过队伍数量最大值)然后等待处理返回结果->不能入队其他方法(生成失败);

使用线程池来完成异步化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(){
ThreadFactory threadFactory = new ThreadFactory() {
//初始线程数
private int count = 1;
//当线程池需要创建线程时就会调用newThread方法
@Override
public Thread newThread(@NotNull Runnable r) {
//创建一个线程
Thread thread = new Thread(r);
//设置线程名字
thread.setName("线程"+count);
//线程数量增加
count++;
return thread;
}
};
//创建一个新的线程池
/*
* @Author: ZH
* 核心线程数量,最大线程数量,线程存活时间,时间单位,工作队列,线程工厂(管理线程)
**/

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),threadFactory);

return threadPoolExecutor;
}
}

点击并拖拽以移动

这里使用的时java自带的线程池。给他注册上@bean 并配置

ThreadPoolExecutor()中四个参数分别为:核心线程数量,最大线程数量,线程存活时间,时间单位,工作队列,消息工厂。还有一个拒绝策略的参数

核心线程数量:

​ 相当于正式员工,有活了都是先给他们干。

最大线程数量:

​ 相当于临时工,只有忙不过来的时候才招募(队列满了)

线程存活时间:

​ 当用上了最大线程数量时等待请求数量减少到不需要这些临时工时就给他删去。也就是零时工多久没干活了就给它删了

工作队列:

​ 队列存储着要等待的任务,所以要设置队列大小。

消息工厂:

​ 负责控制每个线程的生成,这里自己新写了一个

拒绝策略:

​ 如果队列满了被拒绝要进行的操作

流程:

​ 当任务进入这个线程池 如果 此时核心数量还有空闲那么直接分配给核心。 如果核心数量满了则进入工作队列。如果工作也满了那么就去申请额外的线程保证总数量不超过最大线程数量。要是最大线程数量也满了那么就被拒绝。

一般情况下,任务分为 10 密集型和计算密集型两种。计算密集型:吃 CPU,比如音视频处理、图像处理、数学计算等,一般是设置 corePoolSize 为CPU 的核数+1(空余线程),可以让每个线程都能利用好 CPU 的每个核,而且线程之间不用频繁切换(减少打架、减少开销)
10 密集型:吃带宽/内存/硬盘的读写资源,corePoolSize 可以设置大一点,一般经验值是 2n 左右,但是建议以 10 的能力为主。

—by yupi

测试接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) //只在本地测试环境生效
public class QueueController {

@Resource
// 自动注入一个线程池的实例
private ThreadPoolExecutor threadPoolExecutor;

@GetMapping("/add")
// 接收一个参数name,然后将任务添加到线程池中
public void add(String name) {
// 使用CompletableFuture运行一个异步任务
CompletableFuture.runAsync(() -> {
// 打印一条日志信息,包括任务名称和执行线程的名称
log.info("任务执行中:" + name + ",执行人:" + Thread.currentThread().getName());
try {
// 让线程休眠10分钟,模拟长时间运行的任务
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 异步任务在threadPoolExecutor中执行
}, threadPoolExecutor);
}

@GetMapping("/get")
// 该方法返回线程池的状态信息
public String get() {
// 创建一个HashMap存储线程池的状态信息
Map<String, Object> map = new HashMap<>();
// 获取线程池的队列长度
int size = threadPoolExecutor.getQueue().size();
// 将队列长度放入map中
map.put("队列长度", size);
// 获取线程池已接收的任务总数
long taskCount = threadPoolExecutor.getTaskCount();
// 将任务总数放入map中
map.put("任务总数", taskCount);
// 获取线程池已完成的任务数
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
// 将已完成的任务数放入map中
map.put("已完成任务数", completedTaskCount);
// 获取线程池中正在执行任务的线程数
int activeCount = threadPoolExecutor.getActiveCount();
// 将正在工作的线程数放入map中
map.put("正在工作的线程数", activeCount);
// 将map转换为JSON字符串并返回
return JSONUtil.toJsonStr(map);
}
}

点击并拖拽以移动

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    @Override
public DeepSeekResponseVO genChartByAiAsync(MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest, User loginUser) {

String name = genChartByAiRequest.getName();
String goal = genChartByAiRequest.getGoal();
String chartType = genChartByAiRequest.getChartType();
//用户输入
StringBuilder userInput = new StringBuilder();

String userGoal = goal;
if(StringUtils.isNotBlank(chartType)){
userGoal+="请使用"+chartType;
}
userInput.append("分析需求:").append(userGoal).append("\n");
String csvData = ExcelUtils.excelToCsv(multipartFile);
userInput.append("原始目标:").append(csvData).append("\n");


//保存Chart
Chart chart = new Chart();
chart.setGoal(goal);
chart.setName(name);
chart.setChartType(chartType);
//先不返回
// chart.setGenChart(genChart);
// chart.setGenResult(genResult);
chart.setUserId(loginUser.getId());
//设置为排队
chart.setStatus("wait");

boolean saveResult = this.save(chart);
ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");

//创建ChartData表Sql
String chartDataTableSql = ChartTableUtils.ChartDataToTableSql(csvData,chart.getId());
//防止sql注入
ThrowUtils.throwIf(SqlUtils.validSortField(chartDataTableSql),ErrorCode.PARAMS_ERROR,"创建chartData表非法");
//创建表
chartMapper.createChartDataTable(chartDataTableSql);

//插入CharData数据Sql
String chartDataValueSql = ChartTableUtils.ChartDataInsertTableSql(csvData,chart.getId());
//防止sql注入
ThrowUtils.throwIf(SqlUtils.validSortField(chartDataValueSql),ErrorCode.PARAMS_ERROR,"插入chartData数据非法");
//插入数据
chartMapper.insertChartData(chartDataValueSql);

//异步处理
//使用deepseek获取结果
CompletableFuture.runAsync(()->{
//更新Chart数据库的Status 状态。从wait->running
Chart updatingChart = new Chart();
updatingChart.setId(chart.getId());
//修改状态
updatingChart.setStatus("running");
boolean updateStatusResult = this.updateById(updatingChart);
if(!updateStatusResult){
handleChartUpdateError(chart.getId(),"更新图表执行中状态失败");
}
String result = deepSeekService.callDeepSeek(userInput.toString());
String[] split = result.split("【【【【【");
if(split.length<3){
handleChartUpdateError(chart.getId(),"AI 生成错误");
}
String genChart = split[1].trim();
String genResult = split[2].trim();
//得到结果后,再更新一次
Chart updateChartResult = new Chart();
updateChartResult.setId(chart.getId());
updateChartResult.setStatus("succeed");
updateChartResult.setGenChart(genChart);
updateChartResult.setGenResult(genResult);

boolean updateResult = this.updateById(updateChartResult);

if(!updateResult){
handleChartUpdateError(chart.getId(),"更新图表成功状态失败");
}
});
//返回VO结果
DeepSeekResponseVO deepSeekResponseVO = new DeepSeekResponseVO();
deepSeekResponseVO.setChartId(chart.getId());
// deepSeekResponseVO.setGenChart(chart.getGenChart());
// deepSeekResponseVO.setGenResult(chart.getGenResult());
return deepSeekResponseVO;
}

public void handleChartUpdateError(long chartId,String execMessage){
Chart updateChart = new Chart();
updateChart.setId(chartId);
updateChart.setExecMessage(execMessage);
updateChart.setStatus("failed");
boolean result = this.updateById(updateChart);
ThrowUtils.throwIf(!result,ErrorCode.SYSTEM_ERROR,"保存Chart执行消息失败");
if(!result){
log.error("更新图表失败 "+chartId+ " "+execMessage);
}
}

点击并拖拽以移动

TODO:

​ \1. 当线程空闲时应该把被拒绝的请求完成。

​ \2. 超时控制,增加一个超时时间,如果超过这个时间则标记为失败。

​ 3.反向压力:(通过AI服务当前任务队列数来控制核心数量)最大化利用资源反向压力 - 知乎