请先学习http://my.oschina.net/dyyweb/blog/667858 direct模式
由于本人系统地学习和使用过rabbitmq,所以直接上代码
很简单,要代码 直接找我
public class Constants { public static String direct_exchange = "dy-exchange"; public static String queue_direct = "dy-queue"; public static String fanout_exchange = "dy-exchange-fanout"; public static String queue_fanout_1 = "dy-queue-fanout-1"; public static String queue_fanout_2 = "dy-queue-fanout-2"; public static String topic_exchange = "dy-exchange-topic"; public static String queue_topic_1 = "dy-queue-topic-1"; public static String queue_topic_2 = "dy-queue-topic-2"; public static String queue_topic_3 = "dy-queue-topic-3";}消费者1:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutConsumer_1 { public String host ="127.0.0.1"; public void init() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); //获取链接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); channel.queueDeclare(Constants.queue_fanout_1, false, false, false, null); //绑定队列,交换器,路由键(该模式路由不用了) channel.queueBind(Constants.queue_fanout_1, Constants.fanout_exchange,""); /** * 订阅消息 * autoAck是否消息订阅到队列就确认 * basicConsume(String queue, boolean autoAck, TopicConsumer_1 callback) */ QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(Constants.queue_fanout_1, false,consumer); while (true) { try { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody(), "UTF-8"); System.out.println("我接收到的消息是:"+msg); // 返回接收到消息的确认信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { System.out.println(e.toString()); } } } public static void main(String[] args) { try { new FanoutConsumer_1().init(); } catch (Exception e) { e.printStackTrace(); } }}消费者2:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutConsumer_2 { public String host ="127.0.0.1"; public void init() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(Constants.queue_fanout_2, false, false, false, null); //绑定队列,交换器,路由键(该模式路由不用了) channel.queueBind(Constants.queue_fanout_2, Constants.fanout_exchange,""); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(Constants.queue_fanout_2, false,consumer); while (true) { try { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody(), "UTF-8"); System.out.println("我接收到的消息是:"+msg); // 返回接收到消息的确认信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { System.out.println(e.toString()); } } } public static void main(String[] args) { try { new FanoutConsumer_2().init(); } catch (Exception e) { e.printStackTrace(); } }}消息生产者:
package com.dy.fanout;import com.dy.Constants;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * Created by dy on 16-4-28. */public class FanoutProducer { public String host ="127.0.0.1"; public Connection connection; public Channel channel; public void init() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); //获取链接 connection = factory.newConnection(); //创建信道 channel = connection.createChannel(); /** * 声明交换器 * 交换器类型主要有三种 * direct:精准匹配路由键,fanout:广播匹配,topic模糊(多)匹配路由键(可以优先级) * exchangeDeclare(String exchange, String type) */ channel.exchangeDeclare(Constants.fanout_exchange, "fanout"); } public void publish(String msg) throws Exception{ //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) this.channel.basicPublish(Constants.fanout_exchange,"", null, msg.getBytes("UTF-8")); } public void close() throws Exception{ channel.close(); connection.close(); } public static void main(String[] args) { try { FanoutProducer producer = new FanoutProducer(); producer.init(); String msg ="this is a msg from fanout producer!我的序列是:"; for (int i = 1;i<8;i++){ producer.publish(msg+i); } producer.close(); } catch (Exception e) { e.printStackTrace(); } }}