2024年3月7日
微服务扩展性和高可用、可扩展性是微服务架构的两个重要特性。微服务架构通过将应用程序拆分为多个小型、独立部署的服务,实现了系统的高可维护性和可扩展性。以下将从微服务的可扩展性、高可用性两个方面进行详细的解释。 微服务的可扩展性 微服务的可扩展性主要体现在以下几个方面: 灵活性:微服务架构……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息发送存储流程 第一步:检查消息存储状态 org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus 1、检查 broker 是否可用 1 2 3 4 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } 2、检查 broker 的角色 1 2 3 4 5 6 7 if (BrokerRole.SLAVE== this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("broke role is slave, so putMessage is forbidden"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } 3、检查 messageStore 是否可写 1 2 3 4 5 6 7 8 9 10 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) ==……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息发送流程 这里以同步发送为示例讲解: 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) 消息发送 默认超时时间 3 秒 第一步:验证 主题的长度不能大于 127,消息的大小不能大于 4M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息拉取流程 之前在消费者启动流程中描述过 MQClientInstance 的启动流程,在启动过程中会启动 PullMessageService,它继承了ServiceThread,并且 ServiceThread 实现了 Runnable 接口,所以是单独启动了一个线程 public class PullMessageService extends ServiceThread public abstract class ServiceThread implements Runnable PullMessageService 的 run 方法如下: protected volatile boolean stopped = false; 1……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ broker 处理拉取消息请求流程 org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand) 第 1 步、校验broker是否可读 1 2 3 4 5 if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } 第 2 步、根据消费组获取订阅信息 1 2 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); 第 3 步、校验是否允许消费 1 2 3 4 5 if (!subscriptionGroupConfig.isConsumeEnable()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); return response; } 第 4 步、根据主题获取对应的配置信息 1 2 3 4……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 生产者启动流程 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#start 1 2 3 4 5 6 7 8 9 10 11 12 @Override public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } } 第一步、检查 producerGroup 1 2 3 4 5 6 7 8 9 10 11 private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); if (null == this.defaultMQProducer.getProducerGroup()) { throw new MQClientException("producerGroup is null", null); } if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",null); } } 第二步、设置 instanceName 1 2……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RockerMQ Nameserver 如何与 Broker 进行通信的? nameserver 每隔 10s 扫描一次 Broker,移除处于未激活状态的 Broker 核心代码: this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public int scanNotActiveBroker() { int removeCount = 0; Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last +BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(),BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); removeCount++; } } return removeCount; } broker 每隔 30 秒会向集群……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ MappedFile 内存映射文件详解 1、MappedFile 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ IndexFile 详解 首先明确一下 IndexFile 的文件结构 Index header + 哈希槽,每个槽下面挂载 index 索引,类似哈希表的结构 一个 Index 文件默认包含 500 万个哈希槽,一个哈希槽最多存储 4 个 index,也就是一个 IndexFile 默认最多包含 2000 万个 index Index header: 40byte Index header = 8byte 的 beginTimestamp(In……
阅读全文
2024年3月6日
该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消费者启动流程 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 1、检查配置信息 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig 校验消费组的长度不能大于 255 public static final int CHARACTER_MAX_LENGTH = 255; 1 2 3 if (group.length() >CHARACTER_MAX_LENGTH) { throw new MQClientException("the specified group is longer than group max length 255.", null); } 消费组名称只能包含数字、字母、%、-、_、| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 // regex: ^[%|a-zA-Z0-9_-]+$ // % VALID_CHAR_BIT_MAP['%'] = true; // - VALID_CHAR_BIT_MAP['-'] = true; // _ VALID_CHAR_BIT_MAP['_'] = true; // |……
阅读全文