博客
关于我
Go之NSQ简介,原理和使用
阅读量:466 次
发布时间:2019-03-06

本文共 3392 字,大约阅读时间需要 11 分钟。

NSQ简介

NSQ 是 Go语言 编写的一个开源实时分布式内存消息队列,性能表现优异。它设计用于处理每天数以十亿计级别的消息,具备分布式和去中心化拓扑结构,具有无单点故障、容错、高可用性和可靠消息传递的特点。尽管适合小型项目使用,但在没有能力进行二次开发的情况下仍存在诸多问题。


NSQ优势

  • 分布式、去中心化:没有单点故障,支持容错和高可用性,确保消息可靠传递。
  • 横向扩展:无需集中代理,能够无缝添加更多节点。
  • 易于配置和部署:内置管理界面,部署简单。

  • NSQ特性

  • 无 SPOF 分布式拓扑:集群中不存在单点故障。
  • 水平扩展:无需中间件,直接添加节点。
  • 低延迟:性能优异,适合高吞吐量和高通量场景。
  • 多播路由:结合负载均衡和多播,消息分发均匀。
  • 面向流媒体和工作负载:擅长处理实时数据和批量请求。
  • 内存+磁盘存储:内存缓冲,超过阈值后持久化到磁盘。
  • 服务发现:通过 nsqlookupd 发现生产者。
  • 传输层安全:支持 TLS 加密。
  • 数据格式无关:客户端可自定义消息格式。
  • 依赖项简单:部署健全,默认配置有界。
  • 多语言支持:提供简单的 TCP 协议和客户端库。
  • 管理界面:通过 HTTP 接口提供统计和管理功能。
  • 集群管理:集成 statsd 实时监控,提供健壮的管理界面 nsqadmin。

  • NSQ应用场景

    应用解耦

    通过消息队列将业务逻辑解耦,降低系统耦合度。例如,后续业务可直接订阅订单数据流,提升系统灵活性。

    流量削峰

    类似秒杀等场景,使用消息队列缓冲大量请求,保障后端服务稳定性。


    NSQ架构

    模块介绍

  • nsqd:负责接收、排队和分发消息,作为核心服务运行。

    • 特性
      • 向 nsqlookupd 注册元数据。
      • 支持负载均衡,消息分发均匀。
      • 队列消息至少传递一次,防止丢失。
      • 可配置内存占用,超出部分持久化到磁盘。
  • nsqlookupd:管理拓扑信息,提供服务发现功能。

    • 特性
      • 唯一性:集群中仅一个 nsqlookupd 服务。
      • 去中心化:崩溃不影响 nsqd 运行。
      • 提供查询服务,定时更新地址目录。
  • nsqadmin:管理界面,展示数据并创建 topic 和 channel。

    • 特性
      • 界面简洁,操作简单。
      • 展示消息数量和实时监控数据。
      • 依赖 nsqlookupd,传递操作请求。

  • NSQ工作模式

    Topic 和 Channel

    • Topic:数据流的高层概念,负责区分不同类型数据。
    • Channel:Topic 的分支,消息分发到多个 channel,实现负载均衡。
    • 消息传递
      • Topic → Channel(消息复制到多个 channel)。
      • Channel → 消费者(消息均匀分发)。

    CentOS 安装 NSQ

    下载

    wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gztar xf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz -C /usr/local

    本地解析 hosts

    192.168.43.47 nsq-47

    启动

    # 开三个终端分别启动./nsqlookupd ./nsqd --lookupd-tcp-address=192.168.43.47:4160./nsqadmin --lookupd-http-address=192.168.43.47:4161

    Go 操作 NSQ

    安装客户端

    go get -u github.com/nsqio/go-nsq

    生产者示例

    package mainimport (    "bufio"    "fmt"    "os"    "strings"    "github.com/nsqio/go-nsq")func initProducer(addr string) (*nsq.Producer, error) {    config := nsq.NewConfig()    producer, err := nsq.NewProducer("topic_demo", addr, config)    if err != nil {        fmt.Printf("创建生产者失败,错误:%v\n", err)        return nil, err    }    return producer, nil}func main() {    nsqAddress := "192.168.43.47:4150"    producer, err := initProducer(nsqAddress)    if err != nil {        fmt.Printf("初始化生产者失败,错误:%v\n", err)        return    }    reader := bufio.NewReader(os.Stdin)    for {        data, err := reader.ReadString('\n')        if err != nil {            fmt.Printf("读取输入失败,错误:%v\n", err)            continue        }        data = strings.TrimSpace(data)        if strings.ToUpper(data) == "Q" {            break        }        if err := producer.Publish("topic_demo", []byte(data)); err != nil {            fmt.Printf("发布消息失败,错误:%v\n", err)            continue        }    }}

    消费者示例

    package mainimport (    "fmt"    "os"    "os/signal"    "syscall"    "time"    "github.com/nsqio/go-nsq")type MyHandler struct {    Title string}func (m *MyHandler) HandleMessage(msg *nsq.Message) error {    fmt.Printf("%s 收到消息,地址:%v,内容:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))    return nil}func initConsumer(topic, channel, addr string) (*nsq.Consumer, error) {    config := nsq.NewConfig()    config.LookupdPollInterval = 15 * time.Second    consumer, err := nsq.NewConsumer(topic, channel, config)    if err != nil {        fmt.Printf("创建消费者失败,错误:%v\n", err)        return nil, err    }    consumer.AddHandler(&MyHandler{        Title: "沙河1号",    })    return consumer, nil}func main() {    err := initConsumer("topic_demo", "first", "192.168.43.47:4161")    if err != nil {        fmt.Printf("初始化消费者失败,错误:%v\n", err)        return    }    c := make(chan os.Signal)    signal.Notify(c, syscall.SIGINT)    <-c}

    转载地址:http://msabz.baihongyu.com/

    你可能感兴趣的文章
    Node.js初体验
    查看>>
    Node.js升级工具n
    查看>>
    Node.js卸载超详细步骤(附图文讲解)
    查看>>
    Node.js卸载超详细步骤(附图文讲解)
    查看>>
    Node.js基于Express框架搭建一个简单的注册登录Web功能
    查看>>
    node.js学习之npm 入门 —8.《怎样创建,发布,升级你的npm,node模块》
    查看>>
    Node.js安装与配置指南:轻松启航您的JavaScript服务器之旅
    查看>>
    Node.js安装及环境配置之Windows篇
    查看>>
    Node.js安装和入门 - 2行代码让你能够启动一个Server
    查看>>
    node.js安装方法
    查看>>
    Node.js官网无法正常访问时安装NodeJS的方法
    查看>>
    node.js模块、包
    查看>>
    node.js的express框架用法(一)
    查看>>
    Node.js的交互式解释器(REPL)
    查看>>
    Node.js的循环与异步问题
    查看>>
    Node.js高级编程:用Javascript构建可伸缩应用(1)1.1 介绍和安装-安装Node
    查看>>
    nodejs + socket.io 同时使用http 和 https
    查看>>
    NodeJS @kubernetes/client-node连接到kubernetes集群的方法
    查看>>
    NodeJS API简介
    查看>>
    Nodejs express 获取url参数,post参数的三种方式
    查看>>