包含标签 RocketMQ 的文章

rocketmq-send-store

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息发送存储流程 第一步:检查消息存储状态 org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus 1、检查 broker 是否可用 1 2 3 4 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } 2、检查 broker 的角色 1 2 3 4 5 6 7 if (BrokerRole.SLAVE== this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("broke role is slave, so putMessage is forbidden"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } 3、检查 messageStore 是否可写 1 2 3 4 5 6 7 8 9 10 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) ==……

阅读全文

rocketmq-send-message

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息发送流程 这里以同步发送为示例讲解: 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) 消息发送 默认超时时间 3 秒 第一步:验证 主题的长度不能大于 127,消息的大小不能大于 4M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw……

阅读全文

rocketmq-pullmessage

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消息拉取流程 之前在消费者启动流程中描述过 MQClientInstance 的启动流程,在启动过程中会启动 PullMessageService,它继承了ServiceThread,并且 ServiceThread 实现了 Runnable 接口,所以是单独启动了一个线程 public class PullMessageService extends ServiceThread public abstract class ServiceThread implements Runnable PullMessageService 的 run 方法如下: protected volatile boolean stopped = false; 1……

阅读全文

rocketmq-pullmessage-processor

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ broker 处理拉取消息请求流程 org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand) 第 1 步、校验broker是否可读 1 2 3 4 5 if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } 第 2 步、根据消费组获取订阅信息 1 2 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); 第 3 步、校验是否允许消费 1 2 3 4 5 if (!subscriptionGroupConfig.isConsumeEnable()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); return response; } 第 4 步、根据主题获取对应的配置信息 1 2 3 4……

阅读全文

rocketmq-producer-start

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 生产者启动流程 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#start 1 2 3 4 5 6 7 8 9 10 11 12 @Override public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } } 第一步、检查 producerGroup 1 2 3 4 5 6 7 8 9 10 11 private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); if (null == this.defaultMQProducer.getProducerGroup()) { throw new MQClientException("producerGroup is null", null); } if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",null); } } 第二步、设置 instanceName 1 2……

阅读全文

rocketmq-nameserver-broker

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RockerMQ Nameserver 如何与 Broker 进行通信的? nameserver 每隔 10s 扫描一次 Broker,移除处于未激活状态的 Broker 核心代码: this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public int scanNotActiveBroker() { int removeCount = 0; Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last +BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(),BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); removeCount++; } } return removeCount; } broker 每隔 30 秒会向集群……

阅读全文

rocketmq-mappedfile-detail

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ MappedFile 内存映射文件详解 1、MappedFile 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file……

阅读全文

