Worker pool for Meteor using the Node.js native `cluster` module
A Meteor package that lets you create a worker pool on the server to handle heavy jobs.
It can run synchronous and asynchronous tasks from a persistent or in-memory queue.
It can also run recurring and scheduled tasks.
`TaskQueue` is a job queue backed by both MongoDB and memory.
It lets you add, update, and remove jobs consistently across processes.
You can attach event listeners to handle task results and errors.
`TaskQueue.addTask({ taskType: String, data: Object, priority: Integer, _id: String, dueDate: Date, inMemory: Boolean })`
- `taskType` is mandatory
- `data` is mandatory, but you can pass an empty object
- `priority` is mandatory, default is set to `1`
- `_id` is optional
- `dueDate` is mandatory, default is set to `new Date()`
- `inMemory` is optional, default is set to `false`
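
The defaults listed above can be pictured with a small standalone sketch. `withTaskDefaults` is a hypothetical helper written for illustration only, not part of the package; it simply mirrors the documented rules (mandatory `taskType` and `data`, `priority` defaulting to 1, `dueDate` to the current date, `inMemory` to false).

```javascript
// Hypothetical helper: mirrors the documented addTask defaults.
// It is NOT the package's implementation.
function withTaskDefaults({ taskType, data, priority = 1, _id, dueDate = new Date(), inMemory = false }) {
  if (typeof taskType !== 'string' || taskType.length === 0) {
    throw new Error('taskType is mandatory')
  }
  if (data === null || typeof data !== 'object') {
    throw new Error('data is mandatory (an empty object is fine)')
  }
  const task = { taskType, data, priority, dueDate, inMemory }
  // _id is optional: only set it when the caller provided one
  if (_id !== undefined) {
    task._id = _id
  }
  return task
}

const exampleTask = withTaskDefaults({ taskType: 'SYNC', data: {} })
console.log(exampleTask.priority, exampleTask.inMemory, exampleTask.dueDate instanceof Date)
```

Under these assumptions, a call such as `TaskQueue.addTask({ taskType: 'SYNC', data: {} })` would be treated as a persistent task with priority 1 that is due immediately.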
`TaskQueue.addEventListener(eventType: String, callback: function)`
- `eventType` is one of `[ 'done', 'error' ]`
- `callback` is a function prototyped as `callback({ value: Any, task: Task })`, where `value` contains the result or error

`TaskQueue.removeEventListener(eventType: String)`
- `eventType` is one of `[ 'done', 'error' ]`

Note: you can only attach one event listener per `eventType`.
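
Since only one listener can be attached per event type, one way to handle several task types is to route inside that single callback. The sketch below is an illustration, not part of the package: `makeDispatcher` is a hypothetical helper, and it assumes the `task` object passed to the callback carries the `taskType` it was created with.

```javascript
// Hypothetical helper: builds ONE callback that routes by task.taskType,
// because only one listener can be attached per eventType.
function makeDispatcher(handlers, fallback = () => undefined) {
  return function dispatch({ value, task }) {
    const known = Object.prototype.hasOwnProperty.call(handlers, task.taskType)
    const handler = known ? handlers[task.taskType] : fallback
    return handler({ value, task })
  }
}

const onDone = makeDispatcher({
  SYNC: ({ task }) => `sync task ${task._id} finished`,
  ASYNC: ({ value }) => `async task returned ${value}`
})

console.log(onDone({ value: 42, task: { _id: 'a1', taskType: 'ASYNC' } }))
// With the package loaded, the callback would be registered with:
// TaskQueue.addEventListener('done', onDone)
```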
`TaskQueue.inMemory.findById(_id: String)`
`TaskQueue.inMemory.removeById(_id: String)`
`TaskQueue.inMemory.tasks()`: returns all in-memory tasks
`TaskQueue.inMemory.availableTasks()`: returns available in-memory tasks
Both in-memory and persistent tasks are available at the same time and can be used together, but:
`priority` are greater.
`Cluster` is an isomorphic class that handles both the Worker and the Master.
On the Master it:
- closes the workers when no jobs are available (this behavior can be overridden with `keepAlive`)

On the Worker it:
`constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String, keepAlive: String | Integer, autoInitialize: Boolean })`
- `taskMap`: a map of functions associated to a `taskType`
- `maxAvailableWorkers`: maximum number of child processes (cores), default is set to the system maximum
- `port`: server port for child process servers, default is set to `3008`
- `refreshRate`: worker pool refresh rate (in ms), default is set to `1000`
- `inMemoryOnly`: forces the cluster to only pull jobs from the in-memory task queue
- `messageBroker`: optional, default is set to `null` (see IPC section)
- `logs`: one of `['all', 'error']`, default is set to `all`; if set to `'error'`, only the error and warning logs are shown
- `keepAlive`: an optional parameter that can be set to either:
  - `'always'`: the system starts `maxAvailableWorkers` workers immediately and keeps them all alive
  - an `Integer` value: the system does not shut down workers until that number of milliseconds has passed since a job was last available to be picked up by a worker

  NOTE: when `keepAlive` is not set, the default behavior is to keep workers alive only while there are jobs available for them to pick up.
- `autoInitialize`: an optional parameter that controls whether the system automatically initializes the cluster’s polling for jobs when the `MasterCluster` is created. Default is set to `true`.
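
The three `keepAlive` modes described above can be summarized as a small decision function. This is a reading aid under stated assumptions, not the package's scheduling code: `shouldKeepWorkersAlive` and its parameters are hypothetical names.

```javascript
// Hypothetical model of the documented keepAlive semantics.
function shouldKeepWorkersAlive(keepAlive, jobsAvailable, msSinceLastJobAvailable) {
  if (keepAlive === 'always') {
    // workers are started up front and never shut down
    return true
  }
  if (Number.isInteger(keepAlive)) {
    // stay alive while jobs exist, or until the grace period has elapsed
    return jobsAvailable || msSinceLastJobAvailable < keepAlive
  }
  // keepAlive not set: only keep workers alive while jobs are available
  return jobsAvailable
}

console.log(shouldKeepWorkersAlive(3000, false, 1000))
console.log(shouldKeepWorkersAlive(undefined, false, 1000))
```

An integer value is mostly useful when jobs arrive in bursts, since it avoids paying the worker startup cost again between two bursts.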
`Cluster.isMaster()`: `true` if this process is the master
`Cluster.maxWorkers()`: returns the maximum number of workers available at the same time
`setRefreshRate(refreshRate: Integer)`: changes the refresh rate on the master
If the Master process crashes or restarts, all unfinished jobs will be restarted from the beginning.
Each job is logged when started / finished with the format : ${timestamp}
cluster:${taskType}:${taskId}
Since version 2.0.0, you can communicate between the child processes and the Master.
To do so, you must provide the Master `Cluster` instance with a `messageBroker` function.
This function handles (on the master) all custom messages from the child processes.
The function should be prototyped as follows: `messageBroker(respond: function, msg: { status: Int > 1, data: Any })`
`respond` enables you to answer a message from a child.
All communications between the master and a child must be started by the child.
To do so, use `toggleIPC`, the second parameter passed to every function provided to the `taskMap`, which is prototyped as follows:
`toggleIPC(messageBroker: function, initialize: function): Promise`
- `messageBroker` is prototyped as `messageBroker(msg: Any)`
- `initialize` is prototyped as `initialize(sendMessageToMaster: function)`

Because `toggleIPC` returns a promise, you must return it (recursively); otherwise the job will be considered done and the worker idle.
Not returning it will result in unexpected behavior.
You should not use the default `maxAvailableWorkers` value (the number of CPUs).
The default value is set to your system’s CPU count, but it is only a reference value.
It’s up to you to understand your needs and allocate CPUs accordingly.
For example, if you’re running on an 8-core machine:
so you should have `maxAvailableWorkers = Cluster.maxWorkers() - 4 === 4`
You can’t allocate more than your system’s maximum CPU count.
You can still exceed the theoretical maximum number of processes:
in that case your overall system will likely slow down, because the execution of some processes will be deferred.
It will drastically reduce the multi-core performance gain.
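
The allocation rule above amounts to subtracting the cores you need elsewhere from the total and clamping the result. A minimal sketch, assuming a hypothetical helper `workersFor`; in a real app `totalCores` would typically come from `Cluster.maxWorkers()`, and how many cores to reserve depends on what else runs on the machine.

```javascript
// Hypothetical helper: derive a maxAvailableWorkers value from a core count.
function workersFor(totalCores, reservedCores) {
  const wanted = totalCores - reservedCores
  // never below 1 worker, never above the number of cores
  return Math.min(totalCores, Math.max(1, wanted))
}

console.log(workersFor(8, 4)) // 4, matching the 8-core example above
```

The result would then be passed as `new Cluster(taskMap, { maxAvailableWorkers })`.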
import { Meteor } from 'meteor/meteor'
import { Cluster, TaskQueue } from 'meteor/nschwarz:cluster'
const taskMap = {
'TEST': job => console.log(`testing ${job._id} at position ${job.data.position}`),
'SYNC': (job) => console.log("this is a synchronous task"),
'ASYNC': (job) => new Promise((resolve, reject) => Meteor.setTimeout(() => {
console.log("this is an asynchronous task")
resolve()
}, job.data.timeout))
}
function onJobsDone({ value, task }) {
console.log('do something with the result')
}
function onJobsError({ value, task }) {
console.log('do something with the error')
}
function syncTask() {
return TaskQueue.addTask({ taskType: 'SYNC', data: {}})
}
function asyncTask() {
return TaskQueue.addTask({ taskType: 'ASYNC', data: { timeout: 5000 }, priority: 6 })
}
function inMemoryTask(priority, position) {
return TaskQueue.addTask({ taskType: 'TEST', priority, data: { position }, inMemory: true })
}
function persistentTask(priority, position) {
return TaskQueue.addTask({ taskType: 'TEST', priority, data: { position }, inMemory: false })
}
const cluster = new Cluster(taskMap)
Meteor.startup(() => {
if (Cluster.isMaster()) {
TaskQueue.addEventListener('done', onJobsDone)
TaskQueue.addEventListener('error', onJobsError)
syncTask()
asyncTask()
inMemoryTask(8, 1)
inMemoryTask(1, 2)
persistentTask(8, 1)
persistentTask(1, 2)
}
})
import { add } from 'date-fns' // external library to handle date objects
const dueDate = add(new Date(), { minutes: 10 })
TaskQueue.addTask({ taskType: 'sometype', priority: 1, data: {}, dueDate })
import { add } from 'date-fns' // external library to handle date objects
function recurringTask(job) {
// do something
const dueDate = add(new Date(), { minutes: 10 })
TaskQueue.addTask({ taskType: 'recurringTask', priority: 1, data: {}, dueDate })
}
const taskMap = {
recurringTask
}
function ipcPingTest(job, toggleIPC) {
return toggleIPC(
(msg) => {
console.log(msg)
return 'result you eventually want to pass to the master'
}, (smtm) => smtm({ status: 4, data: 'ping' })
)
}
const taskMap = {
ipcPingTest
}
function messageBroker(respond, msg) {
if (msg.data === 'ping') {
respond('pong')
}
}
const cluster = new Cluster(taskMap, { messageBroker })
function ipcPingTest(job, toggleIPC) {
return toggleIPC(
(msg) => {
console.log(msg)
return toggleIPC(
(msg) => console.log(msg),
(smtm) => smtm({ status: 4, data: 'ping' })
)
}, (smtm) => smtm({ status: 4, data: 'ping' }))
}
const taskMap = {
ipcPingTest
}
function messageBroker(respond, msg) {
if (msg.data === 'ping') {
respond('pong')
}
}
const cluster = new Cluster(taskMap, { messageBroker })
Because workers only run tasks, you should remove unnecessary imports from them to avoid extra resource consumption and longer startup times.
As a good practice, put all your Master import logic in a single file, and import it only on the master.
What I mean by “Master imports Logic” is :
It could be summarized as such :
// in your entry file
if (Cluster.isMaster()) {
import './MasterImports.js'
}
// ...rest of your cluster logic
Because recurring tasks are created “recursively”, there will always be a task in the queue.
If the server is restarted, it will start the recurring task because it’s still in the queue.
Be sure to remove all recurring tasks on the master before starting new ones, or guard the insert.
Otherwise you will have multiple identical recurring tasks running at the same time.
You can either do :
Meteor.startup(() => {
if (Cluster.isMaster()) {
TaskQueue.remove({ taskType: 'recurringTask' })
}
})
or at task initialization :
const recurringTaskExists = TaskQueue.findOne({ taskType: 'recurringTask' }) !== undefined
if (!recurringTaskExists) {
TaskQueue.addTask({ taskType: 'recurringTask', priority: 1, data: {}, dueDate })
}
If you want to be sure your tasks are unique, set a unique `_id` with `TaskQueue.addTask`.
A good model could be: `${taskType}${associated_Model_ID}`
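
As an illustration of that naming model, here is a minimal sketch. `uniqueTaskId`, the `sendInvoice` task type and the `inv_123` identifier are all hypothetical names chosen for the example.

```javascript
// Hypothetical helper following the `${taskType}${associated_Model_ID}` model.
function uniqueTaskId(taskType, modelId) {
  return `${taskType}${modelId}`
}

// Example: at most one "sendInvoice" task per invoice document
const invoiceTask = {
  _id: uniqueTaskId('sendInvoice', 'inv_123'),
  taskType: 'sendInvoice',
  data: { invoiceId: 'inv_123' }
}
console.log(invoiceTask._id)
// With the package loaded: TaskQueue.addTask(invoiceTask)
```

Because the `_id` is derived from the task type and the related document, queuing the same work twice yields the same `_id` instead of a second, independent task.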
There’s currently no way to know which app started a task (this may change later):
you should run the `Cluster` on only one of the apps, so that no app picks up a task that is not included in its `taskMap`.
You can still use the `TaskQueue` in all the apps, of course.
If your apps have different domain names or configurations (for the mailer, for example), you should pass these through the `data` field.
For example, if you use `Meteor.absoluteUrl` or similar in a task, it will have the value associated with the app running the `Cluster`.
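
A minimal sketch of passing app-specific configuration through `data`, so the task does not depend on the settings of the app running the `Cluster`. The `sendResetMail` task type, the `rootUrl` and `token` fields, and both helper functions are hypothetical names used for illustration.

```javascript
// Hypothetical payload built by the app that queues the task.
// In a Meteor app, rootUrl could come from Meteor.absoluteUrl().
function resetTaskFor(rootUrl, token) {
  return { taskType: 'sendResetMail', data: { rootUrl, token } }
}

// Hypothetical task function: builds the link from job.data.rootUrl
// instead of calling Meteor.absoluteUrl() inside the worker.
function buildResetLink(job) {
  const { rootUrl, token } = job.data
  return new URL(`reset-password/${token}`, rootUrl).toString()
}

const queuedJob = resetTaskFor('https://app-a.example.com/', 'abc123')
console.log(buildResetLink(queuedJob))
```

Each app then queues tasks with its own root URL, and the worker produces links for the app that created the task rather than for the app that executes it.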