您现在的位置是:主页 > news > 电子商务网站设计与.../网站策划书模板范文

电子商务网站设计与.../网站策划书模板范文

admin2025/6/6 15:23:40news

简介电子商务网站设计与...,网站策划书模板范文,wordpress 插件教程,网站如何做301生产者消息重试生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所…

电子商务网站设计与...,网站策划书模板范文,wordpress 插件教程,网站如何做301生产者消息重试生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所…

生产者消息重试

生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试

消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所以开发者如果有幂等性的要求,需要自行保证幂等

mq重试的默认值:同步需要开启重试配置:retryAnotherBrokerWhenNotStoreOK = true,默认是不开启重试

private int retryTimesWhenSendFailed = 2;private int retryTimesWhenSendAsyncFailed = 2;

也可以自行设置重试次数

//异步重试producer.setRetryTimesWhenSendAsyncFailed(3);//同步重试producer.setRetryTimesWhenSendFailed(3);
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {  String lastBrokerName = null == mq ? null : mq.getBrokerName();  //根据topic负载均衡算法选择一个MessageQueue  MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  if (mqSelected != null) {    mq = mqSelected;    brokersSent[times] = mq.getBrokerName();    try {      beginTimestampPrev = System.currentTimeMillis();      if (times > 0) {        //Reset topic with namespace during resend.        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));      }      long costTime = beginTimestampPrev - beginTimestampFirst;      if (timeout < costTime) {        callTimeout = true;        break;      }       //向 MessageQueue 发送消息      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);      endTimestamp = System.currentTimeMillis();      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);      switch (communicationMode) {        case ASYNC:          return null;        case ONEWAY:          return null;        case SYNC:          if (sendResult.getSendStatus() != SendStatus.SEND_OK) {            //判断是否开启重复发送。默认是关闭            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {              continue;            }          }           return sendResult;        default:          break;      }    } }

同步发送就是简单的for循环重新重试注意(需要 retryAnotherBrokerWhenNotStoreOK = true),异步重试是在callback中进行判断,如果response为空重试,或者在处理processSendResponse存在异常时重试

245083f6ceb9843268057ad011329ac6.png

 消费者重试

RocketMQ在消费者消费失败的时候提供重试,代码有点多,主要就是异步拉取消息(其中有一个参数pullCallBack回调函数),省略的逻辑主要是:拉取之前会进行一些阈值、消息大小以及偏移量的判断,决定是否要延迟放入队列中

public void pullMessage(final PullRequest pullRequest) {       //主要判断 消息的阈值、队列的大小等决定是否要延迟加入处理的队列中    .....    //回掉函数,★ 1    PullCallback pullCallback = new PullCallback(){....}        try {            //异步拉取信息            this.pullAPIWrapper.pullKernelImpl(                pullRequest.getMessageQueue(),                subExpression,                subscriptionData.getExpressionType(),                subscriptionData.getSubVersion(),                pullRequest.getNextOffset(),                this.defaultMQPushConsumer.getPullBatchSize(),                sysFlag,                commitOffsetValue,                BROKER_SUSPEND_MAX_TIME_MILLIS,                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,                CommunicationMode.ASYNC,                pullCallback  //★ 2             );        } catch (Exception e) {            log.error("pullKernelImpl exception", e);            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);        }    }

其中pullKernelImpl()这个方法调用逻辑,调用的整体逻辑

PullAPIWrapper.pullKernelImpl()//会根据broker名称获取broker信息 设置消息头信息  --MQClientAPIImpl.pullMessage()//根据communicationMode获取具体的调用逻辑--异步拉取消息    --MQClientAPIImpl.pullMessageAsync()//拉取消息 回掉pullCallback      -- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()// 回掉方法调用submitConsumeRequest        --ConsumeMessageConcurrentlyService.ConsumeRequest.run()//最终执行为线程池中的run方法

其中run方法会调用我们消费端写的listene实现类,这就是mq的消费的整体逻辑

63b1a898f0d28a33c80c3e6a80085355.png

其中status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);执行消费段写的listener,

460916293df03dad56e9c87830784c71.png

消费的成功以及失败之后的逻辑:ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);去判断消费是成功了还是失败了,失败了重新发送

d61cd89f25d5281df056649375786555.png

                                                                      ↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑

如果消费成功,ackIndex是consumeRequest.getMsgs().size() - 1;就不执行for循环(for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)),也就不执行补偿,失败ackIndex == -1,重新发送消息,

广播模式,只打印一条日志,不进行其他的任何操作,

集群模式:跟踪方法sendMessageBack(msg, context);

  • 首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。

  • 如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题:"%RETRY%" + ConsumeGroupName, 注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。

6c5c139640c1b3d822ba52935e3afd39.png

39b5c8e9c4faf9150db6fda1823867cb.png

同步调用告诉服务端请求 CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法 

通过网络发送到指定的 Broker上 

CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法 

fcf21c5d4e9174eabe25b589175cbc75.png

获取订阅组的信息

SubscriptionGroupConfig

7858055e8ab07c96e71f228eb605c797.png

以下是主要的属性

116606abfe32f73276c2439b222042ed.png

获取topic主题

d62e1d5982966ba51f233e38c56e1d85.png

获取消息

根据偏移量在commitlog中获取message信息

e2bc9b060fbfffe88c4a58237d4cfbd4.png

获取延迟等级

f31c78320c86c17628dba3b339651825.png

写入commitlog中

主题为%RETRY%+consumerGroup,主题是基于消费者组来的,并不是基于原来的主题进行重新定义的,如果消息发送失败了,5s之后重新放入队列重新发送(  ↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑图示有失败之后的逻辑)

e6865cb3405c6952f0d39c171325c40a.png

这些失败的消息,直接更新偏移量,定义为已经消费的消息

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {     this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}

整体流程

  • 根据消费结果,设置ackIndex的值。

  • 如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack,这里会创建新的消息(重试次数,延迟执行)。

  • 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)。

欢迎关注关注博主的公众号,可以领取 回复888 可以获取一份面试资料

078be14f96f91377ebfee33b3aa31adf.png81bbfdd915cd342fd853cbebd608ea4b.png