最后主要是做了一个利用RabbitMq 去发布和接收生成图表的任务的工作。

Q:为什么要用RabbitMq ?用线程池不是已经解决了吗
A:在单机开发中线程池 确实能解决一些问题,但是到分布式环境中,使用消息队列可以更加方便的进行开发。因为消息队列可以算是一个跨系统的东西。可以很好的解耦合。而且线程池无法解决优先顺序问题。

为什么说前端监控系统离不开 RabbitMQ?前端监控系统是采集用户端的异常、性能、业务埋点等数据上报,在服务端做存储, - 掘金

消息队列:

​ 我认为的消息队列它拥有可持久化,顺序化,有容错等,有很多方便的工具的东西。

持久化:

​ 它可以把数据保存起来,即使重启也没事。并不像缓存那样。

容错机制:

​ 他有一个死信队列的东西,专门用来解决如果出错了要怎么处理的方案。

分配机制:

​ rabbitMq 有一个类似与计算机网络交换机,可以根据不同的routingKey(IP地址)去发送生产者的需求给目标消费者的消息队列。

消息确认机制:

​ 当消费者接收消息后,可以返回消息(ack、nack、reject) 来告诉消息队列,然后进行相对应的处理。

实现BI项目消息队列:

初始化新的Bi队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.yupi.springbootinit.bizmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.yupi.springbootinit.constant.BiMqConstant;

/**
* 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)
*/
public class BiInitMain {

public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 定义交换机的名称为"code_exchange"
String EXCHANGE_NAME = BiMqConstant.BI_EXCHANGE_NAME;
// 声明交换机,指定交换机类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

// 创建队列,随机分配一个队列名称
String queueName = BiMqConstant.BI_QUEUE_NAME;
// 声明队列,设置队列持久化、非独占、非自动删除,并传入额外的参数为 null
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到交换机,指定路由键为 "my_routingKey"
channel.queueBind(queueName, EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY);
} catch (Exception e) {
// 异常处理
}
}
}

点击并拖拽以移动

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.yupi.springbootinit.bizmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

// 使用@Component注解标记该类为一个组件,让Spring框架能够扫描并将其纳入管理
@Component
public class BiMessageProducer {
// 使用@Resource注解对rabbitTemplate进行依赖注入
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息的方法
*
* @param exchange 交换机名称,指定消息要发送到哪个交换机
* @param routingKey 路由键,指定消息要根据什么规则路由到相应的队列
* @param message 消息内容,要发送的具体消息
*/
public void sendMessage(String exchange, String routingKey, String message) {
// 使用rabbitTemplate的convertAndSend方法将消息发送到指定的交换机和路由键
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}

点击并拖拽以移动

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.yupi.springbootinit.bizmq;

import com.rabbitmq.client.Channel;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.constant.BiMqConstant;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.exception.ThrowUtils;
import com.yupi.springbootinit.mapper.ChartMapper;
import com.yupi.springbootinit.model.entity.Chart;
import com.yupi.springbootinit.service.ChartService;
import com.yupi.springbootinit.service.DeepSeekService;
import com.yupi.springbootinit.utils.ExcelUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

// 使用@Component注解标记该类为一个组件,让Spring框架能够扫描并将其纳入管理
@Component
// 使用@Slf4j注解生成日志记录器
@Slf4j
public class BiMessageConsumer {
@Resource
ChartService chartService;

@Resource
DeepSeekService deepSeekService;

/**
* 接收消息的方法
*
* @param message 接收到的消息内容,是一个字符串类型
* @param channel 消息所在的通道,可以通过该通道与 RabbitMQ 进行交互,例如手动确认消息、拒绝消息等
* @param deliveryTag 消息的投递标签,用于唯一标识一条消息
*/
// 使用@SneakyThrows注解简化异常处理
@SneakyThrows
// 使用@RabbitListener注解指定要监听的队列名称为"code_queue",并设置消息的确认机制为手动确认
@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME}, ackMode = "MANUAL")
// @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag是一个方法参数注解,用于从消息头中获取投递标签(deliveryTag),
// 在RabbitMQ中,每条消息都会被分配一个唯一的投递标签,用于标识该消息在通道中的投递状态和顺序。通过使用@Header(AmqpHeaders.DELIVERY_TAG)注解,可以从消息头中提取出该投递标签,并将其赋值给long deliveryTag参数。
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
// 使用日志记录器打印接收到的消息内容
log.info("receiveMessage message = {}", message);

//如果为空消息
if(StringUtils.isBlank(message)){
channel.basicNack(deliveryTag,false,false);
throw new BusinessException(ErrorCode.NOT_FOUND_ERROR,"图表为空");
}
//获取图表
long chartId = Long.parseLong(message);
Chart chart = chartService.getById(chartId);

ThrowUtils.throwIf(chart==null,ErrorCode.PARAMS_ERROR,"图表为空");

//执行AiChart
boolean result = doAiChart(chart);

//
if(!result) {
channel.basicNack(deliveryTag,false,false);
throw new BusinessException(ErrorCode.SYSTEM_ERROR,"执行AiChart失败");
}
log.info("执行AiChart成功");
// 投递标签是一个数字标识,它在消息消费者接收到消息后用于向RabbitMQ确认消息的处理状态。通过将投递标签传递给channel.basicAck(deliveryTag, false)方法,可以告知RabbitMQ该消息已经成功处理,可以进行确认和从队列中删除。
// 手动确认消息的接收,向RabbitMQ发送确认消息
channel.basicAck(deliveryTag, false);
}

public boolean doAiChart(Chart chart){

try {
//先根据chartId获取图表数据
String csvData = chartService.getChartDataByChartId(chart.getId());
//处理没有数据情况
ThrowUtils.throwIf(StringUtils.isBlank(csvData),ErrorCode.NOT_FOUND_ERROR,"图表数据找不到");

//更新chart
Chart updateChart = new Chart();
updateChart.setId(chart.getId());
//设置正在执行状态
updateChart.setStatus("running");
boolean runningUpdate = chartService.updateById(updateChart);
ThrowUtils.throwIf(!runningUpdate,ErrorCode.SYSTEM_ERROR,"更新状态running失败");

//用户输入题词器
String goal = chart.getGoal();
String chartType = chart.getChartType();

String userInput = chartService.getUserInput(goal,chartType,csvData);
//发起请求
String result = deepSeekService.callDeepSeek(userInput);

String[] split = result.split("【【【【【");
if(split.length<3){
chartService.handleChartUpdateError(chart.getId(),"AI 生成错误");
return false;
}
String genChart = split[1].trim();
String genResult = split[2].trim();
//得到结果后,再更新一次
Chart updateChartResult = new Chart();
updateChartResult.setId(chart.getId());
updateChartResult.setStatus("succeed");
updateChartResult.setGenChart(genChart);
updateChartResult.setGenResult(genResult);

boolean updateResult = chartService.updateById(updateChartResult);

if(!updateResult){
chartService.handleChartUpdateError(chart.getId(),"更新图表成功状态失败");
return false;
}

}catch (Exception e){
return false;
}

return true;
}


}

点击并拖拽以移动

此时只需要将上次做的异步创建中的线程池改成生产者发送需求进入消息队列,然后让消费者去按顺序执行任务即可。

RabbitMq 有很多不同类型的交换机一个一个说太麻烦了,不如直接贴一个官方教程。以后忘了再看。
RabbitMQ tutorial - Routing | RabbitMQ