您现在的位置是:主页 > news > 广州市网站建设科技公司/南宁seo优化公司

广州市网站建设科技公司/南宁seo优化公司

admin2025/6/16 9:30:39news

简介广州市网站建设科技公司,南宁seo优化公司,三门峡网站建设推广,做网站需要哪些步骤springcloud(八)springcloud-stream之消息驱动 文章预览springcloud(八)springcloud-stream之消息驱动前言一、消息生产者的构建1.1、目录结构1.2、pom文件1.3、yml文件1.4、业务类1.5.1、service层1.5.2、controller层1.5、启动类…

广州市网站建设科技公司,南宁seo优化公司,三门峡网站建设推广,做网站需要哪些步骤springcloud(八)springcloud-stream之消息驱动 文章预览springcloud(八)springcloud-stream之消息驱动前言一、消息生产者的构建1.1、目录结构1.2、pom文件1.3、yml文件1.4、业务类1.5.1、service层1.5.2、controller层1.5、启动类…

springcloud(八)springcloud-stream之消息驱动

文章预览

  • springcloud(八)springcloud-stream之消息驱动
    • 前言
    • 一、消息生产者的构建
      • 1.1、目录结构
      • 1.2、pom文件
      • 1.3、yml文件
      • 1.4、业务类
        • 1.5.1、service层
        • 1.5.2、controller层
      • 1.5、启动类
    • 二、消息消费者
      • 2.1、目录结构
      • 2.2、pom文件
      • 2.3、yml文件
      • 2.4、业务类
        • 2.4.1、controller层
      • 2.5、主启动类
    • 三、效果展示
      • 3.1、消息生产者发送消息
      • 3.2、两个消费者后台接收

前言

Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot 创建的独立生产级的,使用Spring Integration提供连接到消息代理的Spring应用。介绍持久发布 - 订阅(persistent publish-subscribe)的语义,消费组(consumer groups)和分区(partitions)的概念。

你可以添加@EnableBinding注解在你的应用上,从而立即连接到消息代理,在方法上添加@StreamListener以使其接收流处理事件,下面的例子展示了一个Sink应用接收外部信息

一、消息生产者的构建

1.1、目录结构

在这里插入图片描述

1.2、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>com.zzuli.springcloud</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cloud-stream-rabbitmq-provider8801</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>

1.3、yml文件

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

1.4、业务类

1.5.1、service层

package com.zzuli.springcloud.service;public interface IMessageProvider
{public String send();
}
package com.zzuli.springcloud.service.impl;import com.zzuli.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource;
import java.util.UUID;@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;}
}

1.5.2、controller层

package com.zzuli.springcloud.controller;import com.zzuli.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController
{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}}

1.5、启动类

package com.zzuli.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801
{public static void main(String[] args){SpringApplication.run(StreamMQMain8801.class,args);}
}

二、消息消费者

这里构建两个消费者 里那个一个除了端口不一样,其它完全一样,不再赘述

2.1、目录结构

在这里插入图片描述

2.2、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>com.zzuli.springcloud</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cloud-stream-rabbitmq-consumer8802</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>

2.3、yml文件

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置   爆红不影响group: zzuli1  #分组避免重复消费   并且防止丢失消费eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

2.4、业务类

2.4.1、controller层

package com.zzuli.springcloudd.controller;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t  port: "+serverPort);}
}

2.5、主启动类

package com.zzuli.springcloudd;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8802
{public static void main(String[] args){SpringApplication.run(StreamMQMain8802.class,args);}
}

三、效果展示

3.1、消息生产者发送消息

在这里插入图片描述
在这里插入图片描述

3.2、两个消费者后台接收

因为yml文件将两个消费者定义为一个组,所以不重复消费在这里插入图片描述
在这里插入图片描述