Sending and Receiving Messages with RabbitMQ in Java

  • Date: 2021-05-11 20:10
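AMQP (Advanced Message Queuing Protocol) is an open standard application-layer protocol designed for message-oriented middleware. Message middleware is mainly used to decouple components: the sender of a message does not need to know that its consumer exists, and vice versa. The main features of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.

RabbitMQ is an open-source implementation of AMQP. The server is written in Erlang and supports many clients, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, and it supports AJAX. It is used to store and forward messages in distributed systems and performs well in terms of ease of use, scalability, and high availability.

This article does not cover the background of AMQP and RabbitMQ; please look that up yourself. It implements sending and receiving messages on top of spring-rabbit.

See http://www.rabbitmq.com/tutorials/tutorial-one-java.html
See http://www.springsource.org/spring-amqp

The implementation is as follows, starting with the Maven dependencies: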
<!-- for rabbitmq -->
<dependencies>
 <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>2.8.2</version>
 </dependency>
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-amqp</artifactId>
  <version>1.1.1.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.1.1.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>com.caucho</groupId>
  <artifactId>hessian</artifactId>
  <version>4.0.7</version>
 </dependency>
</dependencies>
First, we need a holder object for passing messages between the app and RabbitMQ:
public class EventMessage implements Serializable{
 private String queueName;
 private String exchangeName;
 private byte[] eventData;
 public EventMessage(String queueName, String exchangeName, byte[] eventData) {
  this.queueName = queueName;
  this.exchangeName = exchangeName;
  this.eventData = eventData;
 }
 public EventMessage() {
 } 
 public String getQueueName() {
  return queueName;
 }
 public String getExchangeName() {
  return exchangeName;
 }
 public byte[] getEventData() {
  return eventData;
 }
 @Override
 public String toString() {
  return "EopEventMessage [queueName=" + queueName + ", exchangeName="
    + exchangeName + ", eventData=" + Arrays.toString(eventData)
    + "]";
 }
}
To send and receive this holder object, we also need a factory that serializes and deserializes it:
public interface CodecFactory {
 byte[] serialize(Object obj) throws IOException;
 Object deSerialize(byte[] in) throws IOException;
}
Below is the encoder/decoder implementation. It uses Hessian, but you can choose any serialization method you like:
public class HessionCodecFactory implements CodecFactory {
 private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
 @Override
 public byte[] serialize(Object obj) throws IOException {
  ByteArrayOutputStream baos = null;
  HessianOutput output = null;
  try {
   baos = new ByteArrayOutputStream(1024);
   output = new HessianOutput(baos);
   output.startCall();
   output.writeObject(obj);
   output.completeCall();
  } catch (final IOException ex) {
   throw ex;
  } finally {
   if (output != null) {
    try {
     baos.close();
    } catch (final IOException ex) {
     this.logger.error("Failed to close stream.", ex);
    }
   }
  }
  return baos != null ? baos.toByteArray() : null;
 }
 @Override
 public Object deSerialize(byte[] in) throws IOException {
  Object obj = null;
  ByteArrayInputStream bais = null;
  HessianInput input = null;
  try {
   bais = new ByteArrayInputStream(in);
   input = new HessianInput(bais);
   input.startReply();
   obj = input.readObject();
   input.completeReply();
  } catch (final IOException ex) {
   throw ex;
  } catch (final Throwable e) {
   this.logger.error("Failed to decode object.", e);
  } finally {
   if (input != null) {
    try {
     bais.close();
    } catch (final IOException ex) {
     this.logger.error("Failed to close stream.", ex);
    }
   }
  }
  return obj;
 }
}
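Since the article leaves the choice of serialization open, here is a minimal sketch of an alternative codec built on the JDK's own `java.io` serialization. The class name `JdkCodecFactory` is hypothetical and not part of the original project, and the `CodecFactory` interface is repeated only so the sketch compiles on its own. It assumes every payload implements `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Same contract as the article's CodecFactory, repeated so this sketch is self-contained.
interface CodecFactory {
    byte[] serialize(Object obj) throws IOException;
    Object deSerialize(byte[] in) throws IOException;
}

// Hypothetical alternative to HessionCodecFactory, based on java.io serialization.
class JdkCodecFactory implements CodecFactory {
    @Override
    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        // try-with-resources flushes and closes the object stream before the bytes are read
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(obj);
        }
        return baos.toByteArray();
    }

    @Override
    public Object deSerialize(byte[] in) throws IOException {
        if (in == null) {
            return null; // nothing was sent, so there is nothing to decode
        }
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(in))) {
            return input.readObject();
        } catch (ClassNotFoundException e) {
            // surface an unknown payload class through the interface's IOException
            throw new IOException("Unknown class in payload", e);
        }
    }
}
```

Unlike the Hessian version, this sketch reports decoding failures to the caller instead of logging them and returning null. JDK deserialization should only be used on data from trusted producers.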
Next, we implement sending first, adding an interface dedicated to it:
public interface EventTemplate {
 void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
 void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}
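SendRefuseException is a custom exception class for send failures. Below is the implementation of the interface; its main job is to convert the data into an EventMessage: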
public class DefaultEventTemplate implements EventTemplate {
 private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
 private AmqpTemplate eventAmqpTemplate;
 private CodecFactory defaultCodecFactory;
// private DefaultEventController eec;
// public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
//   CodecFactory defaultCodecFactory, DefaultEventController eec) {
//  this.eventAmqpTemplate = eopAmqpTemplate;
//  this.defaultCodecFactory = defaultCodecFactory;
//  this.eec = eec;
// }
 public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
  this.eventAmqpTemplate = eopAmqpTemplate;
  this.defaultCodecFactory = defaultCodecFactory;
 }
 @Override
 public void send(String queueName, String exchangeName, Object eventContent)
   throws SendRefuseException {
  this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
 } 
 @Override
 public void send(String queueName, String exchangeName, Object eventContent,
   CodecFactory codecFactory) throws SendRefuseException {
  if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
   throw new SendRefuseException("queueName exchangeName can not be empty.");
  }
//  if (!eec.beBinded(exchangeName, queueName))
//   eec.declareBinding(exchangeName, queueName);
  byte[] eventContentBytes = null;
  if (codecFactory == null) {
   if (eventContent == null) {
    logger.warn("Find eventContent is null,are you sure...");
   } else {
    throw new SendRefuseException(
      "codecFactory must not be null ,unless eventContent is null");
   }
  } else {
   try {
    eventContentBytes = codecFactory.serialize(eventContent);
   } catch (IOException e) {
    throw new SendRefuseException(e);
   }
  }
  // wrap the payload in an EventMessage
  EventMessage msg = new EventMessage(queueName, exchangeName,
    eventContentBytes);
  try {
   eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
  } catch (AmqpException e) {
   logger.error("send event fail. Event Message : [" + eventContent + "]", e);
   throw new SendRefuseException("send event fail", e);
  }
 }
}
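The article mentions SendRefuseException but never lists it. A minimal sketch that is consistent with the three ways it is constructed above (message only, cause only, message plus cause) might look like this. Making it a checked exception is an assumption, based on `send` declaring it in its `throws` clause.

```java
// Hypothetical sketch: the original article does not show this class.
// Assumed to be a checked exception because send(...) declares it in its throws clause.
class SendRefuseException extends Exception {
    private static final long serialVersionUID = 1L;

    // used when validation fails, e.g. an empty queue or exchange name
    public SendRefuseException(String message) {
        super(message);
    }

    // used when serialization fails with an IOException
    public SendRefuseException(Throwable cause) {
        super(cause);
    }

    // used when the AMQP template fails to send
    public SendRefuseException(String message, Throwable cause) {
        super(message, cause);
    }
}
```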
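The commented-out parts will be used later; they mainly guard against sending to an exchange or queue that has not been declared beforehand. Next we implement receiving messages. First we need a consumer interface that all consumers implement: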
public interface EventProcesser {
 public void process(Object e);
}
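To hand each type of message to the right consumer, we also need a message handling adapter:
/**
 * POJO delegate for MessageListenerAdapter
 * <p>Message handling adapter. Main responsibilities:</p>
 * <p>1. Bind each message type to its processor and cache the mapping locally, e.g. all messages for queue01+exchange01 are handled by processor A</p>
 * <p>2. Dispatch messages for consumption, invoking the processor that owns each message</p>
 * 
 */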
public class MessageAdapterHandler {
 private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
 private ConcurrentMap<String, EventProcessorWrap> epwMap;
 public MessageAdapterHandler() {
  this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
 }
 public void handleMessage(EventMessage eem) {
  logger.debug("Receive an EventMessage: [" + eem + "]");
  // first check whether the received message is null; this can happen in some error cases
  if (eem == null) {
   logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
   return;
  }
  if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
   logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
   return;
  }
  // decode and hand over to the matching processor
  EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
  if (eepw == null) {
   logger.warn("Receive an EopEventMessage, but no processor can do it.");
   return;
  }
  try {
   eepw.process(eem.getEventData());
  } catch (IOException e) {
   logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
   return;
  }
 }
 protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
  if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
   throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
  }
  EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
  EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
  if (oldProcessorWrap != null) {
   logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
  }
 }
 protected Set<String> getAllBinding() {
  Set<String> keySet = epwMap.keySet();
  return keySet;
 }
 protected static class EventProcessorWrap {
  private CodecFactory codecFactory;
  private EventProcesser eep;
  protected EventProcessorWrap(CodecFactory codecFactory,
    EventProcesser eep) {
   this.codecFactory = codecFactory;
   this.eep = eep;
  }
  public void process(byte[] eventData) throws IOException{
   Object obj = codecFactory.deSerialize(eventData);
   eep.process(obj);
  }
 }
}
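The dispatch idea in MessageAdapterHandler can be isolated from Spring and from decoding. The following is a simplified sketch, not the article's class: `ProcessorRegistry` is a hypothetical name, and `EventProcesser` is repeated only so the sketch compiles on its own. It keeps the same `queueName|exchangeName` key and the same first-registration-wins behaviour of `putIfAbsent`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Stand-in for the article's EventProcesser interface.
interface EventProcesser {
    void process(Object e);
}

// Hypothetical, simplified registry: same keying as MessageAdapterHandler, minus decoding and logging.
class ProcessorRegistry {
    private final ConcurrentMap<String, EventProcesser> processors =
            new ConcurrentHashMap<String, EventProcesser>();

    // Builds the composite key used for both registration and lookup.
    private static String key(String queueName, String exchangeName) {
        return queueName + "|" + exchangeName;
    }

    // Returns true if registered, false if another processor already owns the key.
    boolean add(String queueName, String exchangeName, EventProcesser processor) {
        return processors.putIfAbsent(key(queueName, exchangeName), processor) == null;
    }

    // Returns true if a processor was found and invoked, false if nobody handles the key.
    boolean dispatch(String queueName, String exchangeName, Object payload) {
        EventProcesser processor = processors.get(key(queueName, exchangeName));
        if (processor == null) {
            return false;
        }
        processor.process(payload);
        return true;
    }
}
```

Because `|` separates the two names in the key, queue and exchange names that themselves contain `|` would make the key ambiguous when it is split again later.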
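That covers message handling in the normal case. Exceptions raised while receiving RabbitMQ messages should also be monitored, so we add a handler dedicated to those errors: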
public class MessageErrorHandler implements ErrorHandler{
 private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
 @Override
 public void handleError(Throwable t) {
  logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
 }
}
Next, we need a class that holds the settings for communicating with RabbitMQ, such as the host and port:
public class EventControlConfig {
 private final static int DEFAULT_PORT = 5672;
 private final static String DEFAULT_USERNAME = "guest";
 private final static String DEFAULT_PASSWORD = "guest";
 private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
 private static final int PREFETCH_SIZE = 1;
 private String serverHost ;
 private int port = DEFAULT_PORT;
 private String username = DEFAULT_USERNAME;
 private String password = DEFAULT_PASSWORD;
 private String virtualHost;
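 /**
  * Timeout for establishing the connection to RabbitMQ
  */
 private int connectionTimeout = 0;
 /**
  * Number of threads processing event messages; defaults to CPU cores * 2
  */
 private int eventMsgProcessNum;
 /**
  * Prefetch count used each time messages are consumed
  */
 private int prefetchSize;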
 
 public EventControlConfig(String serverHost) {
  // pass the default prefetch value (not the thread count) as prefetchSize
  this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,PREFETCH_SIZE,new HessionCodecFactory());
 }
 public EventControlConfig(String serverHost, int port, String username,
   String password, String virtualHost, int connectionTimeout,
   int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
  this.serverHost = serverHost;
  this.port = port>0?port:DEFAULT_PORT;
  this.username = username;
  this.password = password;
  this.virtualHost = virtualHost;
  this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
  this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
  this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
 }
 public String getServerHost() {
  return serverHost;
 }
 public int getPort() {
  return port;
 }
 public String getUsername() {
  return username;
 }
 public String getPassword() {
  return password;
 }
 public String getVirtualHost() {
  return virtualHost;
 }
 public int getConnectionTimeout() {
  return connectionTimeout;
 }
 public int getEventMsgProcessNum() {
  return eventMsgProcessNum;
 }
 public int getPrefetchSize() {
  return prefetchSize;
 }
}
The send and receive code is now complete. The next and most important step is managing the communication with RabbitMQ:
public interface EventController { 
 /**
  * Starts the controller
  */
 void start();
 /**
  * Returns the send template
  */
 EventTemplate getEopEventTemplate();
 /**
  * Binds a consumer to the given exchange and queue
  */
 EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
 /*in map, the key is queue name, but value is exchange name*/
 EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
}
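Its implementation is as follows:
/**
 * Controller for communicating with RabbitMQ. Main responsibilities:
 * <p>1. Establish the connection to RabbitMQ</p>
 * <p>2. Declare exchanges and queues and the bindings between them</p>
 * <p>3. Start the message listener container and bind each message processor to its exchange and queue</p>
 * <p>4. Hold the send template and a local cache of all exchanges, queues, and bindings</p>
 * @author yangyong
 *
 */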
public class DefaultEventController implements EventController {
 private CachingConnectionFactory rabbitConnectionFactory;
 private EventControlConfig config;
 private RabbitAdmin rabbitAdmin;
 private CodecFactory defaultCodecFactory = new HessionCodecFactory();
 private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
 private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
 private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // specified directly
 // exchange cache, key is exchangeName
 private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
 // queue cache, key is queueName
 private Map<String, Queue> queues = new HashMap<String, Queue>();
 // cache of queue-to-exchange bindings, value is exchangeName | queueName
 private Set<String> binded = new HashSet<String>();
 private EventTemplate eventTemplate; // event-sending client exposed to the app
 private AtomicBoolean isStarted = new AtomicBoolean(false);
 private static DefaultEventController defaultEventController;
 public synchronized static DefaultEventController getInstance(EventControlConfig config){
  if(defaultEventController==null){
   defaultEventController = new DefaultEventController(config);
  }
  return defaultEventController;
 }
 private DefaultEventController(EventControlConfig config){
  if (config == null) {
   throw new IllegalArgumentException("Config can not be null.");
  }
  this.config = config;
  initRabbitConnectionFactory();
  // initialize the AmqpAdmin
  rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
  // initialize the RabbitTemplate
  RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
  rabbitTemplate.setMessageConverter(serializerMessageConverter);
  eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
 }
 /**
  * Initializes the RabbitMQ connection factory
  */
 private void initRabbitConnectionFactory() {
  rabbitConnectionFactory = new CachingConnectionFactory();
  rabbitConnectionFactory.setHost(config.getServerHost());
  rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
  rabbitConnectionFactory.setPort(config.getPort());
  rabbitConnectionFactory.setUsername(config.getUsername());
  rabbitConnectionFactory.setPassword(config.getPassword());
  if (!StringUtils.isEmpty(config.getVirtualHost())) {
   rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
  }
 }
 /**
  * Shuts the controller down
  */
 public synchronized void destroy() throws Exception {
  if (!isStarted.get()) {
   return;
  }
  msgListenerContainer.stop();
  eventTemplate = null;
  rabbitAdmin = null;
  rabbitConnectionFactory.destroy();
 }
 @Override
 public void start() {
  if (isStarted.get()) {
   return;
  }
  Set<String> mapping = msgAdapterHandler.getAllBinding();
  for (String relation : mapping) {
   String[] relaArr = relation.split("\\|");
   declareBinding(relaArr[1], relaArr[0]);
  }
  initMsgListenerAdapter();
  isStarted.set(true);
 }
 /**
  * Initializes the message listener container
  */
 private void initMsgListenerAdapter(){
  MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
  msgListenerContainer = new SimpleMessageListenerContainer();
  msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
  msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
  msgListenerContainer.setMessageListener(listener);
  msgListenerContainer.setErrorHandler(new MessageErrorHandler());
  msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // prefetch count for each consumer
  msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
  msgListenerContainer.setTxSize(config.getPrefetchSize()); // number of messages processed per transaction
  msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
  msgListenerContainer.start();
 }
 @Override
 public EventTemplate getEopEventTemplate() {
  return eventTemplate;
 }
 @Override
 public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
  return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
 }
 
 public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
  // use the codec passed in by the caller rather than always the default one
  msgAdapterHandler.add(queueName, exchangeName, eventProcesser, codecFactory);
  if(isStarted.get()){
   initMsgListenerAdapter();
  }
  return this;
 }
 @Override
 public EventController add(Map<String, String> bindings,
   EventProcesser eventProcesser) {
  return add(bindings, eventProcesser,defaultCodecFactory);
 }

 public EventController add(Map<String, String> bindings,
   EventProcesser eventProcesser, CodecFactory codecFactory) {
  for(Map.Entry<String, String> item: bindings.entrySet()) 
   msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
  return this;
 }
 /**
  * Whether the exchange and queue are already bound
  */
 protected boolean beBinded(String exchangeName, String queueName) {
  return binded.contains(exchangeName+"|"+queueName);
 }
 /**
  * Declares the exchange and queue as well as the binding between them
  */
 protected synchronized void declareBinding(String exchangeName, String queueName) {
  String bindRelation = exchangeName+"|"+queueName;
  if (binded.contains(bindRelation)) return;
  
  boolean needBinding = false;
  DirectExchange directExchange = exchanges.get(exchangeName);
  if(directExchange == null) {
   directExchange = new DirectExchange(exchangeName, true, false, null);
   exchanges.put(exchangeName, directExchange);
   rabbitAdmin.declareExchange(directExchange); // declare the exchange
   needBinding = true;
  }
  Queue queue = queues.get(queueName);
  if(queue == null) {
   queue = new Queue(queueName, true, false, false);
   queues.put(queueName, queue);
   rabbitAdmin.declareQueue(queue); // declare the queue
   needBinding = true;
  }
  if(needBinding) {
   Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); // bind the queue to the exchange, using the queue name as routing key
   rabbitAdmin.declareBinding(binding); // declare the binding
   binded.add(bindRelation);
  }
 }
}
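The controller binds every queue to its direct exchange with the queue name as binding key (`.with(queueName)`), and DefaultEventTemplate publishes with the queue name as routing key. That pairing is why a message sent for `queueName` ends up in exactly that queue. The following toy in-memory model illustrates the matching rule of a direct exchange; `ToyDirectExchange` is a hypothetical class for illustration and has nothing to do with the RabbitMQ or Spring AMQP APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; NOT the RabbitMQ client API.
class ToyDirectExchange {
    // binding key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings =
            new HashMap<String, List<Queue<String>>>();

    // Binds a queue to this exchange under the given binding key.
    void bind(Queue<String> queue, String bindingKey) {
        List<Queue<String>> bound = bindings.get(bindingKey);
        if (bound == null) {
            bound = new ArrayList<Queue<String>>();
            bindings.put(bindingKey, bound);
        }
        bound.add(queue);
    }

    // Delivers the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received it; 0 means the message was unroutable.
    int publish(String routingKey, String message) {
        List<Queue<String>> bound = bindings.get(routingKey);
        if (bound == null) {
            return 0;
        }
        for (Queue<String> queue : bound) {
            queue.add(message);
        }
        return bound.size();
    }
}
```

In this model, publishing before the queue is bound simply drops the message, which is the situation the `declareBinding` call in the send path is meant to prevent.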
Done. The commented-out code in DefaultEventTemplate can now be uncommented. Finally, we finish with a unit test. To test passing objects, create a PO:
@SuppressWarnings("serial")
public class People implements Serializable{
 private int id;
 private String name;
 private boolean male;
 private People spouse;
 private List<People> friends;
 public int getId() {
  return id;
 }
 public void setId(int id) {
  this.id = id;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 public boolean isMale() {
  return male;
 }
 public void setMale(boolean male) {
  this.male = male;
 }
 public People getSpouse() {
  return spouse;
 }
 public void setSpouse(People spouse) {
  this.spouse = spouse;
 }
 public List<People> getFriends() {
  return friends;
 }
 public void setFriends(List<People> friends) {
  this.friends = friends;
 }
 @Override
 public String toString() {
  // TODO Auto-generated method stub
  return "People[id="+id+",name="+name+",male="+male+"]";
 }
}
Create the unit test:
public class RabbitMqTest{
 private String defaultHost = "127.0.0.1";
 private String defaultExchange = "EXCHANGE_DIRECT_TEST";
 private String defaultQueue = "QUEUE_TEST";
 private DefaultEventController controller;
 private EventTemplate eventTemplate;
 @Before
 public void init() throws IOException{
  EventControlConfig config = new EventControlConfig(defaultHost);
  controller = DefaultEventController.getInstance(config);
  eventTemplate = controller.getEopEventTemplate();
  controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
  controller.start();
 }
 @Test
 public void sendString() throws SendRefuseException{
  eventTemplate.send(defaultQueue, defaultExchange, "hello world");
 } 
 @Test
 public void sendObject() throws SendRefuseException{
  eventTemplate.send(defaultQueue, defaultExchange, mockObj());
 } 
 @Test
 public void sendTemp() throws SendRefuseException, InterruptedException{
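  String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//an exchange that has not been declared before
  String tempQueue = "QUEUE_TEST_TEMP";//a queue that has not been declared before
  eventTemplate.send(tempQueue, tempExchange, mockObj());
  //after a successful send no message is received yet; a consumer still has to be bound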
  controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
 } 
 @After
 public void end() throws InterruptedException{
  Thread.sleep(2000);
 } 
 private People mockObj(){
  People jack = new People();
  jack.setId(1);
  jack.setName("JACK");
  jack.setMale(true);
  
  List<People> friends = new ArrayList<>();
  friends.add(jack);
  People hanMeiMei = new People();
  hanMeiMei.setId(1);
  hanMeiMei.setName("韩梅梅");
  hanMeiMei.setMale(false);
  hanMeiMei.setFriends(friends);
  
  People liLei = new People();
  liLei.setId(2);
  liLei.setName("李雷");
  liLei.setMale(true);
  liLei.setFriends(friends);
  liLei.setSpouse(hanMeiMei);
  hanMeiMei.setSpouse(liLei);
  return hanMeiMei;
 }
 class ApiProcessEventProcessor implements EventProcesser{
  @Override
  public void process(Object e) {//this consumer only prints the message
   Assert.assertNotNull(e);
   System.out.println(e);
   if(e instanceof People){
    People people = (People)e;
    System.out.println(people.getSpouse());
    System.out.println(people.getFriends());
   }
  }
 }
}
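The test above waits for asynchronous consumption with a fixed `Thread.sleep(2000)`. One possible alternative, sketched here under the assumption that the number of expected messages is known, is a processor that counts down a `CountDownLatch` so the test can wait only as long as needed. `LatchingProcessor` is a hypothetical helper; in the real test it would additionally implement the article's EventProcesser interface.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; in the real test it would also implement EventProcesser.
class LatchingProcessor {
    private final CountDownLatch latch;
    private volatile Object last;

    LatchingProcessor(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Called on the consumer thread for each delivered message.
    public void process(Object e) {
        last = e;          // remember the payload before signalling
        latch.countDown(); // wake the waiting test once all messages arrived
    }

    // Blocks until all expected messages arrived or the timeout elapsed.
    // Returns true if everything arrived in time.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    // Most recently received payload, or null if nothing arrived yet.
    Object lastMessage() {
        return last;
    }
}
```

A test would register the processor, send the message, and then assert that `await(2, TimeUnit.SECONDS)` returns true, so a missing message fails the test instead of passing silently.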
The source code is available at https://github.com/sdyy321/rabbitmq-client

Summary

That is everything in this article on sending and receiving RabbitMQ messages in Java. I hope it helps.