雷州手机网站建设公司,公司做网站域名的好处,郑州医疗网站开发,安徽建设银行官方网站TL;DR
场景#xff1a;同一队列多消费者分摊任务 一条消息广播给多个订阅方结论#xff1a;Work Queue 依赖 manual ack basicQos 控制分发#xff1b;fanout 依赖交换器绑定与临时队列实现“一对多”产出#xff1a;给出 Java 生产/消费代码骨架、未命名交换器用法、临…TL;DR场景同一队列多消费者分摊任务 一条消息广播给多个订阅方结论Work Queue 依赖 manual ack basicQos 控制分发fanout 依赖交换器绑定与临时队列实现“一对多”产出给出 Java 生产/消费代码骨架、未命名交换器用法、临时队列与 binding 的验证路径RabbitMQ 工作模式Work Queue生产者发送消息启动多个消费者实例来消费消息每个消费者仅消费部分信息可以达到负载均衡的效果。NewTaskpackageicu.wzk.demo;publicclassTestTask{privatestaticfinalStringHOSTlocalhost;privatestaticfinalStringVIRTUAL_HOST/;privatestaticfinalStringUSERNAMEadmin;privatestaticfinalStringPASSWORDsecret;privatestaticfinalintPORT5672;privatestaticfinalStringQUEUE_NAMEwzk-icu;privatestaticfinalString[]WORKS{hello.,hello..,hello...,hello....,hello.....,hello......,hello.......,hello........,hello.........,hello..........};publicstaticvoidmain(String[]args){ConnectionFactoryfactorynewConnectionFactory();factory.setHost(HOST);factory.setVirtualHost(VIRTUAL_HOST);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);factory.setPort(PORT);try(Connectionconnectionfactory.newConnection();Channelchannelconnection.createChannel()){channel.queueDeclare(QUEUE_NAME,false,false,false,null);Stringexchange;for(Stringwork:WORKS){channel.basicPublish(exchange,QUEUE_NAME,null,work.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent work);}}catch(Exceptione){e.printStackTrace();}}}对应的执行结果如下所示TestTask2packageicu.wzk.demo;publicclassTestTask2{privatestaticfinalStringTASK_QUEUE_NAMEwzk-icu;publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{ConnectionFactoryfactorynewConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(admin);factory.setPassword(secret);// false 手动 ack推荐配合 basicQosbooleanautoAckfalse;try(Connectionconnfactory.newConnection();Channelchannelconn.createChannel()){// 每次只拉取 1 条避免一次性堆给消费者channel.basicQos(1);channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);DeliverCallbackdeliverCallback(consumerTag,delivery)-{StringtasknewString(delivery.getBody(),StandardCharsets.UTF_8);longtagdelivery.getEnvelope().getDeliveryTag();System.out.println( [x] Received task);try{doWork(task);System.out.println( [x] Done);// 手动确认channel.basicAck(tag,false);}catch(InterruptedExceptione){Thread.currentThread().interrupt();// 中断时重回队列channel.basicNack(tag,false,true);}catch(Exceptione){// 失败时重回队列按需改为 false 丢弃channel.basicNack(tag,false,true);}};channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,consumerTag-{});// 阻塞主线程保持进程存活synchronized(TestTask2.class){try{TestTask2.class.wait();}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}}privatestaticvoiddoWork(Stringtask)throwsInterruptedException{System.out.println(task task);for(charch:task.toCharArray()){if(ch.){Thread.sleep(1000);}}}}执行结果如下所示发布订阅在RabbitMQ消息队列系统中fanout类型交换器是一种重要的消息分发机制。这种交换器的工作方式类似于广播模式具有以下特点消息分发机制生产者将消息发送到Exchange时会完全忽略RoutingKey路由键的设置Exchange会将收到的每一条消息复制并推送到所有绑定到该Exchange的队列每个消费者队列都会收到完整的消息副本队列绑定过程每个消费者在订阅时RabbitMQ会自动为其创建一个专属队列这个新创建的队列会与指定的Exchange建立绑定关系绑定过程不涉及任何路由规则或过滤条件典型应用场景实时通知系统如股票行情推送日志收集系统多个日志处理服务同时接收日志事件广播系统订单创建事件通知多个微服务技术实现细节# 生产者示例代码channel.exchange_declare(exchangelogs,exchange_typefanout)channel.basic_publish(exchangelogs,routing_key,bodymessage)# 消费者示例代码resultchannel.queue_declare(queue,exclusiveTrue)channel.queue_bind(exchangelogs,queueresult.method.queue)与传统点对点模式的区别传统模式一个消息只能被一个消费者处理fanout模式一个消息会被所有消费者处理消息生命周期在fanout模式下消息会被复制多份分别存储在不同队列中这种模式特别适合需要一对多消息分发的场景确保所有订阅者都能及时获取完整的消息内容。由于不依赖RoutingKey系统设计更加简单但需要注意可能带来的资源消耗问题因为每条消息都会被复制多份。交换器的类型前面已经介绍过了● direct● topic● headers● fanout发布订阅使用 fanout创建交换器叫做 logschannel.exchangeDeclare(logs,fanout);fanout 交换器很简单从名字可以看到叫用风扇吹出去将收到的消息发送给它知道的所有队列。rabbitmqctl list_exchanges列出 RabbitMQ 的交换器包括了 amq.* 和 默认未命名的交换器。未命名交换器在前面我们虽然没有指定交换器但是依然可以向队列发送消息这是因为我们使用了默认的交换器。channel.basicPublish(,hello,null,message.getBytes());第一个参数就是交换器的名称为空字符串直接使用 RoutingKey 向队列发送消息如果该 RoutingKey 指定的队列存在的话。如果我们要向指定的交换发布器发送消息channel.basicPublish(logs,,null,message.getBytes());临时队列生产者和消费者都是通过队列名称来发送和接收该队列中的消息。在使用RabbitMQ时创建队列的过程需要注意以下几点队列创建机制当连接到RabbitMQ时通常需要创建一个新的空队列队列命名方式有两种选择自定义命名可以指定一个有意义的队列名称如order_processing_queue自动生成可以让RabbitMQ服务器生成随机队列名如amq.gen-JzTY20BRgKO-HjmUJj0wLg队列生命周期管理临时队列特性当声明队列时将exclusive参数设为true该队列就变成独占队列自动删除机制对于独占队列一旦消费者断开连接RabbitMQ会自动删除该队列持久化队列如果需要长期保留队列可以设置durable参数为true应用场景示例临时任务处理使用自动生成的队列名处理一次性任务RPC调用客户端创建临时队列接收服务端响应消息广播多个消费者各自创建独占队列绑定到同一个交换机注意事项生产环境中建议使用明确的队列命名规范自动删除队列适合临时性消息交换场景确保消费者异常断开时队列能正确释放资源StringqueueNamechannel.queueDeclare().getQueue();上述代码我们声明了一个非持久化的、排他的、自动删除的队列并且名字都是服务器随机生成的。进行绑定在创建了消息队列和fanout类型的交换器之后我们需要将两者进行绑定让交换器将消息发送给该队列。fanout交换器会将收到的所有消息广播到所有与之绑定的队列中这种模式非常适合需要消息广播的场景比如系统日志分发或实时通知推送。下面是具体的绑定操作示例代码channel.queueBind(queueName,logs,);这段代码中各个参数的含义是queueName要绑定的队列名称logs交换器的名称路由键routing key在fanout类型中不需要使用因此为空字符串此时logs 交换器会将接收到的所有消息无条件地追加到我们的队列中。为了验证绑定是否成功可以使用下面的命令列出 RabbitMQ 中交换器的绑定关系rabbitmqctl list_bindings这个命令会显示类似如下的输出exchange_name queue_name routing_key logs queue1 logs queue2 输出结果会清晰地展示每个交换器与队列之间的绑定关系包括交换器名称、队列名称和使用的路由键。在fanout类型的绑定中路由键通常会显示为空字符串因为fanout交换器会忽略路由键的设置。错误速查症状根因定位修复多消费者但分发不均、某个消费者“吃撑”未设置 basicQos 或 prefetch 过大看消费者侧吞吐与未确认数unacked是否偏高channel.basicQos(1..N)manual ack 后再调优 N消费者异常后消息丢失autoAcktrue或提前 ack查basicConsume的 autoAck 参数、日志中 ack 时机设autoAckfalse仅在业务完成后basicAck失败消息无限重试、队列“打转”basicNack(..., requeuetrue)对不可恢复错误也重回队列观察同一消息反复出现且无退避区分可恢复/不可恢复不可恢复requeuefalse或走 DLX/重试队列重启后队列/消息消失队列非持久化 消息非持久化看queueDeclare(durablefalse)、发布属性是否为 persistent生产durable queue persistent message配合镜像/Quorum 队列策略视需求发布订阅消费者断开后队列没了使用临时队列 exclusive/auto-delete 的预期行为queueDeclare(queue, exclusivetrue)或 server 生成队列名需要持久订阅就改为命名队列 durable临时订阅保持现状fanout 下设置 routingKey 但无效果fanout 忽略 routingKey检查exchange_typefanout需要按 key 路由就改 direct/topicfanout 保持 routingKey 为空即可发送到 exchange 后消费者收不到未 queueBind 或 bind 到错误 exchangerabbitmqctl list_bindings看是否存在绑定确认queueBind(queueName,logs,)与 exchange 名一致使用默认交换器发送失败无路由routingKey队列名不存在或拼错生产者basicPublish的 routingKey 与实际队列名比对先确保queueDeclare创建目标队列routingKey 必须等于队列名消费端进程退出导致不再消费try-with-resources 结束或主线程未阻塞看 main 是否退出、连接是否关闭保持进程存活你用 wait() 方式可行或用更标准的生命周期管理连接失败/权限错误vhost/用户名密码/权限不匹配RabbitMQ 日志与连接异常栈校验 vhost 存在、用户对 vhost 的 configure/write/read 权限其他系列 AI篇持续更新中长期更新AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究持续打造实用AI工具指南AI研究-132 Java 生态前沿 2025Spring、Quarkus、GraalVM、CRaC 与云原生落地 AI模块直达链接 Java篇持续更新中长期更新Java-196 消息队列选型RabbitMQ vs RocketMQ vs KafkaMyBatis 已完结Spring 已完结Nginx已完结Tomcat已完结分布式服务已完结Dubbo已完结MySQL已完结MongoDB已完结Neo4j已完结FastDFS 已完结OSS已完结GuavaCache已完结EVCache已完结RabbitMQ正在更新… 深入浅出助你打牢基础 Java模块直达链接 大数据板块已完成多项干货更新300篇包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件覆盖离线实时数仓全栈大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT案例 详解 大数据模块直达链接