您现在的位置是:主页 > news > 小城市网站建设业务/网络营销的推广手段
小城市网站建设业务/网络营销的推广手段
admin2025/6/19 16:16:39【news】
简介小城市网站建设业务,网络营销的推广手段,如何开公司,佛山免费建站RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送…
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。
1、RabbitMQ介绍
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
2、springboot集成RabbitMQ
springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。
2.1 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件
在application.yml配置文件中加入RabbitMQ配置信息
server:port: 10001spring:application:name: rabbitmq-hellorabbitmq:host: localhostport: 5672username: guestpassword: guest
2.3 编写RabbitConfig类
RabbitConfig类里面设置很多个EXCHANGE,QUEUE,ROUTINGKEY,是为了接下来的不同使用场景,交换机,队列,通过路由关键字进行绑定。
package com.rabbit.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/*** 编写RabbitConfig类,类里面设置很多个EXCHANGE,QUEUE,ROUTINGKEY,是为了接下来的不同使用场景。** Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。* Queue:消息的载体,每个消息都会被投到一个或多个队列。* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。* Producer:消息生产者,就是投递消息的程序.* Consumer:消息消费者,就是接受消息的程序.* Channel:消息通道,在客户端的每个连接里,可建立多个channel.*/
@Configuration
public class RabbitConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;public static final String EXCHANGE_A = "my-mq-exchange_A";public static final String EXCHANGE_B = "my-mq-exchange_B";public static final String EXCHANGE_C = "my-mq-exchange_C";public static final String QUEUE_A = "QUEUE_A";public static final String QUEUE_B = "QUEUE_B";public static final String QUEUE_C = "QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}/*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机** FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念* HeadersExchange :通过添加属性key-value匹配* DirectExchange:按照routingkey分发到指定队列* TopicExchange:多关键字匹配*/@Beanpublic DirectExchange defaultExchange() {return new DirectExchange(EXCHANGE_A);}/*** 获取队列A* @return*/@Beanpublic Queue queueA() {return new Queue(QUEUE_A, true); //队列持久}@Beanpublic Binding binding() {return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Queue queueB() {return new Queue(QUEUE_B, true); //队列持久}@Beanpublic Binding bindingB(){return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);}
}
2.4 生产者
package com.rabbit.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** 消息发送*/
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {private final Logger logger = LoggerFactory.getLogger(this.getClass());//由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入private RabbitTemplate rabbitTemplate;/*** 构造方法注入rabbitTemplate*/@Autowiredpublic MsgProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容}public void sendMsg(String content) {CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列ArabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);}/*** 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info(" 回调id:" + correlationData);if (ack) {logger.info("消息成功消费");} else {logger.info("消息消费失败:" + cause);}}
}
2.5 消费者
2.5.1 类直接定义接收队列
一个生产者可以多个消费者,多个消费者之间的分发是负载均衡的。
package com.rabbit.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收*/@Component
@RabbitListener(queues = "QUEUE_A")
public class MsgReceiver {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("接收处理队列A当中的消息: " + content);}}
2.5.2 在RabbitMQConfig类里面增加bean
@Beanpublic SimpleMessageListenerContainer messageContainer() {//加载处理消息A的队列SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());//设置接收多个队列里面的消息,这里设置接收队列A//假如想一个消费者处理多个队列里面的信息可以如下设置://container.setQueues(queueA(),queueB(),queueC());container.setQueues(queueA());container.setExposeListenerChannel(true);//设置最大的并发的消费者数量container.setMaxConcurrentConsumers(10);//最小的并发消费者的数量container.setConcurrentConsumers(1);//设置确认模式手工确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {/**通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它 */channel.basicQos(1);byte[] body = message.getBody();logger.info("接收处理队列A当中的消息:" + new String(body));/**为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}});return container;}
3. 测试
package com.rabbit.mq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@Autowiredprivate MsgProducer msgProducer;@RequestMapping("/send/{name}")public void helloworld(@PathVariable String name) {msgProducer.sendMsg(name);}
}