diff --git a/paicoding-service/src/main/java/com/github/paicoding/forum/service/notify/service/impl/RabbitmqServiceImpl.java b/paicoding-service/src/main/java/com/github/paicoding/forum/service/notify/service/impl/RabbitmqServiceImpl.java index c37181d1..74b80b97 100644 --- a/paicoding-service/src/main/java/com/github/paicoding/forum/service/notify/service/impl/RabbitmqServiceImpl.java +++ b/paicoding-service/src/main/java/com/github/paicoding/forum/service/notify/service/impl/RabbitmqServiceImpl.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -27,6 +28,10 @@ @Service public class RabbitmqServiceImpl implements RabbitmqService { + // 设置一个消费者的固定连接,从池中获取一个连接即可 + private RabbitmqConnection rabbitmqConsumerConnection; + private Channel rabbitmqConsumerChannel; + @Autowired private NotifyService notifyService; @@ -59,41 +64,42 @@ public void publishMsg(String exchange, } + /** + * 阻塞式消费 + * @param exchange + * @param queueName + * @param routingKey + */ @Override public void consumerMsg(String exchange, String queueName, String routingKey) { - try { //创建连接 - RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); - Connection connection = rabbitmqConnection.getConnection(); + rabbitmqConsumerConnection = RabbitmqConnectionPool.getConnection(); + Connection connection = rabbitmqConsumerConnection.getConnection(); //创建消息信道 - final Channel channel = connection.createChannel(); + rabbitmqConsumerChannel = connection.createChannel(); //消息队列 - channel.queueDeclare(queueName, true, false, false, null); + rabbitmqConsumerChannel.queueDeclare(queueName, true, false, false, null); //绑定队列到交换机 - channel.queueBind(queueName, exchange, routingKey); + rabbitmqConsumerChannel.queueBind(queueName, exchange, routingKey); - Consumer consumer = new DefaultConsumer(channel) { + Consumer consumer = new DefaultConsumer(rabbitmqConsumerChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info("Consumer msg: {}", message); - // 获取Rabbitmq消息,并保存到DB // 说明:这里仅作为示例,如果有多种类型的消息,可以根据消息判定,简单的用 if...else 处理,复杂的用工厂 + 策略模式 notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE); - - channel.basicAck(envelope.getDeliveryTag(), false); + rabbitmqConsumerChannel.basicAck(envelope.getDeliveryTag(), false); } }; - // 取消自动ack - channel.basicConsume(queueName, false, consumer); - channel.close(); - RabbitmqConnectionPool.returnConnection(rabbitmqConnection); - } catch (InterruptedException | IOException | TimeoutException e) { + // 取消自动ack, 自动监听消息 + rabbitmqConsumerChannel.basicConsume(queueName, false, consumer); + } catch (InterruptedException | IOException e) { e.printStackTrace(); } } @@ -101,24 +107,23 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp @Override public void processConsumerMsg() { log.info("Begin to processConsumerMsg."); + consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE, CommonConstants.QUERE_KEY_PRAISE); + } - Integer stepTotal = 1; - Integer step = 0; - - // TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式 - while (true) { - step++; - try { - log.info("processConsumerMsg cycle."); - consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE, - CommonConstants.QUERE_KEY_PRAISE); - if (step.equals(stepTotal)) { - Thread.sleep(10000); - step = 0; - } - } catch (Exception e) { - + /** + * 关闭连接和通道,销毁时关闭并归还 + */ + @PreDestroy + public void destroy() { + try { + if (rabbitmqConsumerChannel != null && rabbitmqConsumerChannel.isOpen()) { + rabbitmqConsumerChannel.close(); + } + if (rabbitmqConsumerConnection != null) { + RabbitmqConnectionPool.returnConnection(rabbitmqConsumerConnection); } + } catch (IOException | TimeoutException e) { + log.error("关闭 RabbitMQ 连接和通道时出错", e); } } }