Project author: amor71

Project description:
Monitored Multiprocessing Queues
Language: Python
Project URL: git://github.com/amor71/mnqueues.git
Created: 2021-07-03T07:43:55Z

License: MIT License


Python 3

What are mnqueues?

mnqueues stands for Monitored Queues - a coupling between the Python multiprocessing Queue() and a Monitor entity. A Monitor collects Queue usage statistics and alerts on them.

Tracked measures

mnqueues tracks several measures per queue:

  1. Average number of writes to queue per minute,
  2. Average number of reads from queue per minute,
  3. Time spent in queue (latency), in milliseconds.
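
As a rough illustration of what these three measures mean, the sketch below computes a per-minute rate and a queue latency from raw counts and timestamps. The function names `per_minute_rate` and `latency_ms` are hypothetical and are not part of mnqueues; how the library aggregates its measures internally is not described here.

```python
def per_minute_rate(event_count: int, elapsed_seconds: float) -> float:
    """Average number of events (reads or writes) per minute over a window."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return event_count * 60.0 / elapsed_seconds


def latency_ms(enqueued_at: float, dequeued_at: float) -> float:
    """Time an item spent in the queue, in milliseconds.

    Both arguments are timestamps in seconds, e.g. from time.time().
    """
    return (dequeued_at - enqueued_at) * 1000.0
```

For example, 300 writes observed over 120 seconds give `per_minute_rate(300, 120.0)`, which is 150 writes per minute.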


To install mnqueues type:

pip install mnqueues

How-To use

To create a monitored queue:

    import mnqueues as mnq
    from mnqueues.gcp_monitor import GCPMonitor
    q = mnq.MNQueue(monitor=GCPMonitor("some-unique-name"))

The MNQueue() object encapsulates the Python multiprocessing.Queue() and supports the same functions. Like a Queue() object, an MNQueue() object can be passed between processes.
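
To make the idea of coupling a queue with a monitor concrete, here is a minimal sketch built only on the standard library. It is not the mnqueues implementation: the classes `CountingMonitor` and `MonitoredQueue` and the methods `track_put` and `track_get` are hypothetical names chosen for illustration, and the approach of shipping the enqueue timestamp alongside each item is an assumption.

```python
import multiprocessing as mp
import time


class CountingMonitor:
    """Hypothetical monitor: counts calls and keeps latencies in memory."""

    def __init__(self, name):
        self.name = name
        self.puts = 0
        self.gets = 0
        self.latencies_ms = []

    def track_put(self):
        self.puts += 1

    def track_get(self, latency_ms):
        self.gets += 1
        self.latencies_ms.append(latency_ms)


class MonitoredQueue:
    """Illustrative wrapper around multiprocessing.Queue (not mnqueues code)."""

    def __init__(self, monitor):
        self._queue = mp.Queue()
        self._monitor = monitor

    def put(self, item, block=True, timeout=None):
        # Send the enqueue time with the item so the reader can compute latency.
        self._queue.put((time.time(), item), block, timeout)
        self._monitor.track_put()

    def get(self, block=True, timeout=None):
        enqueued_at, item = self._queue.get(block, timeout)
        self._monitor.track_get((time.time() - enqueued_at) * 1000.0)
        return item
```

In this sketch the counters live in each process's own memory, so a writer process and a reader process would each see only their own side. A real monitor would need to report to a shared sink such as a log file or a metrics service.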


File Logger

    from mnqueues.log_monitor import LOGMonitor
    monitor = LOGMonitor("log-file-name")

Logs all put() and get() calls to a log file named log-file-name.log, in the following format:

    [<OS process-id>]->2021-07-07 21:31:14 INFO:get counter: 5003
    [<OS process-id>]->2021-07-07 21:31:14 INFO:get counter: 4997
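
Lines in this format are easy to post-process. The sketch below parses one such line into its fields with a regular expression. The helper `parse_log_line` is hypothetical, and it assumes that put() calls are logged with a matching `put counter:` message, which the sample above does not show.

```python
import re
from datetime import datetime

# Pattern for lines such as:
#   [1234]->2021-07-07 21:31:14 INFO:get counter: 5003
# The "put" alternative is an assumption; only "get" appears in the sample.
LOG_LINE = re.compile(
    r"^\[(?P<pid>[^\]]+)\]->"
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+):"
    r"(?P<operation>get|put) counter: (?P<count>\d+)$"
)


def parse_log_line(line):
    """Return the fields of one log line as a dict, or None if it does not match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    return {
        "pid": match.group("pid"),
        "timestamp": datetime.strptime(match.group("timestamp"), "%Y-%m-%d %H:%M:%S"),
        "level": match.group("level"),
        "operation": match.group("operation"),
        "count": int(match.group("count")),
    }
```

Calling `parse_log_line` on each line of the log file yields records that can be grouped by process id or operation.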

Google Cloud Monitor (using StackDriver)

  1. from mnqueues.gcp_monitor import GCPMonitor
  2. monitor = GCPMonitor("unique-name")

All calls to put() and get() are sent to Google Cloud Monitor. The Monitor class sends data to three custom measures:

  1. OpenCensus/mnqueues.{name}.number_queue_get (line, no aggregation on GCP required)
  2. OpenCensus/mnqueues.{name}.number_queue_put (line, no aggregation on GCP required)
  3. OpenCensus/mnqueues.{name}.time_in_queue_distribution (heat-map with sum, shows latency distribution)

Note that {name} is the parameter passed when constructing the Monitor; it makes it easier to build dashboards for specific use cases.
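
The naming scheme can be sketched as a small helper that expands {name} into the three measure names listed above, which is handy when looking the measures up in a dashboard. The function `measure_names` is hypothetical and is not part of mnqueues.

```python
def measure_names(name: str) -> dict:
    """Build the three custom measure names for a monitor called `name`."""
    prefix = f"OpenCensus/mnqueues.{name}"
    return {
        "get": f"{prefix}.number_queue_get",
        "put": f"{prefix}.number_queue_put",
        "latency": f"{prefix}.time_in_queue_distribution",
    }
```

For a monitor created as GCPMonitor("unique-name"), `measure_names("unique-name")["put"]` gives `OpenCensus/mnqueues.unique-name.number_queue_put`.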

See for details.



  1. View Google Cloud (GCP) Monitoring dashboard showing queue.put() and queue.get() rates per second, generated by running pytest on the project tests folder.
  2. Monitoring queues with real-time web-socket trading data for LiuAlgoTrader.

Further examples

Can be found in the tests folder.


Contributions are highly appreciated. Please review our Code of Conduct. Bug reports & feature requests can be left in the Issues section, or email me at amor71@sgeltd.com