01 — 概述 NSQ 组件: 1、nsqd:接受、排队、传递消息的守护进程,消息队列中的核心。...2、NSQ 保证消息至少被传递一次,但也有可能极端情况下会被传递多次,消费者需要额外注意这一点。 3、消息是无序的。...7、nsq 没有复杂的路由,没有 replication 副本备份。 总而言之,NSQ 高效轻量、简单、易于分布式扩展。...另外有赞团队自己改造了一版 NSQ 并开源了出来( https://github.com/youzan/nsq ),视频: https://www.youtube.com/watch?
在谈到消息队列时,除了 Kafka、RabbitMQ、RocketMQ、ActiveMQ 等等之外,我希望你多了解一下 NSQ,之前已经写过一篇文章 《 NSQ 概述 》,但是内容过于简单,现在再多写一点...NSQ 相关的内容。...2 Timeout 每一条消息都必须在一定时间内向 nsq 做出响应,否则 nsq 会认为这条消息超时,然后 requeue 处理。...3 Touch 有时候 consumer 需要更长的时间来对消息进行处理,而不想被 nsq 判定超时然后 requeue ,这时候就可以主动向 nsq 响应 Touch ,表示消息是正常处理的,但是需要更长时间...,nsq 接受到 Touch 响应后就会刷新这条消息的超时时间。
自己创建的消费者只要有HandleMessage方法就可以 package main import ( "fmt" "time" "github.com/nsqio/go-nsq..." ) // nsq发布消息 func Producer() { p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())...= nil { panic(err) } if err := p.Publish("test", []byte("hello NSQ!!!"))...() { c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一个消费者 if err !...nsq admin 参数说明 nsq admin 中显示的那些参数,可以在这里进行查看 http://nsq.io/components/nsqadmin.html#metrics ##参考 go-nsq
为了充分支持自研版 NSQ 新功能,在要构建 NSQ client 时,需要在兼容原版 NSQ 的基础上,实现额外的设计。...nsq 的新特性,对 nsq 文档[1]中构建 nsq client 的专题进行补充。...2.1 配置 client 自研版 NSQ 改造自开源版 NSQ,继承了开源版 NSQ 中的配置。[^1]中 Configuration 段落的内容适用于有赞自研版。...nsq client 负责调用 nsqlookupd 的 lookup 服务,并通过 poll,定时更新消息在 nsq 集群上的读写分布信息。...七、消息追踪功能的实现 新版 NSQ 通过在消息 ID 中增加 TraceID,对消息在 NSQ 中的生命周期进行追踪,client 通过 PUBTRACE 新命令将需要追踪的消息发送到 NSQ,PUB_TRACE
官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端...4.Integrated高度集成 官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。...nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。...NSQ基本没有配置文件,配置通过命令行指定参数。...参考 NSQ:分布式的实时消息平台 NSQ - NYC Golang Meetup NSQ Docs
用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。...nsq官网: http://nsq.io/ ? 为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。 部署完后我会用go和c#写一些代码方便大家学习。 ...我用go语言 简单写一个发送信息的例子: go使用的库是 go-nsq 地址 : github.com/nsqio/go-nsq func main() { config := nsq.NewConfig...() // 随便给哪个ip发都可以 //w1, _ := nsq.NewProducer("192.168.0.105:4150", config) w1, _ := nsq.NewProducer...msgGood int } var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0) var mux *sync.Mutex = &sync.Mutex
看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的 认识。...对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到 指定nsqd之后,从nsqd中获取消息。...生产者 我们以nsq/apps/to_nsq/to_nsq.go为例,客户端这边的代码逻辑就简单很多,NewProducer实例化一个instance,publish消息 到nsqd。.../// nsq/apps/to_nsq/to_nsq.go producer, err := nsq.NewProducer(addr, cfg) err := producer.Publish(*topic...消费者 回过头我们再来看下消费者部分的代码,client端我们以nsq/apps/nsq_tail/nsq_tail.go为例,代码的基本逻辑如下: // 1. new comsunmer instanace
;cd nsq 安装依赖包: gpm install 安装NSQ: go install ./......三 开启NSQ: nsqd节点维护进程:nsqlookupd & nsqd节点进程:nsqd --lookupd-tcp-address=127.0.0.1:4160 & 消息产看进程:nsqadmin...nsq发布消息 func Producer() { p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())...= nil { panic(err) } if err := p.Publish("test", []byte("hello NSQ!!!")); err !...() { c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一个消费者 if err !
我们在前面介绍了 nsq 的相关概念以及 nsq 的安装与应用以及 nsqd 的实现原理、nsqlookupd 的实现细节。 本文将会介绍 nsq 在设计方面的一些思路。 ?...在可靠性、有序性方便, nsq 保证消息至少被投递消费一次(幂等消费),当某个 nsqd 节点出现故障时,极端情况下内存里面的消息还未来得及存入磁盘,这部分消息将丢失;通过分布式多个 consumer...多路分发 - producer 会同时连上 nsq 集群中所有 nsqd 节点,当然这些节点的地址是在初始化时,通过外界传递进去;当发布消息时,producer 会随机选择一个 nsqd 节点发布某个...只有一个客户端,那么 Channel 就将消息投递给这个客户端;如果 Channel 的客户端不止一个,那么 Channel 将把消息随机投递给任何一个客户端,这也可以看做是客户端的负载均衡 小结 本文主要介绍 nsq
;cd nsq 安装依赖包: gpm install 安装NSQ: go install ./......三 开启NSQ: nsqd节点维护进程:nsqlookupd & nsqd节点进程:nsqd --lookupd-tcp-address=127.0.0.1:4160 & 消息产看进程:nsqadmin.../tmp目录下的一个文件中 五 代码测试: package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) // nsq...发布消息 func Producer() { p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())...() { c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一个消费者 if err !
nsqd-http-address 127.0.0.1:4151 producer package main import ( "fmt" "github.com/nsqio/go-nsq...(url, nsq.NewConfig()) if err !..." ) func main() { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second...//设置重连时间 c, err := nsq.NewConsumer("test", "test-channel", cfg) // 新建一个消费者 if err !...= nil { log.Fatal(err) } c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error
NSQ 支持跨平台和多语言客户端。使用 Mac 的读者朋友们可以使用 brew 方便的安装 NSQ。本文我们主要介绍 NSQ 官方提供的 golang 客户端 go-nsq。...关于 NSQ 的更多内容,感兴趣的读者朋友们可以查阅官方文档,限于篇幅,本文不再赘述。 使用 go-nsq 操作 NSQ,需要安装 go-nsq 库,它提供了许多用于操作 NSQ 的函数和方法。...安装方式: // Mac 安装 nsq brew install nsq // 安装 go-nsq go get -u github.com/nsqio/go-nsq 02 生产者 go-nsq 包中的...它是 NSQ 官方提供的 NSQ Golang 客户端。并且分别介绍了 Producer 和 Consumer 的简单使用方法,大家可以根据自己的业务需求,使用 go-nsq 灵活运用 NSQ。.../go-nsq@v1.0.8
nsq 最初是由 bitly 公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。 它具有以下特性: 分布式。...现在已经有官方的Golang、Python和JavaScript客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。 1..../go-nsq" ) func main() { config := nsq.NewConfig() p, err := nsq.NewProducer("127.0.0.1:4150", config...) { wg := &sync.WaitGroup{} wg.Add(1000) config := nsq.NewConfig() c, _ := nsq.NewConsumer("...testTopic", "ch", config) c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Printf
NSQ简介 NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。...NSQ优势 /* 1. NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性, 并提供可靠的消息交付保证. 2. NSQ支持横向扩展,没有任何集中式代理 3....消息不保证有序. */ NSQ应用场景 ?...Centos安装NSQ 下载 wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz...Go操作NSQ 安装go客户端 /* go get -u github.com/nsqio/go-nsq */ 生产者 // nsq_producer/main.go package main import
nsq.io/deployment/installing.html 下面以 nsq-1.2.1.linux-amd64.go1.16.6.tar.gz 为例说明。...:4161 启动脚本 cd /usr/local/nsq/bin NSQ_ADDRESS="127.0.0.1" NSQLOOKUPD_LOG="/home/wwwlogs/nsq/tmplookup5.../nsqlookupd -broadcast-address=$NSQ_ADDRESS -http-address=$NSQ_ADDRESS":4161" -tcp-address=$NSQ_ADDRESS.../nsqd --lookupd-tcp-address=$NSQ_ADDRESS":4160" -broadcast-address=$NSQ_ADDRESS -tcp-address=$NSQ_ADDRESS...}" >&2 exit 1 esac 将此脚本保存到 /etc/init.d/nsq 下,即可使用 /etc/init.d/nsq start|stop 命令启动和关闭。
NSQ是什么? 简介 NSQ 是一个实时分布式消息平台,旨在大规模运行,每天处理数十亿条消息。 它提倡没有单点故障的分布式和分散式拓扑结构,实现容错和高可用性,同时保证可靠的消息传递。...在操作上,NSQ很容易配置和部署(所有参数都在命令行上指定,编译的二进制文件没有运行时的依赖性)。...官方的Go和Python库是开箱即用的(还有许多其他的客户端库),如果你有兴趣建立自己的库,有一个协议规范。 架构: ? 监控界面: ? 无单点故障: ?...p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd ?...最后 NSQ部署真的挺简单....接下来整相关整合...
作者 | 陌无崖 转载请联系授权 介绍 NSQ是一个基于Go语言的分布式实时消息平台,可以大规模的运用于实时消息服务,每天可以处理数亿级别的消息,设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构...NSQ核心组件 nsqlookupd 用于守护进程负责管理拓扑信息,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。...Nsq服务端与客户端的关系 消费者 1、消费者直接连接nsqd,缺点是服务无法实现动态伸缩(可以自己实现)。...安装 docker部署 docker-compose.yml version: '3' services: nsqlookupd: image: nsqio/nsq command:.../nsqlookupd ports: - "4160" - "4161" nsqd: image: nsqio/nsq command: /nsqd
上一篇初识了 nsq 三个模块(nsqd, nsqlookupd, nsqadmin)的 demo演示,本篇则从源码开始,一步一步去解析 nsqd 的执行流程和逻辑处理,学习别人优秀的项目架构,以期学以致用...1. nsqd 执行入口 在 nsq/apps/nsqd/main.go 可以找到执行入口文件,如下: 2. nsqd 执行主逻辑源码 2.1 通过第三方 svc 包进行优雅的后台进程管理,svc.Run
上一篇 介绍了 nsqd 的代码逻辑与流程图,本篇来解析 nsq 中另一大模块 nsqlookupd,其负责维护 nsqd 节点的拓扑结构信息,实现了去中心化的服务注册与发现。...1. nsqlookupd 执行入口 在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件,如下: 2. nsqlookupd 执行主逻辑 主要流程与上一篇讲的 nsqd
本文主要介绍了 NSQ 双机房以及多机房设计以及经验总结。 二、场景和需求 下图是一个机房内基本的 NSQ 消息生产和消费的部署。一个机房内生产者往 NSQ 集群发消息,多个消费者订阅消息。 ?...三、NSQ 双机房设计 我们结合 NSQ 中的服务发现组件 nsqlookupd 的功能实现 NSQ 的双机房功能。...一旦本地机房 NSQ 无法正常服务,已经落盘的消息不会丢失(恢复前无法被消费)。 根据代理的路由配置,NSQ 的双机房方案经历了两个阶段。...3.1 NSQ 双机房方案一期 NSQ 的双机房方案目前计划分为两期,一期中读写流量全部通过 migrate 导到一个机房,对端机房中的 NSQ 集群作为冷备。...{ "lookupSchema": { "nsq1": "this.is.url.of.nsq1:4161", "nsq2": "this.is.url.of.nsq2:4161", "nsq3
领取专属 10元无门槛券
手把手带您无忧上云