简介

RabbitMQ是一个开源的,使用最广的消息转发器。提供匿名消息类型,可以使用多种消息协议,消息队列等,支持集群部署。
它的轻量级能够让你很轻易地在私有云或公有云环境中部署,同时提供丰富的插件工具,能够让你轻易的进行扩展,且可使用HTTP-API、命令行工具和UI界面以便于管理和监控。

专业术语

生产者:产生并发送消息的程序
队列:类似于一个存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的的地方。队列的大小取决于寄主机器的内存和磁盘容量,他本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息
消费者:等待接收消息的程序

Hello World

最开始接触消息队列,我们先来一个简单传输 Hello World的消息队列。其中包含两个小程序,一个是用于发送消息send.go, 一个用来接收消息的reveive.go

QQ截图20191212173336.png

在使用Go语言操作RabbitMQ之前我们需要先从github上拉取amqp包:

go get github.com/streadway/amqp

Sending

QQ截图20191212175620.png

首先我们要把需要用到的包导入我们的程序

package main

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

创建请求连接

在这里我们使用的消息队列将会基于TCP协议进行数据传输

//guest为消息队列网站注册的(可以修改),端口号默认为5672
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672")
if err!=nil{
        log.Println("conn err:",err)
}
defer conn.Close()

上面代码会建立一个socket连接,处理一些协议转换及版本对接和登录授权的问题。建立连接之后,我们需要创建一个通道channel,之后我们的大多数API操作都是围绕通道来实现的

创建通道

ch, err := conn.Channel()
if err!=nil{
        log.Println("create Channel err:",err)
}
defer ch.Close()

创建队列

通道创建完毕后,我们要在通道的基础上创建一个队列

q, err := ch.QueueDeclare(
        "hello", //通道名称
        false,   //持久的
        false,   //使用后是否删除
        false,   // 唯一性
        false,   //不允许等待
        nil,     //命令参数
    )

在这里q就为我们创建的队列,这个队列的名字叫做hello

生产

当我们创建好一个消息队列后就可以向消息队列中推送消息了

body:="hello worldd"
err = ch.Publish( //发布
        "",     //exchange                交换器名,这里为空
        q.Name, //routing key            路由 key,这里写消息队列的名字
        false,  //mandatory                托管的
        false,  //immediate                直接的,立即的
        amqp.Publishing{ //发布中
            ContentType: "text/plain", //内用类型
            Body:        []byte(body),    //消息队列传输时传的是byte字节型的切片
        },
    )

Receiving

QQ截图20191213103953.png

首先,我们还是要导入同样的包

package main

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

创建请求连接

接着与生产者一样创建连接,注意这里的参数必须与send中的queue name相一致,这样才能实现发送/接受的配对。

//guest为消息队列网站注册的(可以修改),端口号默认为5672
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672")
if err!=nil{
        log.Println("conn err:",err)
}
defer conn.Close()

创建通道

ch, err := conn.Channel()
if err!=nil{
        log.Println("create Channel err:",err)
}
defer ch.Close()

创建队列

通道创建完毕后,创建的队列里的属性必须要与生产者队列属性一致,不然会不匹配

q, err := ch.QueueDeclare(
        "hello", //通道名称
        false,   //持久的
        false,   //使用后是否删除
        false,   // 唯一性
        false,   //不允许等待
        nil,     //命令参数
    )

消费

紧接着我们就可以从队列里消费信息了

msgs, err := ch.Consume( //消费
        q.Name, // queue    //队列的名字
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // arguments
    )
forever := make(chan bool)//在这里创建一个通道,防止main函数执行结束
go func() {
    for d := range msgs {
        log.Println("Received a message : ",string(d.Body))//tong
    }
}()
<-forever

Runing

首先找到我们的消费者,在蓝色路径处输入cmd打开终端然后运行我们的receive.go
QQ截图20191213105857.png

同理再去运行生产者,就可以看到我们的消费者已经接收到到了信息
QQ截图20191213110451.png

Last modification:December 19th, 2019 at 10:15 am
如果觉得我的文章对你有用,请随意赞赏