最后主要是做了一个利用RabbitMq 去发布和接收生成图表的任务的工作。
Q:为什么要用RabbitMq ?用线程池不是已经解决了吗
A:在单机开发中线程池 确实能解决一些问题,但是到分布式环境中,使用消息队列可以更加方便的进行开发。因为消息队列可以算是一个跨系统的东西。可以很好的解耦合。而且线程池无法解决优先顺序问题。
为什么说前端监控系统离不开 RabbitMQ?前端监控系统是采集用户端的异常、性能、业务埋点等数据上报,在服务端做存储, - 掘金
消息队列:
我认为的消息队列它拥有可持久化,顺序化,有容错等,有很多方便的工具的东西。
持久化:
它可以把数据保存起来,即使重启也没事。并不像缓存那样。
容错机制:
他有一个死信队列的东西,专门用来解决如果出错了要怎么处理的方案。
分配机制:
rabbitMq 有一个类似与计算机网络交换机,可以根据不同的routingKey(IP地址)去发送生产者的需求给目标消费者的消息队列。
消息确认机制:
当消费者接收消息后,可以返回消息(ack、nack、reject) 来告诉消息队列,然后进行相对应的处理。
实现BI项目消息队列:
初始化新的Bi队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.yupi.springbootinit.bizmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.yupi.springbootinit.constant.BiMqConstant;
public class BiInitMain {
public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String EXCHANGE_NAME = BiMqConstant.BI_EXCHANGE_NAME; channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = BiMqConstant.BI_QUEUE_NAME; channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY); } catch (Exception e) { } } }
|

生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.yupi.springbootinit.bizmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component public class BiMessageProducer { @Resource private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); }
}
|

消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| package com.yupi.springbootinit.bizmq;
import com.rabbitmq.client.Channel; import com.yupi.springbootinit.common.ErrorCode; import com.yupi.springbootinit.constant.BiMqConstant; import com.yupi.springbootinit.exception.BusinessException; import com.yupi.springbootinit.exception.ThrowUtils; import com.yupi.springbootinit.mapper.ChartMapper; import com.yupi.springbootinit.model.entity.Chart; import com.yupi.springbootinit.service.ChartService; import com.yupi.springbootinit.service.DeepSeekService; import com.yupi.springbootinit.utils.ExcelUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j public class BiMessageConsumer { @Resource ChartService chartService;
@Resource DeepSeekService deepSeekService;
@SneakyThrows @RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME}, ackMode = "MANUAL") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { log.info("receiveMessage message = {}", message);
if(StringUtils.isBlank(message)){ channel.basicNack(deliveryTag,false,false); throw new BusinessException(ErrorCode.NOT_FOUND_ERROR,"图表为空"); } long chartId = Long.parseLong(message); Chart chart = chartService.getById(chartId);
ThrowUtils.throwIf(chart==null,ErrorCode.PARAMS_ERROR,"图表为空");
boolean result = doAiChart(chart);
if(!result) { channel.basicNack(deliveryTag,false,false); throw new BusinessException(ErrorCode.SYSTEM_ERROR,"执行AiChart失败"); } log.info("执行AiChart成功"); channel.basicAck(deliveryTag, false); }
public boolean doAiChart(Chart chart){
try { String csvData = chartService.getChartDataByChartId(chart.getId()); ThrowUtils.throwIf(StringUtils.isBlank(csvData),ErrorCode.NOT_FOUND_ERROR,"图表数据找不到");
Chart updateChart = new Chart(); updateChart.setId(chart.getId()); updateChart.setStatus("running"); boolean runningUpdate = chartService.updateById(updateChart); ThrowUtils.throwIf(!runningUpdate,ErrorCode.SYSTEM_ERROR,"更新状态running失败");
String goal = chart.getGoal(); String chartType = chart.getChartType();
String userInput = chartService.getUserInput(goal,chartType,csvData); String result = deepSeekService.callDeepSeek(userInput);
String[] split = result.split("【【【【【"); if(split.length<3){ chartService.handleChartUpdateError(chart.getId(),"AI 生成错误"); return false; } String genChart = split[1].trim(); String genResult = split[2].trim(); Chart updateChartResult = new Chart(); updateChartResult.setId(chart.getId()); updateChartResult.setStatus("succeed"); updateChartResult.setGenChart(genChart); updateChartResult.setGenResult(genResult);
boolean updateResult = chartService.updateById(updateChartResult);
if(!updateResult){ chartService.handleChartUpdateError(chart.getId(),"更新图表成功状态失败"); return false; }
}catch (Exception e){ return false; }
return true; }
}
|

此时只需要将上次做的异步创建中的线程池改成生产者发送需求进入消息队列,然后让消费者去按顺序执行任务即可。
RabbitMq 有很多不同类型的交换机一个一个说太麻烦了,不如直接贴一个官方教程。以后忘了再看。
RabbitMQ tutorial - Routing | RabbitMQ