您现在的位置是:主页 > news > 做网站的程序员留备份/一年的百度指数
做网站的程序员留备份/一年的百度指数
admin2025/6/19 14:13:37【news】
简介做网站的程序员留备份,一年的百度指数,平台管理系统登录,想要学做网站需要什么在之前的文章SpringBoot 中使用Redis Stream 实现消息监听中的demo代码写的比较乱,而且也有部分问题,随着最近有些小伙伴的交流我整理了一下代码,后续还会继续优化,感兴趣的可以在码云里拉取后实验和修改——码云地址hlove/redism…
在之前的文章SpringBoot 中使用Redis Stream 实现消息监听中的demo代码写的比较乱,而且也有部分问题,随着最近有些小伙伴的交流我整理了一下代码,后续还会继续优化,感兴趣的可以在码云里拉取后实验和修改——码云地址hlove/redismq。
整理部分
整理后的代码我将redis的stream名称配置和组名通过配置文件的方式进行配置,application.yml文件如下:
spring:redis:host: 127.0.0.1password: yourpasswordport: 6379
#stream 相关配置
redis-stream:#stream 名称数组names: mystream1,mystream2#stream 群组名称groups: group1
stream名称和组名都可以设置为数组的形式,监听多个stream或组,接收消息的 ListenerMessage 类整理如下:
@Slf4j
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {RedisUtil redisUtil;public ListenerMessage(RedisUtil redisUtil){this.redisUtil = redisUtil;}@Overridepublic void onMessage(MapRecord<String, String, String> entries) {try{log.info("接受到来自redis的消息");System.out.println("message id "+entries.getId().getValue());System.out.println("stream "+entries.getStream());System.out.println("body "+entries.getValue());redisUtil.delField(entries.getStream(),entries.getId().getValue());}catch (Exception e){log.error("error message:{}",e.getMessage());}}}
onMessage方法用于接收消息,最后调用的delField方法用于删除读取后的消息,这里可以根据业务需求来决定你需不需要删除。
接下来就是需要将监听启动注入到spring中,对应的类为——RedisStreamConfig,代码如下:
@Slf4j
@Configuration
public class RedisStreamConfig {private final ListenerMessage streamListener;private final RedisUtil redisUtil;@Value("${redis-stream.names}")private String[]redisStreamNames;@Value("${redis-stream.groups}")private String[]groups;@Autowiredpublic RedisStreamConfig(RedisUtil redisUtil){this.redisUtil = redisUtil;this.streamListener = new ListenerMessage(redisUtil);}@Beanpublic List<Subscription> subscription(RedisConnectionFactory factory){List<Subscription> resultList = new ArrayList<>();var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();for (String redisStreamName : redisStreamNames) {initStream(redisStreamName,groups[0]);var listenerContainer = StreamMessageListenerContainer.create(factory,options);Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()),StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener);resultList.add(subscription);listenerContainer.start();}return resultList;}private void initStream(String key, String group){boolean hasKey = redisUtil.hasKey(key);if(!hasKey){Map<String,Object> map = new HashMap<>();map.put("field","value");RecordId recordId = redisUtil.addStream(key, map);redisUtil.addGroup(key,group);//将初始化的值删除掉redisUtil.delField(key,recordId.getValue());log.info("stream:{}-group:{} initialize success",key,group);}}
}
首先将配置中的stream和group获取到,然后在注入的方法中循环启动监听,注意里面的 this.getClass().getName() ,这个只是我使用这个类的名称作为消费者,大家可以根据实际需求修改这里。
下面的方法——initStream方法用于初始化stream,这也是之前代码中存在的一个问题,有的小伙伴在没有通过命令建立stream时启动会报错,就是因为没有对应的stream,这次通过这个方法可以避免这样的问题。
代码运行
如果你拉取了我的这个demo的话启动也是很简单的,只需要修改application.yml中的相关配置后就可以直接启动,注意的是你的redis版本应该大于5,然后直接启动就可以了。通过下面的测试接口进行测试发送消息
@GetMapping("/sendTest/{streamName}")public String addStream(@PathVariable String streamName){Map<String,Object> message = new HashMap<>();message.put("test","hello redismq");message.put("send time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));return redisUtil.addStream(streamName, message).getValue();}
采用restful请求格式例如:
http://localhost:8080/test/sendTest/mystream1
观察控制台打印会发现有对应的消息打印出来说明运行成功!