文章目录
- 镜像
- docker-compose.yml
- 访问控制台
- Spring Boot
- 批量声明队列
镜像
- https://hub.docker.com/_/rabbitmq
docker pull rabbitmq:management
docker pull rabbitmq:4.0.7-management
docker-compose.yml
# Single RabbitMQ broker with the management plugin enabled.
services:
  rabbitmq:
    image: rabbitmq:3.9.5-management
    container_name: rabbitmq
    restart: always
    # Host networking: the container shares the host's network stack, so no port mappings are listed.
    network_mode: "host"
    volumes:
      - /etc/localtime:/etc/localtime
      # Keep broker data on the host so it survives container re-creation.
      - ./rabbitmq/data:/var/lib/rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: Test123
      RABBITMQ_DEFAULT_VHOST: /test
docker-compose up -d rabbitmq
访问控制台
- http://IP地址:15672/
账号/密码:admin/Test123
Spring Boot
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# Connection and listener settings for spring-boot-starter-amqp.
spring:
  rabbitmq:
    host: 192.168.0.140
    port: 5672
    username: admin
    password: Test123
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: auto
        # Listener retry: up to 5 attempts, starting 5000 ms apart.
        retry:
          enabled: true
          max-attempts: 5
          initial-interval: 5000
批量声明队列
/**
 * Queue-name constants.
 *
 * <p>Pure constants holder: it is final and cannot be instantiated. Only the
 * queue-name fields are declared here, so code that enumerates this class's
 * declared fields sees exactly the constants below.
 */
public final class MQConstants {

    /** Name of the first test queue. */
    public static final String TEST_QUEUE_1 = "test-queue-1";

    /** Name of the second test queue. */
    public static final String TEST_QUEUE_2 = "test-queue-2";

    /** Not instantiable: this class only carries constants. */
    private MQConstants() {
    }
}
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;@Configuration
/**
 * RabbitMQ configuration that declares one durable queue for every String
 * constant found on {@code MQConstants}.
 */
public class RabbitMQConfig {

    /**
     * Creates the {@link RabbitAdmin} used to declare queues.
     *
     * @param connectionFactory connection factory the admin operates on
     * @return a new {@link RabbitAdmin} bound to {@code connectionFactory}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Builds a durable {@link Queue} for each queue-name constant on
     * {@code MQConstants} and declares each one through {@code rabbitAdmin}.
     *
     * @param rabbitAdmin admin used to declare the queues
     * @return the declared queues, in the order the constants were discovered
     */
    @Bean
    public List<Queue> createQueues(RabbitAdmin rabbitAdmin) {
        List<String> queueNameList = getConstantsValueList(MQConstants.class);
        List<Queue> queueList = new ArrayList<>(queueNameList.size());
        for (String queueName : queueNameList) {
            queueList.add(new Queue(queueName, true));
        }
        queueList.forEach(rabbitAdmin::declareQueue);
        return queueList;
    }

    /**
     * Collects the values of all {@code static final String} fields declared
     * directly on {@code clazz}.
     *
     * <p>Fields of any other type, compiler-generated (synthetic) fields and
     * constants whose value is {@code null} are skipped, so they can never be
     * turned into queue names by accident.
     *
     * @param clazz class whose declared constants are read
     * @return the non-null String constant values, in declared-field order
     * @throws IllegalStateException if a String constant cannot be read
     *         reflectively (for example because it is not accessible from here)
     */
    private static List<String> getConstantsValueList(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        List<String> valueList = new ArrayList<>(fields.length);
        for (Field field : fields) {
            int modifiers = field.getModifiers();
            boolean stringConstant = Modifier.isStatic(modifiers)
                    && Modifier.isFinal(modifiers)
                    && !field.isSynthetic()
                    && field.getType() == String.class;
            if (!stringConstant) {
                continue;
            }
            try {
                // Static field: no target instance is needed.
                Object value = field.get(null);
                if (value != null) {
                    valueList.add((String) value);
                }
            } catch (IllegalAccessException e) {
                // Fail loudly with context instead of printing a stack trace and
                // silently leaving the queue undeclared.
                throw new IllegalStateException(
                        "Cannot read queue-name constant " + clazz.getName() + "." + field.getName(), e);
            }
        }
        return valueList;
    }
}
public Test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.convertAndSend(MQConstants.TEST_QUEUE_1, "test");}
}
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;@Component
public class TestConsumer implements ChannelAwareMessageListener {@RabbitListener(queues = MQConstants.TEST_QUEUE_1, concurrency = "10")@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String body = new String(message.getBody());System.out.println(" [x] Received: " + body);try {if ("error".equals(body)) {throw new Exception("处理失败");}System.out.println(" [✔] Done");} catch (Exception e) {System.out.println(" [✖] Error: " + e.getMessage());throw e;}}}