NATS

消息模式

当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:

  • 发布-订阅
  • 队列
  • 请求-响应

消息架构组件

  • 消息:消息是数据交换单元,用于应用间交换数据的有效载荷。
  • 主体:主体明确消息的目的。
  • 生产者:生产者向NATS服务器发送消息。
  • 消费者:消费者从NATS服务器中接收消息。
  • 消息服务器:NATS服务器从生产者到消费者间分配消息。

 发布订阅示例

无队列组模式

发布者
使用grpc服务器进行发布

package main

import (
    "encoding/json"
    "log"

    "github.com/gogo/protobuf/proto"
    nats "github.com/nats-io/nats.go"
    uuid "github.com/satori/go.uuid"
    pb "github.com/yuwe1/gopratice/nats/proto"
)

const (
    aggregate = "Order"
    event     = "OrderCreated"
)

func publishOrderCreated(order *pb.Order, subject string) {
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("connected to " + nats.DefaultURL)
    defer natsConnection.Close()
    eventData, _ := json.Marshal(order)
    event := pb.EventStore{
        AggregateId:   order.OrderId,
        AggregateType: aggregate,
        EventId:       uuid.NewV4().String(),
        EventType:     event,
        EventData:     string(eventData),
    }
    data, _ := proto.Marshal(&event)
    // Publish message on subject
    natsConnection.Publish(subject, data)
    log.Println("Published message on subject " + subject)

}
func main() {
    publishOrderCreated(&pb.Order{OrderId: uuid.NewV4().String()}, "Order.OrderCreated")
}

订阅者

natsConnection.Subscribe(subject1, func(msg *nats.Msg) {
        eventStore := pb.EventStore{}
        err := proto.Unmarshal(msg.Data, &eventStore)
        if err == nil {
            // Handle the message
            log.Printf("Received message in EventStore service: %+v\n", eventStore)
            // store := pb.EventStore{}

            // store.CreateEvent(&eventStore)
            log.Println("Inserted event into Event Store")
        }
        // fmt.Println(err)
    })
    runtime.Goexit()

队列组模式

对于队列组模式,同一个对队列的订阅者,只能有一个会收到消息,仍使用之前的发布者,我们来看我们的订阅者

const (
    queue    = "Order.OrdersCreatedQueue"
    subject1 = "Order.OrderCreated"
    subject2 = "Order.OrderCreated"
)


func su3() {
    // Create server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("Connected to " + nats.DefaultURL)
    
    // 第一个订阅者
    natsConnection.QueueSubscribe(subject1, queue, func(msg *nats.Msg) {
        eventStore := pb.EventStore{}
        err := proto.Unmarshal(msg.Data, &eventStore)
        if err == nil {
            // Handle the message
            log.Printf("Subscribed message in Worker 1: %+v\n", eventStore)
        }
    })
    //第二个订阅者
    natsConnection.QueueSubscribe(subject2, queue, func(msg *nats.Msg) {
        eventStore := pb.EventStore{}
        err := proto.Unmarshal(msg.Data, &eventStore)
        if err == nil {
            // Handle the message
            log.Printf("Subscribed message in Worker 2: %+v\n", eventStore)
        }
    })
    // Keep the connection alive
    runtime.Goexit()
}
Last modification:October 19th, 2019 at 04:37 pm
如果觉得我的文章对你有用,请随意赞赏