
asyncnsq


async nsq with asyncio

If you don't like the way pynsq (which uses Tornado) interacts with NSQ, this library may suit you.

It lets you write NSQ clients in ordinary asyncio style, with async/await.


User Documents

Documents

Install

This project uses uv for environment and dependency management. The uv index is configured in uv.toml to use Tencent’s PyPI mirror.

uv sync

Usage examples

Consumer:

import asyncio

from asyncnsq import create_reader


async def main():
    reader = await create_reader(
        nsqd_tcp_addresses=["127.0.0.1:4150"],
        max_in_flight=200,
    )
    await reader.subscribe("test_async_nsq", "nsq")
    try:
        async for message in reader.messages():
            print(message.body)
            await message.fin()
    finally:
        await reader.graceful_close()


asyncio.run(main())

Producer:

import asyncio

from asyncnsq import create_writer


async def main():
    writer = await create_writer(
        host="127.0.0.1",
        port=4150,
        heartbeat_interval=30000,
        feature_negotiation=True,
        tls_v1=True,
        snappy=False,
        deflate=False,
        deflate_level=0,
    )
    for i in range(100):
        await writer.pub("test_async_nsq", f"test_async_nsq:{i}")
        await writer.dpub("test_async_nsq", i * 1000, f"delay:{i}")
    writer.close()


asyncio.run(main())

Requirements

Release 2.0.0

2.0.0 is the Python 3.12+ modernization release. It intentionally raises the baseline and cleans up old client behavior instead of preserving every Python 3.6-era compatibility detail.

Highlights:

Release tag: 2.0.0

Running Tests

uv sync
./test_service/start.sh
uv run python -m pytest --cov=asyncnsq --cov-report=term-missing
./test_service/stop.sh

Without the Docker cluster, integration tests that need NSQ ports are skipped and the unit test suite still runs.

For consumers, prefer await reader.graceful_close() during shutdown. It sends RDY 0, requeues queued and in-flight unfinished messages with REQ, then closes the TCP connections. reader.close() remains an immediate close.
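
To make the shutdown order concrete, here is a minimal sketch of the command sequence described above, written against the NSQ TCP protocol's `RDY` and `REQ` command formats. It is not asyncnsq's implementation: the function name `graceful_close_commands`, the `requeue_delay_ms` parameter, and the sample message IDs are hypothetical and exist only for illustration.

```python
def graceful_close_commands(in_flight_ids, requeue_delay_ms=0):
    """Return the NSQ commands a consumer might send before closing.

    Hypothetical helper for illustration; asyncnsq's internals may differ.
    """
    # RDY 0 tells nsqd to stop pushing new messages to this connection.
    commands = [b"RDY 0\n"]
    for message_id in in_flight_ids:
        # REQ <message_id> <timeout_ms> hands an unfinished message back to nsqd.
        commands.append(b"REQ %s %d\n" % (message_id, requeue_delay_ms))
    return commands


# Two made-up 16-byte message IDs standing in for unfinished messages.
pending = [b"0123456789abcdef", b"fedcba9876543210"]
for command in graceful_close_commands(pending):
    print(command)
```

Only after these commands have been written would the TCP connection be closed, which is what distinguishes this path from the immediate `reader.close()`.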

set_message_handler() uses the internal asyncio.Queue by default. The TCP reader task only receives and enqueues messages, while a handler worker drains the queue and FIN/REQs each message:

def handle(message):
    process(message.body)

reader.set_message_handler(handle, auto_fin=True)
await reader.subscribe("topic", "channel")

CPU-bound synchronous handlers still occupy the Python event loop while they run; use multiple consumer processes when you need real multi-core processing.
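
The queue-backed dispatch described above can be sketched with plain asyncio, without a running NSQ cluster. This is an illustration of the pattern only, not asyncnsq's code: `tcp_reader`, `handler_worker`, and the `None` sentinel are hypothetical, and mapping a handler exception to REQ is an assumption about how `auto_fin=True` might behave.

```python
import asyncio


async def tcp_reader(queue, frames):
    """Stand-in for the socket reader: it only enqueues what it receives."""
    for frame in frames:
        await queue.put(frame)
    await queue.put(None)  # sentinel: no more messages


async def handler_worker(queue, handle, outcomes):
    """Drain the queue, run the user handler, then record FIN or REQ."""
    while (message := await queue.get()) is not None:
        try:
            handle(message)
            outcomes.append(("FIN", message))
        except Exception:
            # Assumption: a failing handler leads to a requeue.
            outcomes.append(("REQ", message))


def handle(message):
    if message == b"bad":
        raise ValueError("cannot process")


async def main():
    # A bounded queue applies backpressure to the reader when the worker lags.
    queue = asyncio.Queue(maxsize=2)
    outcomes = []
    await asyncio.gather(
        tcp_reader(queue, [b"a", b"bad", b"b"]),
        handler_worker(queue, handle, outcomes),
    )
    return outcomes


result = asyncio.run(main())
print(result)
```

Because the reader and the worker are separate tasks, a slow handler delays draining the queue rather than blocking the code that reads from the socket. A synchronous handler still runs on the same event loop, as noted above.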

For very small, fast handlers where the queue overhead matters, opt into direct handler mode. It bypasses the queue and can FIN messages on the TCP fast path:

reader.set_message_handler(handle, auto_fin=True, direct=True)

Benchmark

The benchmark suite is designed for release checks and local regression testing: it is deterministic enough for repeated local runs, fast by default, and strict enough to fail when the client loses messages, duplicates messages, times out, or fails graceful shutdown recovery.

It benchmarks the surfaces that matter for an NSQ client under high throughput:

One-command local benchmark:

uv sync
./test_service/benchmark.sh --profile pr --markdown benchmark.md

Run against an already running cluster:

uv run asyncnsq-benchmark --profile pr --markdown benchmark.md

Compare against the official Go client on the same local NSQ cluster:

PROFILE=pr ./test_service/benchmark_compare_go.sh

For single-core fairness, limit the Go scheduler to one OS thread:

GO_MAX_PROCS=1 PROFILE=pr ./test_service/benchmark_compare_go.sh

This writes:

The Go baseline uses github.com/nsqio/go-nsq v1.1.0 and runs the comparable PUB, MPUB, and end-to-end consume/finish scenarios. The asyncnsq benchmark also includes the Python-specific graceful shutdown requeue proof. Go runs as one process, but goroutines can execute on multiple cores unless GOMAXPROCS is limited; the Go report records the effective GOMAXPROCS value.

To measure Python scale-out across CPU cores, run the asyncnsq benchmark with multiple consumer processes. Each process owns its own asyncio loop and joins the same NSQ topic/channel, so NSQ distributes messages across them:

PROFILE=pr ./test_service/benchmark_compare_go.sh --consumer-processes 4

Latest local results

These numbers were measured for the 2.0 release line on 2026-05-15 with Python 3.12.4, NSQ nsqio/nsq:v1.3.0, a local Docker three-node nsqd cluster, 10,000 messages, 512 B payloads, concurrency 256, max_in_flight=1024, and output_buffer_timeout=25ms. Treat them as a same-machine reference, not a portable guarantee.

Current asyncnsq benchmark:

| Scenario | Messages | msg/s | MiB/s | p50 ms | p95 ms | p99 ms | Errors | Notes |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| TCP PUB ack | 10,000 | 82,918.92 | 40.49 | 2.859 | 3.633 | 11.704 | 0 | per-message publish ACK latency |
| TCP MPUB batch ack | 10,000 | 774,507.49 | 378.18 | 6.451 | 9.219 | 9.582 | 0 | ACK latency measured per MPUB batch |
| end-to-end pub->fin | 10,000 | 36,381.65 | 17.76 | 4.912 | 10.683 | 11.912 | 0 | missing=0, duplicates=0, fin_errors=0 |
| graceful close requeue | 512 | 13,096.60 | 6.39 | 14.580 | 16.847 | 16.947 | 0 | requeued=512, recovered=512 |
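
As a sanity check on how the msg/s and MiB/s columns relate, the short sketch below recomputes MiB/s from the reported message rate and the 512 B payload size. It assumes the MiB/s figure counts payload bytes only (no protocol framing). That assumption is not stated in the results, but it does reproduce the reported values.

```python
PAYLOAD_BYTES = 512       # payload size stated for this run
MIB = 1024 * 1024         # bytes per MiB


def mib_per_second(messages_per_second, payload_bytes=PAYLOAD_BYTES):
    """Convert a message rate into payload throughput in MiB/s."""
    return messages_per_second * payload_bytes / MIB


# (msg/s, reported MiB/s) pairs copied from the results above.
reported = {
    "TCP PUB ack": (82_918.92, 40.49),
    "TCP MPUB batch ack": (774_507.49, 378.18),
    "end-to-end pub->fin": (36_381.65, 17.76),
    "graceful close requeue": (13_096.60, 6.39),
}

for scenario, (rate, mib) in reported.items():
    print(f"{scenario}: {mib_per_second(rate):.2f} MiB/s (reported {mib})")
```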

Handler dispatch comparison after making queue-backed handlers the default:

| Consumer mode | Messages | msg/s | p95 ms | Errors | Notes |
| --- | --- | --- | --- | --- | --- |
| direct=True handler | 10,000 | 36,469.93 | 10.000 | 0 | TCP reader calls the handler directly |
| default queue-backed handler | 10,000 | 36,310.22 | 12.778 | 0 | TCP reader enqueues; handler worker drains the queue |

The queue-backed path kept throughput within about 0.5% of the direct handler path in this run. It is now the default because it keeps socket reading and user handler execution decoupled while MessageTracker separately tracks unfinished in-flight messages for safe FIN/REQ and graceful shutdown.
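
The "about 0.5%" figure follows directly from the two msg/s values in the handler dispatch comparison. The arithmetic, using only those reported numbers:

```python
direct_rate = 36_469.93   # msg/s, direct=True handler
queued_rate = 36_310.22   # msg/s, default queue-backed handler

# Relative throughput loss of the queue-backed path versus the direct path.
gap_percent = (direct_rate - queued_rate) / direct_rate * 100
print(f"throughput gap: {gap_percent:.2f}%")  # roughly 0.44%
```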

asyncnsq vs official go-nsq, default Go scheduler:

| Scenario | asyncnsq msg/s | asyncnsq p95 ms | go-nsq msg/s | go-nsq p95 ms | Errors |
| --- | --- | --- | --- | --- | --- |
| TCP PUB ack | 82,793.09 | 3.373 | 191,773.18 | 1.914 | 0 / 0 |
| TCP MPUB batch ack | 601,022.00 | 15.343 | 645,822.53 | 12.322 | 0 / 0 |
| end-to-end pub->fin | 36,329.07 | 10.256 | 134,190.70 | 3.258 | 0 / 0 |

In that run, Go reported GOMAXPROCS=16. This is the normal production shape for the Go client, but it is not a single-core comparison against one Python asyncio process.

asyncnsq vs official go-nsq with Go limited to one OS thread:

| Scenario | asyncnsq msg/s | asyncnsq p95 ms | go-nsq msg/s | go-nsq p95 ms | Errors |
| --- | --- | --- | --- | --- | --- |
| TCP PUB ack | 79,446.79 | 3.946 | 90,948.43 | 3.731 | 0 / 0 |
| TCP MPUB batch ack | 589,566.88 | 15.812 | 531,962.03 | 15.317 | 0 / 0 |
| end-to-end pub->fin | 36,390.14 | 9.908 | 50,251.00 | 89.039 | 0 / 0 |

The single-thread Go run narrows the throughput gap substantially. In this sample, go-nsq still had higher end-to-end throughput, while asyncnsq had much lower p95 end-to-end latency. For multi-core Python consumption, use --consumer-processes so each process has its own event loop and NSQ can distribute messages across processes.

Profiles:

| Profile | Use case | Messages | Payload | Concurrency | Batch |
| --- | --- | --- | --- | --- | --- |
| quick | fast smoke benchmark | 5,000 | 256 B | 64 | 100 |
| pr | balanced local benchmark | 50,000 | 512 B | 256 | 250 |
| stress | local high-throughput soak | 250,000 | 1,024 B | 512 | 500 |

Useful overrides:

uv run asyncnsq-benchmark \
  --profile pr \
  --messages 100000 \
  --payload-size 1024 \
  --concurrency 512 \
  --batch-size 500 \
  --markdown asyncnsq-benchmark.md \
  --json asyncnsq-benchmark.json

Benchmark numbers are most useful when compared on the same machine, Python version, NSQ version, Docker setup, and payload size. Every benchmark scenario reports an Errors column; a healthy run should keep it at zero.

License

asyncnsq is offered under the MIT license.

Donation

If you like this repo, buy me a coffee.

ETH wallet


Or you can contribute to this project.