Table of Contents
Transactions
RabbitMQ is implemented based on the AMQP protocol, which realizes transaction mechanisms. Therefore, RabbitMQ also supports transaction mechanisms. SpringAMQP also provides operations related to transactions. RabbitMQ transactions allow developers to ensure that message sending and receiving are atomic, either all successful or all failed.
What is Atomicity (Key Point in Interview)?
For example: When A transfers 1000 yuan to B, it involves two steps:
1.A transfers 1000 yuan to B, which will deduct 1000 yuan from A’s account.
2.B receives 1000 yuan, and B’s account will be increased by 1000 yuan.
However, in extreme cases, if A transfers 1000 yuan to B, and the deduction from A is completed (A-1000), but due to system failure, B does not receive the money. This would result in the disappearance of 1000 yuan without a trace, which is unacceptable as no one would dare to make transfers under such circumstances.
At this point, both operations 1 and 2 are bound together; they must either be completed simultaneously or not executed at all.
If operation 1 fails, it will be “rolled back” to its original state, as if nothing ever happened.
Next, implement RabbitMQ transactions:
Declare queue:
1 2 3 4 5 6 |
//Transaction public static final String TRANS_QUEUE = “trans_queue”; @Bean(“transQueue”) public Queue transQueue() { return QueueBuilder.durable(Constants.TRANS_QUEUE).build(); } |
Configure transaction manager:
1 2 3 4 5 6 7 8 9 10 11 |
@Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } @Bean(“transRabbitTemple”) public RabbitTemplate transRabbitTemple(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // enable transactions rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } |
Producer code writing:
1 2 3 4 5 6 7 8 9 10 11 |
@Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } @Bean(“transRabbitTemple”) public RabbitTemplate transRabbitTemple(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // enable transactions rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } |
1)Without @Transactional and with exception, what happens during message sending?
At this point, only the first message was sent, and then an exception occurred, preventing the second message from being sent successfully.
2) Sending with <@Transactional> and exceptions: What happens?
1 2 3 4 5 6 7 8 9 |
@Transactional @RequestMapping(“/trans”) public String trans() { System.out.println(“trans test…”); transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, “trans”, “trans test 1…”); int num = 5/0; transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, “trans”, “trans test 2…”); return “Message sent successfully”; } |
At this point, an exception occurred. Although one message was sent, the presence of the exception caused a rollback, effectively undoing the operation.
This demonstrates the reliability of our transaction handling.
3) Sending with <@Transactional> and without exceptions: What happens?
1 2 3 4 5 6 7 8 9 |
@Transactional @RequestMapping(“/trans”) public String trans() { System.out.println(“trans test…”); transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, “trans”, “trans test 1…”); // int num = 5/0; transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, “trans”, “trans test 2…”); return “Message sent successfully”; } |
All results are normal
Message Distribution
When RabbitMQ queues have multiple consumers, the queue will distribute received messages to different consumers. Each message is only sent to one consumer in the subscription list. This approach is very suitable for scaling; if the load increases, simply create more consumers to handle messages.
By default, RabbitMQ distributes messages using a round-robin method, regardless of whether consumers have already consumed and confirmed the message. This approach is not very reasonable. Imagine if some consumers have slower consumption speeds while others are faster, which could lead to message accumulation for slow consumers and idle time for fast ones, thereby reducing the overall throughput of the application.
After completing all 10 tasks, A finds that B is still working on the first task. This will significantly reduce efficiency and lead to a overall decline in performance.
To address this issue, we can utilize the channel.basicQos(intprefetchCount) method discussed earlier. This limits the maximum number of unacknowledged messages that consumers can hold on a channel.
For example: When the consumer calls channelbasicQos(1),
A receives one message and processes it, while B also receives one message but works at a slower pace. Since A finishes processing the first message, it proceeds to handle the second message. This way, efficiency is maintained as A can work on multiple tasks without being held back by B’s slower processing.
Use Cases
1. Flow Control
In the following scenario:
An order system can handle up to 5,000 requests per second under normal circumstances.
However, during a flash sale, requests surge to 10,000 per second, which could overwhelm the order system if all are sent via MQ.RabbitMQ provides flow control mechanisms. It limits the number of requests a consumer pulls at once by setting prefetchCount. Additionally, message acknowledgment must be set to manual.
prefetchCount: Controls how many messages a consumer can prefetch from the queue. This helps achieve flow control and load balancing.
1) Configure the prefetch parameter and set the response method to manual acknowledgment
2) Configure switch and queue
123456789101112131415161718192021 package com.bite.extensions.config;import com.bite.extensions.constant.Constants;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class QosConfig {@Bean(“qosQueue”)public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean(“qosExchange”)public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean(“qosBinding”)public Binding qosBinding(@Qualifier(“qosQueue”) Queue queue, @Qualifier(“qosExchange”) DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with(“qos”);}}
3) Producer
1 2 3 4 5 6 7 8 |
@RequestMapping(“/qos”) public String qos() { System.out.println(“qos test…”); for (int i = 0; i < 15; i++) { rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, “qos”, “qos test i…”+i); } return “Message sent successfully”; } |
4)Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
package com.bite.extensions.listener; import com.bite.extensions.constant.Constants; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class QosListener { @RabbitListener(queues = Constants.QOS_QUEUE) public void handleMessage(Message message, Channel channel) throws Exception { //Consumer logic long deliverTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf(“[qos.queue]Received message: %s, deliveryTag: %d\n”,new String(message.getBody(),“UTF-8”),deliverTag); /* //Business logic processing System.out.println(“Processing business logic!”); <p>At this point, only 5 messages will be received and the system will be blocked, reaching a rate limiting state.</p> </blockquote> <p><strong>Test 2</strong></p> <p><strong>Comment out prefetch: 5 and observe the results</strong></p> <blockquote> <p><strong>All messages in the queue will be sent and consumed at once.</strong></p> </blockquote> <h4 id=“2.%E8%B4%9F%E8%BD%BE%E5%9D%87%E8%A1%A1%C2%A0” style=“background-color:transparent;”>2. Load Balancing </h4> <blockquote> <p><strong>As shown below, when there are two consumers, one consumer processes tasks very quickly while the other is much slower. This results in one consumer being consistently busy while the other remains idle. This happens because RabbitMQ dispatches messages to the queue when they arrive; it does not consider the number of unacknowledged messages each consumer has.</strong></p> </blockquote> <p><img alt=“” height=“648” src=“https://img.bilibili1s.com/43ace1844059bf76e9c9d93bf5f65022.png” width=“710”/></p> <blockquote> <p><strong>We can use the setting <span style=“color:#fe2c24;”>prefetch=1</span> to tell RabbitMQ to only send one message at a time to a consumer. This means that before processing and acknowledging the previous message, do not send new messages to this consumer. Instead, it will be dispatched to the next available less busy consumer.</strong><img alt=“” height=“165” src=“https://img.bilibili1s.com/b329d88b69ba9174184ab0eb0fa314fa.png” width=“699”/></p> </blockquote> <p><strong>Consumers:</strong> </p> <pre>package com.bite.extensions.listener; import com.bite.extensions.constant.Constants; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class QosListener { @RabbitListener(queues = Constants.QOS_QUEUE) public void handleMessage(Message message, Channel channel) throws Exception { // Consumer logic long deliverTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf(“First consumer received message: %s, deliveryTag: %d “, new String(message.getBody(), “UTF-8”), deliverTag); Thread.sleep(3000); channel.basicAck(deliverTag, false); } catch (Exception e) { // Negative acknowledgment channel.basicNack(deliverTag, false, true); // requeue is false, becomes dead letter queue } } @RabbitListener(queues = Constants.QOS_QUEUE) public void handleMessage2(Message message, Channel channel) throws Exception { // Consumer logic long deliverTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf(“Second consumer received message: %s, deliveryTag: %d “, new String(message.getBody(), “UTF-8”), deliverTag); Thread.sleep(1000); channel.basicAck(deliverTag, false); } catch (Exception e) { // Negative acknowledgment channel.basicNack(deliverTag, false, true); // requeue is false, becomes dead letter queue } } } |
Result:
Here, it can be observed that each consumer completes a task at different speeds to prevent one consumer from waiting too long if another has not finished their part.
Conclusion: Blogging is not only about sharing learning experiences but also helps in reinforcing knowledge and summarizing key points. Due to the author’s limited proficiency, any issues or errors in the article are welcome for critique, as it aids in improvement. Additionally, the author greatly appreciates your encouragement through likes, collections, and follows; these actions are the primary motivation behind the creation of this content.
Leave a Reply
You must be logged in to post a comment.