NATS
消息模式
当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:
- 发布-订阅
- 队列
- 请求-响应
消息架构组件
- 消息:消息是数据交换单元,用于应用间交换数据的有效载荷。
- 主体:主体明确消息的目的。
- 生产者:生产者向NATS服务器发送消息。
- 消费者:消费者从NATS服务器中接收消息。
- 消息服务器:NATS服务器从生产者到消费者间分配消息。
发布订阅示例
无队列组模式
发布者
使用grpc服务器进行发布
package main
import (
"encoding/json"
"log"
"github.com/gogo/protobuf/proto"
nats "github.com/nats-io/nats.go"
uuid "github.com/satori/go.uuid"
pb "github.com/yuwe1/gopratice/nats/proto"
)
const (
aggregate = "Order"
event = "OrderCreated"
)
func publishOrderCreated(order *pb.Order, subject string) {
natsConnection, _ := nats.Connect(nats.DefaultURL)
log.Println("connected to " + nats.DefaultURL)
defer natsConnection.Close()
eventData, _ := json.Marshal(order)
event := pb.EventStore{
AggregateId: order.OrderId,
AggregateType: aggregate,
EventId: uuid.NewV4().String(),
EventType: event,
EventData: string(eventData),
}
data, _ := proto.Marshal(&event)
// Publish message on subject
natsConnection.Publish(subject, data)
log.Println("Published message on subject " + subject)
}
func main() {
publishOrderCreated(&pb.Order{OrderId: uuid.NewV4().String()}, "Order.OrderCreated")
}
订阅者
natsConnection.Subscribe(subject1, func(msg *nats.Msg) {
eventStore := pb.EventStore{}
err := proto.Unmarshal(msg.Data, &eventStore)
if err == nil {
// Handle the message
log.Printf("Received message in EventStore service: %+v\n", eventStore)
// store := pb.EventStore{}
// store.CreateEvent(&eventStore)
log.Println("Inserted event into Event Store")
}
// fmt.Println(err)
})
runtime.Goexit()
队列组模式
对于队列组模式,同一个对队列的订阅者,只能有一个会收到消息,仍使用之前的发布者,我们来看我们的订阅者
const (
queue = "Order.OrdersCreatedQueue"
subject1 = "Order.OrderCreated"
subject2 = "Order.OrderCreated"
)
func su3() {
// Create server connection
natsConnection, _ := nats.Connect(nats.DefaultURL)
log.Println("Connected to " + nats.DefaultURL)
// 第一个订阅者
natsConnection.QueueSubscribe(subject1, queue, func(msg *nats.Msg) {
eventStore := pb.EventStore{}
err := proto.Unmarshal(msg.Data, &eventStore)
if err == nil {
// Handle the message
log.Printf("Subscribed message in Worker 1: %+v\n", eventStore)
}
})
//第二个订阅者
natsConnection.QueueSubscribe(subject2, queue, func(msg *nats.Msg) {
eventStore := pb.EventStore{}
err := proto.Unmarshal(msg.Data, &eventStore)
if err == nil {
// Handle the message
log.Printf("Subscribed message in Worker 2: %+v\n", eventStore)
}
})
// Keep the connection alive
runtime.Goexit()
}