RabbitMQ简介及Java API
1.RabbitMQ简介
RabbitMQ是一个多租户系统,由虚拟主机提供了资源的逻辑分组和分隔:连接、交换器、队列、绑定、用户权限、策略和其他的东西都属于虚拟主机(virtual hosts,v_host)。
虚拟机
客户端连接到RabbitMQ时,需要指定虚拟主机名称,同时还需要提供用户名和密码,只有用户具有相关的权限才能建立连接。
RabbitMQ包含一个默认的虚拟主机:“/”。默认操作的都是这个虚拟主机,其用户名和密码默认都是guest。RabbitMQ禁止guest用户远程访问,只可以访问本地的mq服务。
虚拟机相关操作如下:
创建虚拟机命令:rabbitmqctl add_vhost vhost_name
添加用户: rabbitmqctl add_user <username> <password>
用户和授权:rabbitmqctl set_permissions -p <vhost> <username> <permissions>
限制虚拟主机的最大连接数量: rabbitmqctl set_vhost_limits -p <vhost_name> '{"max-connections": 256}'
限制虚拟主机的最大队列数量:rabbitmqctl set_vhost_limits -p <vhost_name> '{"max-queues": 256}'
交换器
从3.7版本开始,RabbitMQ开始支持为topic交换器设置权限,在设置权限时,要结合发布到主题交换器的消息的routing key信息。设置了topic授权后,向topic路由器发布消息,或者从topic路由器接收消息,都会依据消息的routing key进行权限检查,如果routing key与权限设置匹配,则成功,否则抛出异常。
权限机制
RabbitMQ有一套独立的权限机制:配置、写、读,通过三个正则表达式来分别匹配所访问资源的这三项权限。
相关术语
代理(即RabbitMQ服务端):Broker、RabbitMQ Server
客户端:Client
连接:Connection
连接工厂:Connection Factory
通道:Channel
声明:Declare
交换机:Exchange
队列:Queue
发布者:Publisher
消费者:Consumer
路由:Route、routable、unroutable
持久化:durable、non-durable
自动删除:autodelete、non-autodelete
排斥:exclusive
路由关键字:routing key
2.Java API
RabbitMQ Java客户端使用“com.rabbitmq.client”作为顶级包。主要的类和接口:
● Channel
● Connection
● ConnectionFactory
● Consumer
通过“Channel”接口可以进行协议操作。
连接到代理(虚拟机):
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost(vhostName);
factory.setUsername(user);
factory.setPassword(pass);
Connection connection = factory.newConnection();
建立消息通道:
然后,“Connection”接口可用用来打开一个通道,用来发送和接收消息:
Channel channel = conn.createChannel();
通常每个线程使用一个“Channel”。
与代理断开连接:
channel.close();
conn.close();
绑定队列到交换机:
声明交换机和队列,通过给定的路由关键字(routing key)将队列绑定到交换机:
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
发布消息:
使用“Channel.basicPublish”:
byte[] messageBodyBytes = "Hello!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
接收消息:
使用“Consumer”接口发起订阅,传空字符串调用“Channel.basicConsume”方法,并使用返回值即可得到唯一的tag,消费者标签用于取消消费者的订阅。不同的“Consumer”实例必须有不同的消费者标签。
channel.basicConsume(queueName, false, "consumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey =envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag =envelope.getDeliveryTag();
// todo 处理消息
channel.basicAck(deliveryTag, false);
}
});
取消消费者的订阅:
channel.basicCancel(consumerTag);