go-nsq
The official Go package for NSQ.
Docs
See godoc and the main repo apps directory for examples of clients built using this package.
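For a quick sense of the API, a minimal consumer looks roughly like this (the topic, channel, and nsqlookupd address below are placeholders, not anything prescribed by the package):

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()
    // Topic and channel names here are examples only.
    consumer, err := nsq.NewConsumer("example_topic", "example_channel", cfg)
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        log.Printf("got message: %s", m.Body)
        return nil // returning nil FINishes the message
    }))
    // Discover nsqd instances via a local nsqlookupd (address assumed).
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}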
Tests
Tests are run via ./test.sh (which requires nsqd and nsqlookupd to be installed).
We'd like to be able to override how a consumer queries NSQLookupd instances. I think this approach would also allow others to swap NSQLookupd out for etcd, ZooKeeper, etc.
This is still a WIP, but I hope you can see where I'm going with this. Suggestions and criticism welcome.
cc @oliver-bitly @jehiah @mreiferson
This makes essentially the same structural changes as pynsq does in bitly/pynsq#52.
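To make the goal concrete, the kind of seam I have in mind looks roughly like the interface below (the names are purely illustrative, not the actual API in this branch):

// Hypothetical discovery abstraction, for illustration only.
// Lookup returns the nsqd TCP addresses currently producing a topic,
// regardless of whether they come from nsqlookupd, etcd, ZooKeeper, etc.
type Discoverer interface {
    Lookup(topic string) ([]string, error)
}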
I'm guessing this is user error, but I'm getting a panic when stopping a consumer as of this commit: 4e74fa1f8933064a4f04786e65d3ee7b611be598. Reading the comment in there, I'm trying to understand what's going on.
My shutdown code looks something like this:
func (n *NSQPeer) Teardown() {
    n.producer.Stop()
    if n.consumer != nil {
        n.consumer.DisconnectFromNSQD(n.host)
        n.consumer.Stop()
        <-n.consumer.StopChan
    }
}
Here's the error:
runtime.panic(0x509600, 0x84c1d5)
/usr/local/Cellar/go/1.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
github.com/bitly/go-nsq.(*Consumer).exit(0xc20800ef00)
/Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:1082 +0x2e
github.com/bitly/go-nsq.func·007()
/Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:990 +0x29
created by time.goFunc
/usr/local/Cellar/go/1.3/libexec/src/pkg/time/sleep.go:121 +0x47
Can anyone provide some insight on why I'm seeing this now?
nsqd --version nsqd v1.0.0-compat (built w/go1.8)
go-nsq version commit b9762cdcb6d5cc5ac5287ca076354143d332cc97 Tue Feb 14 16:13:23 2017 -0800
go version: go version go1.8 linux/amd64
The consumer only ever logs heartbeats, e.g.:
[d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received
but it doesn't seem to receive any other NSQ messages. Output of curl -XGET http://127.0.0.1:4151/stats:
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 45m7.260003591s
Health: OK
[.raw.raw#ephemeral] depth: 0 be-depth: 0 msgs: 6656 e2e%:
[processor#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 6656 e2e%:
[V2 18-190 ] state: 3 inflt: 0 rdy: 1 fin: 6656 re-q: 0 msgs: 6656 connected: 8s
[d.c2.cp#ephemeral] depth: 10000 be-depth: 0 msgs: 791227 e2e%:
[d.c2.cp.done#ephemeral] depth: 0 be-depth: 0 msgs: 0 e2e%:
[processor#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 0 e2e%:
[V2 18-190 ] state: 3 inflt: 0 rdy: 1 fin: 0 re-q: 0 msgs: 0 connected: 8s
[d.c2.domain#ephemeral] depth: 10000 be-depth: 0 msgs: 208218 e2e%:
...
Code snippet:
func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
    consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
    if err != nil {
        print("new consumer error: ", err, "\n")
        time.Sleep(1 * time.Second) // wait 1s
        panic(err)
    }
    consumer.AddHandler(hdlr)
    err = consumer.ConnectToNSQD(addr)
    if err != nil {
        print("connect nsqd error: ", err, "\n")
        time.Sleep(1 * time.Second) // wait 1s
        panic(err)
    }
    <-consumer.StopChan
    panic("nsq conn dead topic=" + topic + " channel=" + channel)
    return nil
}
func main() {
    producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
    go func() {
        detector := new(c2.C2Sdk)
        if detectorEnabled {
            err = detector.Init()
            if err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.cp.done#ephemeral")
            }))
    }()
    go func() {
        detector := new(c2.C2Sdk)
        if detectorEnabled {
            err = detector.Init()
            if err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.url.done#ephemeral")
            }))
    }()
    .....
}
I'm currently investigating an issue where, even after calling DisconnectFromNSQLookupd or DisconnectFromNSQD for a host that is already disconnected, re-connection attempts still occur.
In our scenario we are consuming from three hosts (10.0.0.1, 10.0.0.2, 10.0.0.3). When 10.0.0.1 becomes unhealthy (service stopped), we remove it from our list of nodes and call DisconnectFromNSQLookupd or DisconnectFromNSQD.
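Roughly, the removal path on our side looks like this (simplified; the variable names are illustrative, not our exact code):

// `consumer` is a *nsq.Consumer; `unhealthyNSQD` / `unhealthyLookupd` are the
// addresses we detected as down, e.g. "10.0.0.1:4150" / "10.0.0.1:4161".
if err := consumer.DisconnectFromNSQD(unhealthyNSQD); err != nil {
    log.Printf("DisconnectFromNSQD(%s): %v", unhealthyNSQD, err)
}
if err := consumer.DisconnectFromNSQLookupd(unhealthyLookupd); err != nil {
    log.Printf("DisconnectFromNSQLookupd(%s): %v", unhealthyLookupd, err)
}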
As you can see from the following log output, the connection to host 10.0.0.1
was lost and re-connection attempts are made. The host is then removed from our list and DisconnectFromNSQLookupd
is called. However, the re-connection attempts still occur:
2016-04-29 14:35:26 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:26 [DBG] IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false} (conn.go 328)
2016-04-29 14:35:29 [ERR] IO error - EOF (conn.go 471)
2016-04-29 14:35:29 [INF] beginning close (conn.go 611)
2016-04-29 14:35:29 [INF] readLoop exiting (conn.go 528)
2016-04-29 14:35:29 [INF] breaking out of writeLoop (conn.go 535)
2016-04-29 14:35:29 [INF] writeLoop exiting (conn.go 581)
2016-04-29 14:35:29 [INF] finished draining, cleanup exiting (conn.go 660)
2016-04-29 14:35:29 [INF] clean close complete (conn.go 668)
2016-04-29 14:35:29 [DBG] there are 2 connections left alive (consumer.go 741)
2016-04-29 14:35:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:35:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:35:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:35:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:35:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:36:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:26 [DBG] querying nsqlookupd http://10.0.0.2:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:37:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
2016-04-29 14:37:32 [INF] Disconnecting from NSQ hosts: [10.0.0.1:4150] (subscriber.go 176)
2016-04-29 14:37:32 [INF] Disconnecting from NSQ lookupds: [10.0.0.1:4161] (subscriber.go 187)
2016-04-29 14:37:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:38:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:38:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:38:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:38:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:38:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:38:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:38:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
Is this intended/expected behaviour?
If a consumer is in backoff and attempts to recover (sending RDY 1 and receiving a message), but the handler responds with RequeueWithoutBackoff, it will stall and never exit backoff. (failing test attached)
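For context, a handler that responds this way looks roughly like the following (illustrative only; `consumer` is a *nsq.Consumer set up elsewhere):

consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
    // Take over responding manually, then requeue without signalling backoff.
    m.DisableAutoResponse()
    m.RequeueWithoutBackoff(-1) // -1 = let the library pick the requeue delay
    return nil
}))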
cc: @georgicodes
Hello, this PR adds support for connecting to nsqd over Unix sockets.
It accompanies a corresponding PR to nsq.
Add MaxMsgSize to the configuration, mimicking the nsqd configuration key. It defaults to 1048576, the nsqd default -- source: https://github.com/nsqio/nsq/blob/a4939964f6715edd27a6904b87c2f9eb6a45e749/nsqd/options.go#L130
Pass MaxMsgSize to ReadResponse via its caller, ReadUnpackedResponse, and check in ReadResponse that msgSize does not exceed the maximum. This reduces the risk of attempting to read arbitrary (non-NSQ) responses.
Generate a custom error if msgSize equals the result of deserializing the first 4 bytes of an HTTP response (1213486160), to facilitate troubleshooting.
Fixes #352
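The check being described is, in spirit, something like the sketch below (function and variable names are illustrative, not the actual diff):

// Sketch only; uses "encoding/binary", "errors", "fmt", and "io".
func readBoundedResponse(r io.Reader, maxMsgSize int32) ([]byte, error) {
    var msgSize int32
    if err := binary.Read(r, binary.BigEndian, &msgSize); err != nil {
        return nil, err
    }
    if msgSize < 0 || msgSize > maxMsgSize {
        if msgSize == 1213486160 { // "HTTP" read as a big-endian int32
            return nil, errors.New("response looks like HTTP; is this the nsqd HTTP port?")
        }
        return nil, fmt.Errorf("response size %d exceeds maximum %d", msgSize, maxMsgSize)
    }
    buf := make([]byte, msgSize)
    if _, err := io.ReadFull(r, buf); err != nil {
        return nil, err
    }
    return buf, nil
}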
If we create a producer pointing to the HTTP address of nsqd by mistake (instead of using the TCP endpoint), the producer allocates over 1GB of memory before failing. This happens when the nsq protocol response size is extracted from the first 4 bytes of the tcp response data:
https://github.com/nsqio/go-nsq/blob/259dc590b2e791e4b11ad7496bc312ec81d1004b/protocol.go#L55
The first 4 bytes of HTTP/1.1 400 Bad Request produce the int 1213486160, which is used to create the buffer to hold the nsq response data:
https://github.com/nsqio/go-nsq/blob/259dc590b2e791e4b11ad7496bc312ec81d1004b/protocol.go#L64
And boom, another OOMKilled pod.
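That value is easy to verify: the first four bytes are the ASCII for "HTTP", and read as a big-endian integer they give exactly 1213486160:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // "HTTP" = 0x48 0x54 0x54 0x50 -> 0x48545450 == 1213486160
    fmt.Println(binary.BigEndian.Uint32([]byte("HTTP")))
}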
This code can be used to reproduce the issue. You'll need an instance of nsqd running locally.
package main

import (
    "log"
    "runtime"

    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()
    p, err := nsq.NewProducer("localhost:4151", config)
    if err != nil {
        log.Fatalln("NewProducer:", err)
    }
    err = p.Publish("test-topic", []byte("test-message"))
    printTotalAlloc()
    if err != nil {
        log.Fatalln("Publish:", err)
    }
    log.Println("Finished")
}

func printTotalAlloc() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    log.Printf("TotalAlloc: %.3f MiB", float64(m.TotalAlloc)/1000/1000)
}
I guess the producer should somehow make sure that the nsq protocol is being used before trying to deserialize the data, but I'm not familiar with the nsq protocol.
Running the code above prints:
2023/01/12 13:47:51 INF 1 (localhost:4151) connecting to nsqd
2023/01/12 13:47:51 ERR 1 (localhost:4151) error connecting to nsqd - failed to IDENTIFY - unexpected EOF
2023/01/12 13:47:51 TotalAlloc: 1213.861 MiB
2023/01/12 13:47:51 Publish: failed to IDENTIFY - unexpected EOF
I'm using go-nsq v1.1.0 and Go 1.19.
Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.
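Conceptually, the cleanup amounts to pruning connections whose address is no longer in the freshly discovered set, roughly like this (field and variable names here are illustrative, not the actual diff):

// `connections` maps nsqd address -> *nsq.Conn; `current` is the set of
// addresses returned by the latest lookupd poll.
for addr, conn := range connections {
    if _, ok := current[addr]; !ok {
        conn.Close()
        delete(connections, addr)
    }
}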
The NSQ service is deployed on the public network (106.75.49.135:4161, 106.75.49.135:4150). It can send and receive messages normally, but why do I also get 'error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused'?
nsqlookupd:
  image: nsqio/nsq
  command: /nsqlookupd
  ports:
    - "4160:4160"
    - "4161:4161"
nsqd:
  image: nsqio/nsq
  command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
  depends_on:
    - nsqlookupd
  ports:
    - "4151:4151"
    - "4150:4150"
2022/10/12 11:21:15 INF 1 [Study/Chegg] (nsqd:4150) connecting to nsqd
2022/10/12 11:21:15 ERR 1 [Study/Chegg] (nsqd:4150) error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused
{"Address":["106.75.49.141:4150"],"level":"info","msg":"Receiving NSQ messages from addresses","time":"2022-10-12T11:21:15+08:00"}
send message:
sub message:
I have a system where I'm getting log messages from nsqd such as:
INFO: PROTOCOL(V2): [127.0.0.1:53376] exiting ioloop
ERROR: client(127.0.0.1:53376) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53376: i/o timeout
INFO: PROTOCOL(V2): [127.0.0.1:53376] exiting messagePump
From looking at the /stats endpoint I can see that after this message the consumer is no longer connected (two producers are still connected fine). I can also see that there were 2 messages in timeout, which I presume is related to the consumer's disconnection.
However, there is no indication that the consumer knows it has lost its connection, so there's no attempt to reconnect (or even just a notification that it has stopped). The StopChan isn't closed, and Stats() on the consumer still reports a connection count of 1.
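For reference, reading that count looks roughly like this (a simplified sketch, not our actual code; `consumer` is the *nsq.Consumer in question):

stats := consumer.Stats()
log.Printf("connections=%d received=%d finished=%d requeued=%d",
    stats.Connections, stats.MessagesReceived, stats.MessagesFinished, stats.MessagesRequeued)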
Is this the correct behaviour? Have I missed something around how to detect this i/o timeout situation? Unfortunately I can't share the source code, but happy to try things and report back.
Thanks
Andre
There should be no backward-incompatible changes, but there are a couple of new API methods.
Upgrading from 1.0.7: There are no backward incompatible changes.
Thanks to @judwhite, @vitaliytv, and @HaraldNordgren for contributing to testing and dependency management improvements
Changes in this release
#249 update RDY when setting MaxInFlight to 0
#267 check response message size is positive (thanks @andyxning)
#271 godoc for publisher and consumer (thanks @skateinmars)
#270 set log level (thanks @YongHaoWu)
#255 go vet tls.Config copying (thanks @iaburton)
Upgrading from 1.0.6: There are no backward incompatible changes.
Upgrading from 1.0.5: There are no backward incompatible changes.
DeferredPublish (thanks @DanielHeckrath)
Upgrading from 1.0.4: There are no backward incompatible changes.
flag.Value interface
max_backoff_duration (thanks @judwhite)
go-simplejson dependency
Upgrading from 1.0.3: There are no backward incompatible changes.
ErrNotConnected race during Producer connection (thanks @jeddenlea)
RDY redistribution after backoff with no connections
RequeueWithoutBackoff
BackoffStrategy to be set via flag (thanks @twmb)
BackoffStrategy; add full-jitter strategy (thanks @hden)
DialTimeout and LocalAddr config (thanks @yashkin)
Producer.Ping() method (thanks @zulily)
Message data races on responded