您现在的位置是:主页 > news > 电子商务网站设计与.../网站策划书模板范文
电子商务网站设计与.../网站策划书模板范文
admin2025/6/6 15:23:40【news】
简介电子商务网站设计与...,网站策划书模板范文,wordpress 插件教程,网站如何做301生产者消息重试生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所…
生产者消息重试
生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试
消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所以开发者如果有幂等性的要求,需要自行保证幂等
mq重试的默认值:同步需要开启重试配置:retryAnotherBrokerWhenNotStoreOK = true,默认是不开启重试
private int retryTimesWhenSendFailed = 2;private int retryTimesWhenSendAsyncFailed = 2;
也可以自行设置重试次数
//异步重试producer.setRetryTimesWhenSendAsyncFailed(3);//同步重试producer.setRetryTimesWhenSendFailed(3);
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //根据topic负载均衡算法选择一个MessageQueue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //向 MessageQueue 发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { //判断是否开启重复发送。默认是关闭 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } }
同步发送就是简单的for循环重新重试注意(需要 retryAnotherBrokerWhenNotStoreOK = true),异步重试是在callback中进行判断,如果response为空重试,或者在处理processSendResponse存在异常时重试
消费者重试
RocketMQ在消费者消费失败的时候提供重试,代码有点多,主要就是异步拉取消息(其中有一个参数pullCallBack回调函数),省略的逻辑主要是:拉取之前会进行一些阈值、消息大小以及偏移量的判断,决定是否要延迟放入队列中
public void pullMessage(final PullRequest pullRequest) { //主要判断 消息的阈值、队列的大小等决定是否要延迟加入处理的队列中 ..... //回掉函数,★ 1 PullCallback pullCallback = new PullCallback(){....} try { //异步拉取信息 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback //★ 2 ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
其中pullKernelImpl()这个方法调用逻辑,调用的整体逻辑
PullAPIWrapper.pullKernelImpl()//会根据broker名称获取broker信息 设置消息头信息 --MQClientAPIImpl.pullMessage()//根据communicationMode获取具体的调用逻辑--异步拉取消息 --MQClientAPIImpl.pullMessageAsync()//拉取消息 回掉pullCallback -- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()// 回掉方法调用submitConsumeRequest --ConsumeMessageConcurrentlyService.ConsumeRequest.run()//最终执行为线程池中的run方法
其中run方法会调用我们消费端写的listene实现类,这就是mq的消费的整体逻辑
其中status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);执行消费段写的listener,
消费的成功以及失败之后的逻辑:ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);去判断消费是成功了还是失败了,失败了重新发送
↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑
如果消费成功,ackIndex是consumeRequest.getMsgs().size() - 1;就不执行for循环(for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)),也就不执行补偿,失败ackIndex == -1,重新发送消息,
广播模式,只打印一条日志,不进行其他的任何操作,
集群模式:跟踪方法sendMessageBack(msg, context);
首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。
如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题:"%RETRY%" + ConsumeGroupName, 注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。
同步调用告诉服务端请求 CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法
通过网络发送到指定的 Broker上
CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法
获取订阅组的信息
SubscriptionGroupConfig
以下是主要的属性
获取topic主题
获取消息
根据偏移量在commitlog中获取message信息
获取延迟等级
写入commitlog中
主题为%RETRY%+consumerGroup,主题是基于消费者组来的,并不是基于原来的主题进行重新定义的,如果消息发送失败了,5s之后重新放入队列重新发送( ↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑图示有失败之后的逻辑)
这些失败的消息,直接更新偏移量,定义为已经消费的消息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
整体流程
根据消费结果,设置ackIndex的值。
如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack,这里会创建新的消息(重试次数,延迟执行)。
更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)。
欢迎关注关注博主的公众号,可以领取 回复888 可以获取一份面试资料