NSQ是一个基于Go语言的分布式实时消息平台,当前最新版本v1.2.0,可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ 具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者兴趣构建自己的客户端的话,还可以参考官方提供的协议规范

  • 分布式 Distributed
  • 可扩展 Scalable
  • 更友好的运维 Ops Friendly
  • 集成Go、Python库更方便 Integrated

NSQ 是由四个重要组件构成

  • nsqd :一个负责接收、排队、转发消息到客户端的守护进程
  • nsqlookupd :管理拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqadmin :一套 Web 用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
  • utilities :常见基础功能、数据流处理工具,如 nsq_statnsq_tailnsq_to_filensq_to_httpnsq_to_nsqto_nsq

NSQ 的主要特点如下

  • 具有分布式且无单点故障的拓扑结构 支持水平扩展,在无中断情况下能够无缝地添加集群节点
  • 低延迟的消息推送,参见官方提供的性能说明文档
  • 具有组合式的负载均衡和多播形式的消息路由
  • 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向 Job 的(低吞吐量)工作负载
  • 消息数据既可以存储于内存中,也可以存储在磁盘中
  • 实现了生产者、消费者自动发现和消费者自动连接生产者,参见 nsqlookupd
  • 支持安全传输层协议(TLS),从而确保了消息传递的安全性
  • 具有与数据格式无关的消息结构,支持 JSON、Protocol Buffers、MsgPacek 等消息格式
  • 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)
  • 使用了简单的 TCP 协议且具有多种语言的客户端功能库
  • 具有用于信息统计、管理员操作和实现生产者等的 HTTP 接口
  • 为实时检测集成了统计数据收集器 StatsD
  • 具有强大的集群管理界面,参见 nsqadmin

为了达到高效的分布式消息服务,NSQ 实现了合理、智能的权衡,从而使得其能够完全适用于生产环境中,具体内容如下:

  • 支持消息内存队列的大小设置,默认完全持久化(值为 0),消息即可持久到磁盘也可以保存在内存中
  • 保证消息至少传递一次, 以确保消息可以最终成功发送
  • 收到的消息是无序的, 实现了松散订购
  • 发现服务 nsqlookupd 具有最终一致性, 消息最终能够找到所有 Topic 生产者

官方和第三方还为 NSQ 开发了众多客户端功能库,如官方提供的基于 HTTP 的 nsqd 、Go 客户端 go-nsq 、Python 客户端 pynsq 、基于 Node.js 的 JavaScript 客户端 nsqjs 、异步 C 客户端 libnsq 、Java 客户端 nsq-java 以及基于各种语言的众多第三方客户端功能库。更多客户端功能库, 请读者点击这里查看。

从 NSQ 的设计文档中得知,单个nsqd 被设计为一次能够处理多个流数据,NSQ 中的数据流模型是由stream 和consumer 组成。Topic 是一种独特的stream,Channel 是一个订阅了给定Topic 的consumer 逻辑分组。NSQ 的数据流模型结构如下图所示:

NSQ:分布式的实时消息平台

从上图可以看出,单个 nsqd 可以有多个 Topic,每个 Topic 又可以有多个 Channel。Channel 能够接收 Topic 所有消息的副本,从而实现了消息多播分发;而 Channel 上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

NSQ实践

NSQ实践

我们要先启动nsqlookupd,为了演示方便,我启动两个nsqlookupd实例, 三个nsqd实例

./nsqlookupd -tcp-address ":8200" -http-address ":8201"
./nsqlookupd -tcp-address ":7200" -http-address ":7201"

为了演示横向扩充,先启动两个,客户端连接后,再启动第三个。

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
./nsqd -tcp-address ":7000"  -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./b

--lookupd-tcp-address 用于指定lookup的连接地址

客户端代码如下:

package main

import (
	"fmt"
	nsq "github.com/nsqio/go-nsq"
	"os"
	"os/signal"

)

func main() {
	adds := []string{"10.10.75.125:7201", "10.10.75.125:8201"}
	config := nsq.NewConfig()

	topicName := "testTopic1"
	c, _ := nsq.NewConsumer(topicName, "ch1", config)
	testHandler := &MyTestHandler{consumer: c}

	c.AddHandler(testHandler)
	if err := c.ConnectToNSQLookupds(adds); err != nil {
		panic(err)
	}
	stats := c.Stats()
	if stats.Connections == 0 {
		panic("stats report 0 connections (should be > 0)")
	}
	stop := make(chan os.Signal)
	signal.Notify(stop, os.Interrupt)
	fmt.Println("server is running....")
	<-stop
}

type MyTestHandler struct {
	consumer *nsq.Consumer
}

func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
	fmt.Println(string(message.Body))
	return nil
}

方法ConnectToNSQLookupds就是用于连接nsqlookupd的,但是需要注意的是,连接的是http端口72018201,库go-nsq 是通过请求其中一个nsqlookupd的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1 来得到所有提供topic=testTopic1nsqd 列表信息,然后对所有的nsqd进行连接,

2019/08/30 13:47:26 INF    1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd

目前我们已经连接了两个。 我们演示一下橫向扩充,启动第三个nsqd

./nsqd -tcp-address ":6000"  -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./c

部署一个nsqadmin,访问127.0.0.1:4171即可查看nsq管理后台

./nsqadmin  --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201

去中心化实现原理

nsqlookupd用于管理整个网络拓扑结构,nsqd用他实现服务的注册,客户端使用他得到所有的nsqd服务节点信息,然后所有的consumer端连接 实现原理如下,

  • nsqd把自己的服务信息广播给一个或者多个nsqlookupd
  • 客户端连接一个或者多个nsqlookupd,通过nsqlookupd得到所有的nsqd的连接信息,进行连接消费,
  • 如果某个nsqd出现问题,down机了,会和nsqlookupd断开,这样客户端nsqlookupd得到的nsqd的列表永远是可用的。客户端连接的是所有的nsqd,一个出问题了就用其他的连接,所以也不会受影响。