本文共 3392 字,大约阅读时间需要 11 分钟。
NSQ 是 Go语言 编写的一个开源实时分布式内存消息队列,性能表现优异。它设计用于处理每天数以十亿计级别的消息,具备分布式和去中心化拓扑结构,具有无单点故障、容错、高可用性和可靠消息传递的特点。尽管适合小型项目使用,但在没有能力进行二次开发的情况下仍存在诸多问题。
通过消息队列将业务逻辑解耦,降低系统耦合度。例如,后续业务可直接订阅订单数据流,提升系统灵活性。
类似秒杀等场景,使用消息队列缓冲大量请求,保障后端服务稳定性。
nsqd:负责接收、排队和分发消息,作为核心服务运行。
nsqlookupd:管理拓扑信息,提供服务发现功能。
nsqadmin:管理界面,展示数据并创建 topic 和 channel。
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gztar xf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz -C /usr/local
192.168.43.47 nsq-47
# 开三个终端分别启动./nsqlookupd ./nsqd --lookupd-tcp-address=192.168.43.47:4160./nsqadmin --lookupd-http-address=192.168.43.47:4161
go get -u github.com/nsqio/go-nsq
package mainimport ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq")func initProducer(addr string) (*nsq.Producer, error) { config := nsq.NewConfig() producer, err := nsq.NewProducer("topic_demo", addr, config) if err != nil { fmt.Printf("创建生产者失败,错误:%v\n", err) return nil, err } return producer, nil}func main() { nsqAddress := "192.168.43.47:4150" producer, err := initProducer(nsqAddress) if err != nil { fmt.Printf("初始化生产者失败,错误:%v\n", err) return } reader := bufio.NewReader(os.Stdin) for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("读取输入失败,错误:%v\n", err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { break } if err := producer.Publish("topic_demo", []byte(data)); err != nil { fmt.Printf("发布消息失败,错误:%v\n", err) continue } }} package mainimport ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq")type MyHandler struct { Title string}func (m *MyHandler) HandleMessage(msg *nsq.Message) error { fmt.Printf("%s 收到消息,地址:%v,内容:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return nil}func initConsumer(topic, channel, addr string) (*nsq.Consumer, error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second consumer, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("创建消费者失败,错误:%v\n", err) return nil, err } consumer.AddHandler(&MyHandler{ Title: "沙河1号", }) return consumer, nil}func main() { err := initConsumer("topic_demo", "first", "192.168.43.47:4161") if err != nil { fmt.Printf("初始化消费者失败,错误:%v\n", err) return } c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT) <-c} 转载地址:http://msabz.baihongyu.com/