<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the rabbitmq-study example; the project is packaged as a jar. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhenqi</groupId>
<artifactId>rabbitmq-study</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-study</name>
<url>http://maven.apache.org</url>
<properties>
<!-- Source files are read as UTF-8 regardless of the platform default encoding. -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- JUnit is only on the test classpath (scope "test"). -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<!-- RabbitMQ Java client. Its slf4j-api dependency is excluded here; presumably the
     slf4j-api version then comes from the slf4j-log4j12 dependency below (confirm with
     "mvn dependency:tree"). -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- SLF4J binding for log4j 1.2 (per the artifact name). NOTE(review): the example code
     imports org.apache.log4j.Logger, which presumably arrives transitively through this
     artifact since log4j is not declared directly; verify. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!-- Apache Commons Lang 2.x. -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
</project>
%% RabbitMQ configuration in the classic (Erlang term) format.
%% The file must consist of one Erlang term terminated by a period.
[
%% tcp_listeners: accept AMQP connections on port 5672.
%% loopback_users: the listed users may only connect over the loopback interface.
%% NOTE(review): "asdf" looks like a placeholder so that no real user is restricted;
%% the RabbitMQ docs write entries as binaries (e.g. <<"guest">>) - confirm the intent.
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
].
# Set the tags of the RabbitMQ user "openstack" to "administrator".
rabbitmqctl set_user_tags openstack administrator
package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Base class for the RabbitMQ endpoints of this example. Constructing an endpoint opens
 * a connection and a channel to the broker and declares the queue the endpoint works on.
 *
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {
    /** Channel opened on {@link #connection}; subclasses publish and consume through it. */
    protected Channel channel;
    /** Connection to the broker, opened by the constructor. */
    protected Connection connection;
    /** Name of the queue declared by the constructor. */
    protected String endPointName;

    /**
     * Connects to the broker and declares the queue {@code endpointName}.
     *
     * @param endpointName name of the queue to declare
     * @throws Exception if the connection or channel cannot be opened, or the queue
     *                   cannot be declared; in the latter two cases the connection that
     *                   was already opened is closed again before the exception propagates
     */
    public EndPoint(String endpointName) throws Exception {
        this.endPointName = endpointName;

        // Create a connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Address and credentials of the rabbitmq-server instance.
        factory.setHost("192.168.146.128");
        factory.setUsername("openstack");
        factory.setPassword("rabbitmq");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // Obtain the connection.
        connection = factory.newConnection();
        try {
            // Create the channel instance.
            channel = connection.createChannel();
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(endpointName, false, false, false, null);
        } catch (Exception e) {
            // Do not leak the open connection when the endpoint cannot be fully set up.
            try {
                connection.close();
            } catch (Exception ignored) {
                // The original failure is the one worth reporting to the caller.
            }
            throw e;
        }
    }

    /**
     * Closes the channel and then the connection. The connection is closed even when
     * closing the channel fails.
     *
     * @throws Exception if closing the channel or the connection fails
     */
    public void close() throws Exception {
        try {
            this.channel.close();
        } finally {
            this.connection.close();
        }
    }
}
package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
 * Endpoint that publishes serialized objects to its queue.
 *
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

    /**
     * Creates a producer bound to the queue {@code endpointName}.
     *
     * @param endpointName name of the queue to publish to
     * @throws Exception if the underlying endpoint cannot be set up
     */
    public Producer(String endpointName) throws Exception {
        super(endpointName);
    }

    /**
     * Serializes {@code object} and publishes the bytes with the endpoint name as the
     * routing key.
     *
     * @param object the message to send
     * @throws Exception if serialization or publishing fails
     */
    public void sendMessage(Serializable object) throws Exception {
        final byte[] payload = SerializationUtils.serialize(object);
        // The empty exchange name selects the broker's default exchange.
        final String exchange = "";
        channel.basicPublish(exchange, endPointName, null, payload);
    }
}
package com.zhenqi;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wuming on 2017/7/16.
*/
public class QueueConsumer extends EndPoint implements Runnable, Consumer {
private Logger LOG=Logger.getLogger(QueueConsumer.class);
public QueueConsumer(String endpointName) throws Exception {
super(endpointName);
}
public void handleConsumeOk(String s) {
}
public void handleCancelOk(String s) {
}
public void handleCancel(String s) throws IOException {
}
public void handleShutdownSignal(String s, ShutdownSignalException e) {
}
public void handleRecoverOk(String s) {
LOG.info("Consumer "+s +" registered");
}
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
Map map = (HashMap) SerializationUtils.deserialize(bytes);
LOG.info("Message Number "+ map.get("message number") + " received.");
}
public void run() {
try{
channel.basicConsume(endPointName, true,this);
}catch(IOException e){
e.printStackTrace();
}
}
}
package com.zhenqi;
import java.util.HashMap;
/**
 * Demo entry point: starts a {@link QueueConsumer} on the queue "queue" and then
 * publishes 100000 numbered messages to the same queue through a {@link Producer}.
 *
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

    /**
     * Runs the demo. Any failure is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producer producer = null;
        try {
            // NOTE(review): the consumer is never closed, presumably so that it can keep
            // receiving after the publishing loop has finished - confirm this is intended.
            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            producer = new Producer("queue");
            for (int i = 0; i < 100000; i++) {
                HashMap<String, Integer> message = new HashMap<String, Integer>();
                message.put("message number", i);
                producer.sendMessage(message);
                System.out.println("Message Number "+ i +" sent.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer's channel and connection once publishing is over.
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有