简介
RabbitMQ是一个开源的,使用最广的消息转发器。提供匿名消息类型,可以使用多种消息协议,消息队列等,支持集群部署。
它的轻量级能够让你很轻易地在私有云或公有云环境中部署,同时提供丰富的插件工具,能够让你轻易的进行扩展,且可使用HTTP-API、命令行工具和UI界面以便于管理和监控。
专业术语
生产者:产生并发送消息的程序
队列:类似于一个存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的的地方。队列的大小取决于寄主机器的内存和磁盘容量,他本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息
消费者:等待接收消息的程序
Hello World
最开始接触消息队列,我们先来一个简单传输 Hello World的消息队列。其中包含两个小程序,一个是用于发送消息send.go, 一个用来接收消息的reveive.go
在使用Go语言操作RabbitMQ之前我们需要先从github上拉取amqp包:
go get github.com/streadway/amqp
Sending
首先我们要把需要用到的包导入我们的程序
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
创建请求连接
在这里我们使用的消息队列将会基于TCP协议进行数据传输
//guest为消息队列网站注册的(可以修改),端口号默认为5672
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672")
if err!=nil{
log.Println("conn err:",err)
}
defer conn.Close()
上面代码会建立一个socket连接,处理一些协议转换及版本对接和登录授权的问题。建立连接之后,我们需要创建一个通道channel,之后我们的大多数API操作都是围绕通道来实现的
创建通道
ch, err := conn.Channel()
if err!=nil{
log.Println("create Channel err:",err)
}
defer ch.Close()
创建队列
通道创建完毕后,我们要在通道的基础上创建一个队列
q, err := ch.QueueDeclare(
"hello", //通道名称
false, //持久的
false, //使用后是否删除
false, // 唯一性
false, //不允许等待
nil, //命令参数
)
生产
当我们创建好一个消息队列后就可以向消息队列中推送消息了
body:="hello worldd"
err = ch.Publish( //发布
"", //exchange 交换器名,这里为空
q.Name, //routing key 路由 key,这里写消息队列的名字
false, //mandatory 托管的
false, //immediate 直接的,立即的
amqp.Publishing{ //发布中
ContentType: "text/plain", //内用类型
Body: []byte(body), //消息队列传输时传的是byte字节型的切片
},
)
Receiving
首先,我们还是要导入同样的包
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
创建请求连接
接着与生产者一样创建连接,注意这里的参数必须与send中的queue name相一致,这样才能实现发送/接受的配对。
//guest为消息队列网站注册的(可以修改),端口号默认为5672
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672")
if err!=nil{
log.Println("conn err:",err)
}
defer conn.Close()
创建通道
ch, err := conn.Channel()
if err!=nil{
log.Println("create Channel err:",err)
}
defer ch.Close()
创建队列
通道创建完毕后,创建的队列里的属性必须要与生产者队列属性一致,不然会不匹配
q, err := ch.QueueDeclare(
"hello", //通道名称
false, //持久的
false, //使用后是否删除
false, // 唯一性
false, //不允许等待
nil, //命令参数
)
消费
紧接着我们就可以从队列里消费信息了
msgs, err := ch.Consume( //消费
q.Name, // queue //队列的名字
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
forever := make(chan bool)//在这里创建一个通道,防止main函数执行结束
go func() {
for d := range msgs {
log.Println("Received a message : ",string(d.Body))//tong
}
}()
<-forever
Runing
首先找到我们的消费者,在蓝色路径处输入cmd打开终端然后运行我们的receive.go
同理再去运行生产者,就可以看到我们的消费者已经接收到到了信息