本文共 3293 字,大约阅读时间需要 10 分钟。
理论
定义 消息队列:在消息的传输过程中保存消息的的容器。这是一个较为经典的消费-生产者模型,说起来比较抽象,打个比方:A线程需要给B线程发送消息(A、B线程不一定是在同一台机器上的),A线程先把消息发送到消息队列服务器上,然后B线程去读取或是订阅消息服务器上消息队列中的消息,线程A和B之间并没有进行直接通信。MQ服务器在中间起到中继的作用。
适用的应用场景
比较适合异步传输,这里解释一下什么是异步和同步。异步:发送方不关心消息有没有发送成功,只发送消息,不去获取消息是否发送成功。
同步:发送方关心消息是否发送成功,发送消息后,会等待接收方返回状态码,根据状态码来判断是否发送成功,然后执行相对于的动作。
下边以Http中的同步和异步为例:
如:普通的B/S架构客户端和服务器端之间的通信就是同步的,即提交请求 ---> 等待服务器处理完毕返回消息 ---> 拿到服务器返回的消息,处理完毕。
如:Ajax技术就是异步的,请求通过事件触发 ---> 服务器处理(浏览器不用等待,仍可以做其他的事情) ---> 处理完毕。
有人可能会好奇说应用场景怎么说到了同步和异步,那说明你还不是很理解技术和应用场景之间的紧密联系。
工作过程
生产者客户端:
客户端连接到RabbitMQ服务器上,打开一个消息通道(channel);客户端声明一个消息交换机(exchange),并设置相关属性。客户端声明一个消息队列(queue),并设置相关属性。客户端使用routing key在消息交换机(exchange)和消息队列(queue)中建立好绑定关系。客户端投递消息都消息交换机(exchange)上客户端关闭消息通道(channel)以及和服务器的连接。服务器端:exchange接收到消息后,根据消息的key(这个key的产生规则暂时没研究,有知道的小伙伴可以留言告诉我)和以及设置的binding,进行消息路由,将消息投递到一个或多个消息队列中。安装
由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang,执行命令:sudo apt-get install erlang
这样就安装完了。
接下来,安装rabbitMq:sudo apt-get install rabbitmq-server
安装完之后启动rabbitMQ
创建用户
sudo rabbitmqctl add_user lsl 123456
将用户设置为管理员(只有管理员才能远程登录)
sudo rabbitmqctl set_user_tags 用户名 administrator
同时为用户设置读写等权限
sudo rabbitmqctl set_permissions -p / lsl ".*" ".*" ".*"
跟着访问
就能看到配置页面了,这也说明已经成功安装rabbitmq实战消费者和生产者(PHP版本)
生产者:生产消息,发送消息。类似工厂。 消费者:接受消息,使用消息。类似顾客。 队列:存储消息。类似仓库、中转站。队列可以存储很多的消息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。多个生产者可以将消息发送到同一个队列中,多个消费者也可以只从同一个队列接收数据。这就是队列的特性。下面写一个demo来实现rabbitmq的消费者和生产者
send.php'127.0.0.1', 'port' => '5672', 'login' => 'lsl', 'password' => '123456', 'vhost'=>'/');$e_name = 'e_linvo'; //交换机名 //$q_name = 'q_linvo'; //无需队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args);if (!$conn->connect()) { die("Cannot connect to the broker!\n");}$channel = new AMQPChannel($conn);//创建交换机对象 $ex = new AMQPExchange($channel);$ex->setName($e_name);date_default_timezone_set("Asia/Shanghai");//发送消息 //$channel->startTransaction(); //开始事务 for($i=0; $i<5; ++$i){ sleep(2);//每个两秒发送一条消息 //消息内容 $message = "HelloWorld!".date("h:i:sa"); echo "Send Message:".$ex->publish($message, $k_route)."\n";}//$channel->commitTransaction(); //提交事务 $conn->disconnect();?>
rec.php
'127.0.0.1', 'port' => '5672', 'login' => 'lsl', 'password' => '123456', 'vhost'=>'/');$e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args);if (!$conn->connect()) { die("Cannot connect to the broker!\n");}$channel = new AMQPChannel($conn);//创建交换机 $ex = new AMQPExchange($channel);$ex->setName($e_name);$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n";//创建队列 $q = new AMQPQueue($channel);$q->setName($q_name);$q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n";//绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";//阻塞模式接收消息 echo "Message:\n";while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 }$conn->disconnect();
先让消费者接收消息
再调用生产者发送消息
再查看消费者收到的消息
假设消费者挂掉了,看看消息是怎么样的
看一下之前部署的,你会发现linvo这个队列里有5条消息,这意味着没有消费者去读取它,把消息堆积在队列里了当你打开消费者,页面的Total值就马上变为0了,这意味着消息已经被接收。
这样就模拟了队列对消息的处理。转载于:https://blog.51cto.com/13475644/2316244