rocketmq-indexfile

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ IndexFile 详解 首先明确一下 IndexFile 的文件结构 Index header + 哈希槽,每个槽下面挂载 index 索引,类似哈希表的结构 一个 Index 文件默认包含 500 万个哈希槽,一个哈希槽最多存储 4 个 index,也就是一个 IndexFile 默认最多包含 2000 万个 index Index header: 40byte Index header = 8byte 的 beginTimestamp(In……

阅读全文

rocketmq-consumer-start

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ 消费者启动流程 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 1、检查配置信息 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig 校验消费组的长度不能大于 255 public static final int CHARACTER_MAX_LENGTH = 255; 1 2 3 if (group.length() >CHARACTER_MAX_LENGTH) { throw new MQClientException("the specified group is longer than group max length 255.", null); } 消费组名称只能包含数字、字母、%、-、_、| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 // regex: ^[%|a-zA-Z0-9_-]+$ // % VALID_CHAR_BIT_MAP['%'] = true; // - VALID_CHAR_BIT_MAP['-'] = true; // _ VALID_CHAR_BIT_MAP['_'] = true; // |……

阅读全文

rocketmq-consumequeue

该文所涉及的 RocketMQ 源码版本为 4.9.3。 RocketMQ ConsumeQueue 详解 RocketMQ 基于主题订阅模式实现消息消费,消费者关注每一个主题下的所有消息,但是同一主题下的消息是不连续地存储在 CommitLog 文件中的,如果消费者直接从消息存储文件中遍历查找主题下的消息,效率会特别低。所以为了在查找消息的时候效率更高一些,设计了 ConsumeQueue 文件,可以……

阅读全文

最近文章

福利派送

分类

标签

360搜索 58同城 Abtest AB测试 AdaBoost AdaDeltaW AdamW AdvancedFeaturesOfNetty AFM模型 AI AIGC AILab Airbnb AI平台 AKF架构 Alink Android ANN AnnotationFormatterFactory Announcement AOF AOP Apache ApacheFlink Apollo AQS ArchitectureDesign AresDB ASR AUC Augur Automaton AutoML Bagging Bandits Basic BERT BI BigGAN BI平台 Boosting BPR Checkpoint Clazz ClickHouse Cluster CNN Collection ConcurrentCoding ConcurrentProgramming Condition CRF Css Cto CTR CTR模型 CTR预估 CV CVPR DDD DDD实战 DeepFFM DeepFM DeepFM模型 DesignPattern Devops Dgraph DIN DKN模型 DMP平台 Docs Doris DPP DRN Druid DSP DSSM Dubbo EE问题 Elasticsearch ELK ELMo Embedding EncodingSpecification Epoll ESSM ETL Faraday Feed流 FFM FFM模型 FixedBitSet FixMatch Flink Flutter FM FM模型 Format FreeWheel FST FTRL FullGC Game GAN GBDT GBDT+LR融合 GBM GC Git Github Gitlab GNN Go Golang Google Graphql GraphScope GRU4REC Ha3 Hadoop Hbase Hologres Hystrix Iceberg IM ImageNet Impala InnoDB IntBlockPool IoC IOTechnologyBase IRGAN IT博士 IT移民 JanusGraph Java JavaScript Jdbc JDK JS Kafka KBQA Kubernetes KV存储 Lab LambdaMART LDA LearningExperience LinUCB方法 Linux LruCache LSTM LSTM网络 Lucence Lucene Message Milvus MKR模型 MLflow MMoE Mock MoE Monorepo MRR Mvc Mybatis MySQL Nacos NDCG NER Netty Netty主要组件源码分析 Netty多协议开发 Netty技术细节源码分析 Netty编解码 Nexp Nextjs Nifi NIO NIPS NLP Node2vec Nodejs Npm OCR OKR Olap One-Hot OPPO Oss PageRank Parser PersonalExperience Pinot PlaceholderResolver Printer PropertySource Pulsar Push系统 Python Pytorch Q&A Que2Search Query扩展 Query理解 Query纠错 R-Tree React Reactjs Reactor Redis Region Registry Remote RippleNet RMI ROC RocketMQ Rpc RTree Scala SD Select Sentinel Serverless SHAP Sharding SIGAI SimCLR Skleam Softmax Solr Spark SPI Spring Spring5新特性 SpringBoot SpringBootBatch SpringCloud SpringMVC SpringSecurity SpringTransaction Spring整体脉络 Spring源码故事(瞎编版) Sqllit Stable Diffusion Stage Storm Storybook SVM Tailwind TCP粘拆包 TensorFlow TFServing Tomcat Topk Trace Transformer TurboSearch TX Typora UX Vivo Vuejs Web Web3 WebRTC Wide&Deep Word2vec XDeepFM XDL XGBoost XLNet Yoo视频 YoshuaBengio ZeroSearch Zookeeper 一致性 丑小鸭定理 业务 业务线 个性化推荐 个性化海报 中关村 中文分词 中文纠错 主题建模 书籍 事件驱动架构 事务 亚马逊 交叉验证 人工智能 人机问答 供应链 依存句法分析 信息检索 信息流推荐 倒排索引 倒排表 假设检验 全文索引 全民K歌 全链路压测 公平锁 关系 兴趣 内容挖掘 内容理解 决策树 冷启动 出轨 分层实验 分布式 分布式事务 分布式系统 分布式锁 分词 创业 加权融合 北京 区块 区块链 协同记忆网络 协同过滤 协方差 单元测试 博士 博士offer 压测 原则 双塔模型 反作弊 反欺诈 反脆弱 变量 召回 召回率 可观测性 可解释性 合约 后厂村 向量召回 向量检索 向量索引 吴恩达 命名实体识别 响应式编程 商汤科技 回归模型 因果分析 图像检索 图像识别 图数据库 图灵平台 图片翻译 图计算 在线学习 坐标回归 基于Netty开发服务端及客户端 基础支持层 增量学习 多多 多模态 多目标优化 大数据 大数据开发 头条 奥卡姆剃刀 学习 学习资料 学习资源 学会提问 实体识别 实时推荐 实时数仓 实时数据 实时日志收集 实时计算 实验平台 容灾体系建设 对比学习 对话系统 小城市 小米 小米搜索 小群效应 工作 布隆过滤器 帮帮 年轻人 广告 广告系统 序列标注 建模调参 开源数据集 开源项目 异常检测 异步IO 张嘉佳 强化学习 归一化 待分类 微服务 微软EXP 微软亚洲研究院 思维 性能优化 情感分析 意图识别 房租 技术 拆分 招聘 拼多多 持续交付 持续集成 损失函数 排序 排序模型 排队 推理系统 推荐 推荐理由 推荐系统 推送平台 提问的智慧 搜索 搜索广告 搜索引擎 搜索排序 搜索架构 搜索系统 支持向量 支持向量机 数仓体系 数仓平台 数据中台 数据仓库 数据分析 数据分析平台 数据同步 数据平台 数据指标 数据挖掘 数据治理 数据湖 数据科学 数据结构 数据集 数据驱动 文字识别 文摘 文本分类 文本情感分类 文本挖掘 文本纠错 文本表征 新闻推荐 方法论 日志架构 日志检索 时序特征挖掘 智慧物流 智能合约 智能客服 智能物流 智能语音 智能问答 智能预警 服务 机器学习 机器学习爱好者 机器学习面试题 机器翻译 机器视觉 机器阅读 条件随机场 架构 架构师 标签 标签平滑 标签识别 标签选择 校招 样本 核心处理层 检索引擎 槽位识别 模型剪枝 模型压缩 模型融合 模型评估 模型部署 模型预估 模式匹配 模式识别 正则化 注意力机制 洋码头 流批一体 消息队列 深度 深度兴趣网络 深度学习 深度树匹配 深度树检索 混排 混沌工程 火焰图 热点挖掘 熔断降级 爬虫 爱奇艺 牛顿-莱布尼茨 物流 特征工程 特征平台 特征系统 猜你喜欢 用户建模 用户画像 电商搜索 白兔 百度 相关系数 真话 矢量语义 知乎架构 知识图谱 知识增强 知识蒸馏 短文本解析 短视频 短语抽取 短语挖掘 碧桂园 神经网络 神马搜索 离线计算 秒杀架构 秒杀系统 程序人生 程序员 稳定性规范 空间索引 窗口函数 端上智能 端智能 算法 类协同训练 类目识别 粗排 索引 线程 线程池 缓存 网络图 置信度 美团 美团大脑 美团点评 职场 联邦学习 腾讯技术 腾讯音乐 自动化测试 自动驾驶 自然语言处理 色情识别 花椒直播 苞谷 蚂蚁金服 规则平台 规则引擎 视频推荐 计算广告 计算机视觉 认知 讨论区 记忆建模 记忆网络 论文 评价指标 评测指标 词向量 词嵌入 词权重 语义分割 语义匹配 语义检索 语义模型 语义理解 语言模型 语音内容识别 语音识别 谷歌面试 贝叶斯个性化排序 贝壳找房 贝壳智搜 货币化 起步 趋势科技 路径规划 软实力 边缘计算 达摩院 迁移学习 过拟合 逻辑回归 逻辑思维 采购 重叠实验框架 重排序 重构 金融 银汤匙 链表 链表求交集 阿里 阿里云 阿里妈妈 阿里小蜜 阿里达摩院 陈薇 陌陌 降纬打击 随机变量 零拷贝 面经 面试 项目管理 预估引擎 预训练 领域设计 领域驱动 风控 风控系统 高可用 高并发 高斯热图 高维数据索引 黄峥 黑盒模型

友情链接

其它