yet another distributed task queue using redis for node.js
MIT License
shutdown and wait for the callback. This ...

var JobQueue = require('redis-dist-job-queue');
var jobQueue = new JobQueue();
jobQueue.registerTask("./foo_node_module");
jobQueue.start();
var options = {resourceId: 'resource_id', params: {theString: "derp"}};
jobQueue.submitJob('thisIsMyTaskId', options, function(err) {
  if (err) throw err;
  console.info("job submitted");
});
jobQueue.on('error', function(err) {
  console.error("job queue error:", err.stack);
});
foo_node_module.js:
module.exports = {
  id: 'thisIsMyTaskId',
  perform: function(params, callback) {
    console.info(params.theString.toUpperCase());
    callback();
  },
};
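
When writing a task module it can help to call perform by hand before putting it on a queue. The following is a minimal sketch, not part of the library: runTaskLocally is a hypothetical helper, and the input check is an assumption added for illustration. It passes params through JSON first, on the assumption that this mirrors the documented behavior that params are serialized to and from JSON, and it reports failure through callback(err) rather than throwing.

```javascript
// Task object shaped like foo_node_module.js, with an added input check
// (the check is an illustration, not something the library requires).
const task = {
  id: 'thisIsMyTaskId',
  perform: function(params, callback) {
    // Report bad input through callback(err) instead of throwing.
    if (typeof params.theString !== 'string') {
      return callback(new Error('params.theString must be a string'));
    }
    console.info(params.theString.toUpperCase());
    callback();
  },
};

// Hypothetical helper: run a task without Redis. The params are passed
// through JSON first, assuming the queue does a similar round trip.
function runTaskLocally(task, params, callback) {
  const decoded = JSON.parse(JSON.stringify(params));
  task.perform(decoded, callback);
}

runTaskLocally(task, {theString: 'derp'}, function(err) {
  if (err) throw err;
  console.info('task finished');
});
```

Because of the JSON round trip, values such as functions or undefined properties would not survive, so params are best kept to plain data.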
options:
namespace - redis key namespace. Defaults to "redis-dist-job-queue."
queueId - the ID of the queue on redis that we are connecting to. Defaults to "default".
redisConfig - defaults to an object with properties:
  host - 127.0.0.1
  port - 6379
  db - 0
childProcessCount - If set to 0 (the default), no child processes are ...
workerCount - number of workers for this JobQueue instance, per child process. If childProcessCount is positive, total concurrency is childProcessCount * workerCount. If childProcessCount is 0, total concurrency is workerCount. Defaults to the number ...
flushStaleTimeout - every this many milliseconds, scan for jobs that ...

Starts the desired number of workers listening on the queue.
It's quite possible that you never want to call start(). For example, a server might submit processing jobs but never perform them.
You must register all the tasks you want to be able to perform before calling start().
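
Since start() is what brings workers up, it may help to estimate how many jobs one JobQueue instance could run at once. The sketch below restates the childProcessCount and workerCount rule from the options list as far as it can be read there. totalConcurrency is a hypothetical helper, not part of the library, and workerCount must be passed explicitly because its default is not assumed here.

```javascript
// Hypothetical helper; it only restates the documented rule and does not
// call into redis-dist-job-queue.
function totalConcurrency(options) {
  // 0 is the documented default for childProcessCount.
  const childProcessCount = options.childProcessCount || 0;
  const workerCount = options.workerCount;
  if (childProcessCount > 0) {
    // Each child process runs workerCount workers.
    return childProcessCount * workerCount;
  }
  // With no child processes, only workerCount workers run.
  return workerCount;
}

console.info(totalConcurrency({childProcessCount: 0, workerCount: 4})); // 4
console.info(totalConcurrency({childProcessCount: 2, workerCount: 4})); // 8
```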
modulePath is resolved just like a call to require.
The node module at modulePath must export these options:
id - unique task ID that identifies what function to call to perform ...
perform(params, callback) - function which actually does the task.
  params - the same params you gave to submitJob, JSON encoded ...
  callback(err) - call it when you're done processing. Until you call ...
And it may export these optional options:
timeout - milliseconds since last heartbeat to wait before considering ... Defaults to 10000.

Adds a job to the queue to be processed by the next available worker.
taskId - the id field of a task you registered with registerTask
options - object containing any of these properties:
  resourceId - a unique identifier for the resource you are about to ...
  params - an object which will get serialized to and from JSON and then passed to the perform function of a task.
  retries - how many times to retry a job before moving it to the failed ...
callback(err) - lets you know when the job made it onto the queue

Gracefully begins the shutdown process allowing any ongoing tasks to finish.
callback is called once everything is completely shut down.
Moves all jobs from the failed queue to the pending queue.
Deletes all jobs from the failed queue.
Forces an immediate flushing of jobs that crashed while executing. Jobs of this
sort are put into the failed queue, and you can then retry or delete them.
You probably don't want to call forceFlushStaleJobs manually. It's mostly for testing purposes.