博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq之fanout交换器以及rabbitmq java发布订阅 消息实现
阅读量:5833 次
发布时间:2019-06-18

本文共 5557 字,大约阅读时间需要 18 分钟。

  hot3.png

请先学习http://my.oschina.net/dyyweb/blog/667858   direct模式

由于本人系统地学习和使用过rabbitmq,所以直接上代码

很简单,要代码 直接找我

public class Constants {    public static String direct_exchange = "dy-exchange";    public  static String queue_direct = "dy-queue";    public static  String fanout_exchange = "dy-exchange-fanout";    public static  String queue_fanout_1 = "dy-queue-fanout-1";    public static  String queue_fanout_2 = "dy-queue-fanout-2";    public static String topic_exchange = "dy-exchange-topic";    public static String queue_topic_1 = "dy-queue-topic-1";    public static String queue_topic_2 = "dy-queue-topic-2";    public static String queue_topic_3 = "dy-queue-topic-3";}
消费者1:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutConsumer_1 {    public String host ="127.0.0.1";    public void init() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost(host);        //获取链接        Connection connection = factory.newConnection();        //创建信道        Channel channel = connection.createChannel();        channel.queueDeclare(Constants.queue_fanout_1, false, false, false, null);        //绑定队列,交换器,路由键(该模式路由不用了)        channel.queueBind(Constants.queue_fanout_1, Constants.fanout_exchange,"");        /**         * 订阅消息         * autoAck是否消息订阅到队列就确认         * basicConsume(String queue, boolean autoAck, TopicConsumer_1 callback)         */        QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(Constants.queue_fanout_1, false,consumer);        while (true) {            try {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String msg = new String(delivery.getBody(), "UTF-8");                System.out.println("我接收到的消息是:"+msg);                // 返回接收到消息的确认信息                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            } catch (Exception e) {                System.out.println(e.toString());            }        }    }    public static void main(String[] args) {        try {            new FanoutConsumer_1().init();        } catch (Exception e) {            e.printStackTrace();        }    }}
消费者2:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutConsumer_2 {    public String host ="127.0.0.1";    public void init() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost(host);        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(Constants.queue_fanout_2, false, false, false, null);        //绑定队列,交换器,路由键(该模式路由不用了)        channel.queueBind(Constants.queue_fanout_2, Constants.fanout_exchange,"");        QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(Constants.queue_fanout_2, false,consumer);        while (true) {            try {                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String msg = new String(delivery.getBody(), "UTF-8");                System.out.println("我接收到的消息是:"+msg);                // 返回接收到消息的确认信息                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            } catch (Exception e) {                System.out.println(e.toString());            }        }    }    public static void main(String[] args) {        try {            new FanoutConsumer_2().init();        } catch (Exception e) {            e.printStackTrace();        }    }}
消息生产者:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutProducer {    public String host ="127.0.0.1";    public Connection connection;    public Channel channel;    public void init() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost(host);        //获取链接        connection = factory.newConnection();        //创建信道        channel = connection.createChannel();        /**         * 声明交换器         * 交换器类型主要有三种         * direct:精准匹配路由键,fanout:广播匹配,topic模糊(多)匹配路由键(可以优先级)         * exchangeDeclare(String exchange, String type)         */        channel.exchangeDeclare(Constants.fanout_exchange, "fanout");    }    public void publish(String msg) throws Exception{        //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)        this.channel.basicPublish(Constants.fanout_exchange,"", null, msg.getBytes("UTF-8"));    }    public void close() throws Exception{        channel.close();        connection.close();    }    public static void main(String[] args) {        try {            FanoutProducer producer = new FanoutProducer();            producer.init();            String msg ="this is a msg from fanout producer!我的序列是:";            for (int i = 1;i<8;i++){                producer.publish(msg+i);            }            producer.close();        } catch (Exception e) {            e.printStackTrace();        }    }}

转载于:https://my.oschina.net/dyyweb/blog/667860

你可能感兴趣的文章
[Js]布局转换
查看>>
Hot Bath
查看>>
国内常用NTP服务器地址及
查看>>
Java annotation 自定义注释@interface的用法
查看>>
Apache Spark 章节1
查看>>
phpcms与discuz的ucenter整合
查看>>
Linux crontab定时执行任务
查看>>
mysql root密码重置
查看>>
33蛇形填数
查看>>
选择排序
查看>>
SQL Server 数据库的数据和日志空间信息
查看>>
前端基础之JavaScript
查看>>
自己动手做个智能小车(6)
查看>>
自己遇到的,曾未知道的知识点
查看>>
P1382 楼房 set用法小结
查看>>
分类器性能度量
查看>>
windows 环境下切换 python2 与 pythone3 以及常用命令
查看>>
docker 基础
查看>>
解决灾难恢复后域共享目录SYSVOL与NELOGON共享丢失
查看>>
eclipse集成weblogic开发环境的搭建
查看>>