列表项目

简介

基本的NATS服务器专为高性能和简洁而设计,它不会为通过NATS发布的消息提供持久性存储。缺乏对消息的持久存储对于许多分布式系统来说都是一个问题。例如,假设你的一个用户系统在你发布消息时发生故障,则该用户系统不会收到该消息,因此你必须提供处理此类情况的架构方法。

NATS Streaming附带一个持久性存储,用于为通过 NATS 服务器发布的消息提供日志。如果你需要持久性消息传输和交付保证,则可以使用 NATS Streaming 而不是核心 NATS 平台。
NATS Streaming是一个非常高性能,轻量级和可靠的流式平台,它构建在核心NATS平台的顶层,提供持久性日志。NATS Streaming 是用 Go 语言编写的。它可以用来添加事件流,交付保证和将历史数据重放到NATS。然而NATS Streaming 不是一个独立的服务器,但它使用 NATS 服务器(gnatsd)。简而言之,NATS Streaming 内嵌 NATS 服务器作为消息传输服务器,并提供了一个额外的功能,可以为事件流系统提供持久性日志使用。

安装

首先我们先使用go get获得NATS流

go get github.com/nats-io/nats-streaming-server

当我们下载好之后,打开我们的github.com/nats-io/nats-streaming-server包,在里面有个nats-streaming-server.go文件,把这个文件复制一份到我们go工作目录下的bin目录里


1.png


为了用 Go 语言创建 NATS 客户端应用程序,请使用_go get_命令下载并安装 Go 语言包:

go get github.com/nats-io/go-nats-streaming

要运行NATS流,请运行我们放在bin目录下的 _nats-streaming-server_二进制文件:

nats-streaming-server

默认情况下,NATS Streaming使用内存存储来存储消息,因此如果NATS服务器关闭,你将丢失消息。下面是我运行的结果:
2.png

发布事件

首先我们先在这里创建一个事件的发布者,nats streaming一般都是用在集群的发布订阅。

package main

import (
    "encoding/json"
    "log"
    //stan "github.com/nats-io/go-nats-streaming" //    如果这个包导入不成功就,导入下面的包
    "github.com/nats-io/stan.go"
)

const (
    clusterID = "test-cluster"       //集群ID:自定义        
    clientID  = "event-store1"       //客户端ID:自己这个发布者的ID,自定义
    channel   = "order-notification" //发布的频道:把自己 产生的事件要发布的频道ID    
)

//创建一个学生结构体
type student struct {
    Name   string `json:"name,omitempty"`
    Age    int    `json:"age,omitempty"`
    Gender string `json:"gender,omitempty"`
}

func main() {
    //建立nats streaing连接
    sc, err := stan.Connect(
        clusterID,
        clientID,
        stan.NatsURL(stan.DefaultNatsURL),    //这里为默认,
    )
    if err != nil {
        log.Print(err)
    }
    defer sc.Close()
    
    //创建实例
    var stu student
    stu.Name = "小明"
    stu.Age = 19
    stu.Gender = "男"
    
    //把结构体实例转化为json数据
    data, err := json.Marshal(stu)
    if err != nil {
        log.Println(err, "  nats-streaming 解析json数据失败")
    }
    //通过推送函数,把[]byte类型的json数据推送到该集群下的"order-notification"频道
    sc.Publish(channel, data)
}



订阅事件

基本的NATS服务器没有提供持久性日志,订阅消息的能力非常有限。当你发布消息时,如果订阅者客户端关闭,它将无法接收来自服务器的消息。由于NATS Streaming 服务器有持久化日志功能,它提供了很多从 NATS 服务器订阅消息的功能。在这里我们讲两种订阅方式:

队列组订阅

订阅者客户端可以通过指定一个_队列组_来创建。具有相同队列名称的同一频道的多个订户客户端形成队列组。队列订阅者可让你分发给多个订阅者消息处理。当你在频道上发布消息时,该消息将被发送到同一队列组中的一个用户。当你在短时间内发布数百万条消息时,如果消息处理的顺序并不重要,则排队订阅者可以高效地并行分发消息处理,并且提供高性能。

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "runtime"

    "github.com/nats-io/stan.go"
)

const (
    clusterID  = "test-cluster"
    clientID   = "order-query-store2"       //订阅者ID
    channel    = "order-notification"      //订阅的频道
    durableID  = "store-durable"           //队列成员持久化ID
    queueGroup = "order-query-store-group" //队列组ID
)

//创建一个学生结构体
type student struct {
    Name   string `json:"name,omitempty"`
    Age    int    `json:"age,omitempty"`
    Gender string `json:"gender,omitempty"`
}

func main() {
    //订阅者建立连接
    sc, err := stan.Connect(
        clusterID,
        clientID,    //这里只需改订阅者ID就可以作为一个新的订阅者
        stan.NatsURL(stan.DefaultNatsURL),
    )

    if err != nil {
        log.Println("OrderQuery :", err)
    }
    //该函数为队列组从频道里订阅接收消息,传入参数为订阅频道和自己要加入的队列组,如果队列组不存在
    //则创建队列组
    sc.QueueSubscribe(channel, queueGroup, func(msg *stan.Msg) {
        var stu student
        //订阅到的消息通过msg.Data获得,为[]byte数组类型
        err := json.Unmarshal(msg.Data, &stu)
        if err != nil {
            log.Println(err, "解析json数据失败")
        }
        fmt.Println(stu)
    }, stan.DurableName(durableID),
    )
    runtime.Goexit()
}

在这里我创建了两个订阅者组成一个队列组同时订阅同一个频道。
这里我 发布了四次事件,两个订阅者分别受到两条信息
3.png

订阅者client1:
4.png

订阅者client2:
5.png

持久化订阅

持久化订阅的意思是该订阅者会收到所有发布者所发布的事件,而上面的队列组则是队列组会收到所有的事件再把事件随机分给队列组内的成员

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "runtime"
    "time"

    "github.com/nats-io/stan.go"
)

const (
    clusterID = "test-cluster"
    clientID  = "order-query-store3"
    channel   = "order-notification"
    durableID = "store-durable"
)

//创建一个学生结构体
type student struct {
    Name   string `json:"name,omitempty"`
    Age    int    `json:"age,omitempty"`
    Gender string `json:"gender,omitempty"`
}

func main() {
    sc, err := stan.Connect(
        clusterID,
        clientID,
        stan.NatsURL(stan.DefaultNatsURL),
    )

    if err != nil {
        log.Println("PersistentSubscription :", err)
    }
    // 以手动ack模式订阅, 且设置 AckWait 时间至60s
    aw, _ := time.ParseDuration("60s")
    sc.Subscribe(channel, func(msg *stan.Msg) {
        msg.Ack() // Manual ACK
        var stu student
        err := json.Unmarshal(msg.Data, &stu)
        if err != nil {
            log.Println(err, "解析json数据失败")
        }
        fmt.Println(stu)
    }, stan.DurableName(durableID),
        stan.MaxInflight(25),
        stan.SetManualAckMode(),
        stan.AckWait(aw),
    )
    runtime.Goexit()
}


Last modification:April 17th, 2020 at 03:59 pm
如果觉得我的文章对你有用,请随意赞赏