Queueing jobs in Postgres from Node.js like a boss
- Added `includeMetadata` option to `fetch()`, `subscribe()`, and their derivatives to allow including all job attributes during fetch. PR from @kevbo
- `deleteQueue(name)` and `deleteAllQueues()` now only impact pending queue items and no longer delete completed or active jobs.
- Added `getQueueSize(name)` to retrieve the current size of a queue.
- Added `clearStorage()` as a utility function if and when needed to empty all job storage, archive included.
- `start()` is now fully multi-master ready and supported for installation, schema migrations and maintenance operations.
- Added default configurations. The following options can now be set in the constructor and will apply to all usages of `publish()` or `subscribe()` on the instance unless overridden on the functions themselves.
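For example, a sketch of instance-wide defaults (the connection string and option values here are illustrative):

```js
const PgBoss = require('pg-boss');

// Illustrative defaults: these apply to every publish() on this instance
// unless overridden on the individual call
const boss = new PgBoss({
  connectionString: 'postgres://user:pass@host/db',
  retryLimit: 3,
  expireInMinutes: 15
});
```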
MAJOR: Replaced the expiration pg interval string configuration in `publish()` with specific integer options for better validation and API consistency. If the `expireIn` option is detected after upgrading, you will see a warning such as the following, which will only be emitted once per instance. As mentioned above, all of these options can become defaults if used in the constructor configuration.

```
(node:1) [pg-boss-w01] Warning: 'expireIn' option detected. This option has been removed. Use expireInSeconds, expireInMinutes or expireInHours
```

Removed:

- `expireIn`

Added:

- `expireInSeconds`
- `expireInMinutes`
- `expireInHours`
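In practice the upgrade is a one-line change per call site; a sketch, assuming a `boss` instance is in scope:

```js
// v3 (removed): pg interval string
// await boss.publish('welcome-email', data, { expireIn: '15 minutes' });

// v4: integer option
await boss.publish('welcome-email', data, { expireInMinutes: 15 });
```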
MAJOR: Added retention policies for created jobs. In v3, maintenance operations archived completed jobs, but this policy ignored jobs which were created and never fetched. The following options can be used in `publish()` and `new PgBoss()`:

- `retentionSeconds`
- `retentionMinutes`
- `retentionHours`
- `retentionDays`
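A sketch of both usages with illustrative values (per the notes below, jobs still in created state past their retention are archived):

```js
// Instance default: archive any job still in 'created' state after 7 days
const boss = new PgBoss({ connectionString, retentionDays: 7 });

// Per-job override: keep this one for only 12 hours if never fetched
await boss.publish('low-priority-report', data, { retentionHours: 12 });
```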
MAJOR: Replaced maintenance pg interval string configurations with specific integer options for better validation and API consistency.

Removed:

- `deleteArchivedJobsEvery`
- `archiveCompletedJobsEvery`

Added:

- `archiveIntervalSeconds`
- `archiveIntervalMinutes`
- `archiveIntervalHours`
- `archiveIntervalDays`
- `deleteIntervalSeconds`
- `deleteIntervalMinutes`
- `deleteIntervalHours`
- `deleteIntervalDays`
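Assuming the new options keep the same age-threshold semantics as the old interval strings, a migration sketch:

```js
// v3 (removed):
// new PgBoss({ connectionString, archiveCompletedJobsEvery: '12 hours',
//              deleteArchivedJobsEvery: '7 days' });

// v4:
const boss = new PgBoss({
  connectionString,
  archiveIntervalHours: 12, // archive completed jobs after 12 hours
  deleteIntervalDays: 7     // delete archived jobs after 7 days
});
```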
MAJOR: Consolidated the maintenance constructor options and removed any options for intervals less than 1 second.

Removed:

- `expireCheckInterval`
- `expireCheckIntervalSeconds`
- `expireCheckIntervalMinutes`
- `archiveCheckInterval`
- `archiveCheckIntervalSeconds`
- `archiveCheckIntervalMinutes`
- `deleteCheckInterval`

Added:

- `maintenanceIntervalSeconds`
- `maintenanceIntervalMinutes`
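All maintenance tasks now run on a single schedule; a sketch, assuming connection settings are provided elsewhere:

```js
// One consolidated interval covers expiration, archive and delete checks
const boss = new PgBoss({ connectionString, maintenanceIntervalMinutes: 5 });
```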
MAJOR: Split the static getMigrationPlans() function into 2 functions for clarity.

- Removed the `uninstall` argument from `getMigrationPlans(schema, version)`
- Added `getRollbackPlans(schema, version)`
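A sketch of generating the SQL for review before applying it out-of-band (the schema name and version numbers are illustrative):

```js
const PgBoss = require('pg-boss');

// SQL to migrate the 'pgboss' schema upward from version 12 (illustrative)
const migrationSql = PgBoss.getMigrationPlans('pgboss', 12);

// SQL to roll the schema back from version 13 (illustrative)
const rollbackSql = PgBoss.getRollbackPlans('pgboss', 13);
```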
MAJOR: Removed pgcrypto from installation script.
The breaking changes introduced in this release should not cause any run-time failures, as they are focused on maintenance and operations. However, if you use the deferred publishing options, read the section below regarding retention policy changes, as this version will now archive jobs which have been created but never fetched.
This release was originally started to support rolling deployments where a new instance was being started before another instance was turned off in a container orchestration system. When this happened, sometimes a race condition occurred between maintenance operations, causing unpredictable deadlock errors (see issue #133). This was primarily because of the use of unordered data sets in CTEs from a `DELETE ... RETURNING` statement. However, instead of focusing on the SQL itself, the concurrency problem proved a far superior use case to resolve holistically, and this became a perfect example of pg-boss eating its own dog food via a dedicated maintenance queue (mentioned below).
The result of using a queue for maintenance instead of timers such as `setTimeout()` is the same distributed concurrency benefit of using queues for other workloads. This is sometimes referred to as a multi-master configuration, where more than one instance is using `start()` simultaneously. If and when this occurs in your environment, only one of them will be able to fetch a job (maintenance or state monitoring) and issue the related SQL commands.
Additionally, all schema operations, both first-time provisioning and migrations, are nested within advisory locks to prevent race conditions during `start()`. Internally, these locks are created using `pg_advisory_xact_lock()`, which auto-unlocks at the end of the transaction and doesn't require a persistent session or the need to issue an unlock. This should make it compatible with most connection poolers, such as pgBouncer in transaction pooling mode.
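For reference, a minimal sketch of this locking pattern using the `pg` driver directly; the lock key is an arbitrary illustrative value, not pg-boss's internal one:

```js
const { Client } = require('pg');

async function migrateWithLock(connectionString) {
  const client = new Client({ connectionString });
  await client.connect();
  try {
    await client.query('BEGIN');
    // Blocks until no other transaction holds this key;
    // released automatically on COMMIT or ROLLBACK
    await client.query('SELECT pg_advisory_xact_lock($1)', [123456789]);
    // ... schema provisioning / migration statements would run here ...
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    await client.end();
  }
}
```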
One example of how this is useful would be including `start()` inside the bootstrapping of a pod in a ReplicaSet in Kubernetes. Being able to scale up your job processing using a container orchestration tool like k8s is becoming more popular, and pg-boss can be dropped into this system with no additional code or special configuration.
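Every replica can run the same bootstrap without coordination; a sketch (the queue name, handler, and env var are illustrative):

```js
const PgBoss = require('pg-boss');

async function boot() {
  const boss = new PgBoss(process.env.DATABASE_URL);
  await boss.start(); // safe to call concurrently across replicas
  await boss.subscribe('work', async job => {
    // ... process job.data ...
  });
}

boot().catch(err => { console.error(err); process.exit(1); });
```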
As mentioned above, previously only completed jobs were included in the archive maintenance, but with one exception: completion jobs were also moved to the archive even though they were in `created` state. This would sometimes result in missed jobs if an `onComplete` subscription were to reach a backlogged state that couldn't keep up with the configured archive interval.
A new set of retention options (listed above) has been added which controls how long any job may exist in created state, original or completion. Currently, the default retention is 30 days, but even if it's customized, it automatically carries over to the associated completion job as well.
Furthermore, this retention policy is aware of any deferred jobs, such as those created with `publishAfter()`. If you have future-dated or interval-deferred jobs, the retention policy start date is internally based on the deferred date, not the created timestamp.
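For example, assuming `retentionDays: 2`, a job deferred by 24 hours is retained until roughly 2 days after its deferred date, not its creation time:

```js
// Retention is anchored on the deferred date, not the creation time
await boss.publish('nightly-report', data, {
  startAfter: new Date(Date.now() + 24 * 60 * 60 * 1000), // run in ~24h
  retentionDays: 2 // kept ~2 days past the deferred date if never fetched
});
```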
If you're upgrading from v3, a migration script will run and set the retention date on all jobs found in 'created' state. For example, if you use the option `retentionDays: 7` in the constructor, then run `start()`, the migration will assign a retention date of 7 days after the created or deferred date, whichever is later.
To keep maintenance overhead as light as possible, the concurrency of each task (expiration, archiving, deletion) has been adjusted to one operation at a time and placed into dedicated queues prefixed with `'__pgboss__'`. The same was also done for the optional state count monitoring.
If you were running pg-boss as a superuser account in production to have it auto-provision the pgcrypto extension in a new database, this change might be viewed as a disadvantage. The primary principle at play in this decision is "it should be simple to uninstall anything which was installed". Adding an extension to a database cannot be scoped to a schema, and it requires superuser privilege. If pg-boss were to install pgcrypto, it would be unsafe to assume it could later be removed, as it may be in use elsewhere. Also, having a script embedded in the installation which requires superuser privilege sends the wrong message about how applications should be configured in production, where a least-privilege model should always be used.

As a reminder, below is a simple one-liner to run in your database if the extension is not already installed. If you are upgrading pg-boss from a previous version, this is obviously not an issue.
```sql
CREATE EXTENSION pgcrypto;
```
Added `archive()`, `purge()` and `expire()` to exports for manual housekeeping if desired, along with `connect()`. Use these only if you need them for special cases, as it's not a good idea to run them in parallel (see deadlock comment above).

Published by timgit about 3 years ago
`teamConcurrency` now defaults to 1 when `teamSize` > 1.

Published by timgit about 3 years ago
Added wildcard pattern matching for subscriptions. This allows you to have 1 subscription over many queues. For example, the following subscription uses the `*` placeholder to fetch completed jobs from all queues that start with the text `sensor-report-`.
```js
boss.onComplete('sensor-report-*', processSensorReport);
```
Wildcards may be placed anywhere in the queue name. The motivation for this feature is adding the capability for an orchestrator to use a single subscription to listen to potentially thousands of job processors that each have just 1 thing to do via isolated queues.
Multiple subscriptions to the same queue are now allowed on the same instance.
Previously, an error was thrown when attempting to subscribe to the same queue more than once on the same instance. This was merely an internal concern with worker tracking. Since `teamConcurrency` was introduced in 3.0, it blocks polling until the last job in a batch is completed, which may have the side effect of slowing down queue operations if one job is taking a long time to complete. Being able to have multiple subscriptions isn't necessarily something I'd advertise as a feature, but it's something easy I can offer until implementing a more elaborate producer-consumer queue pattern that monitors its promises.
Remember to keep in mind that `subscribe()` is intended to provide a nice abstraction over `fetch()` and `complete()`, which are always there if and when you have a use case that `subscribe()` cannot provide.
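A minimal sketch of that lower-level loop (the queue name and handler are illustrative):

```js
// What subscribe() abstracts over: fetch a job, do the work, settle it
const job = await boss.fetch('sensor-report-1');

if (job) {
  try {
    await processSensorReport(job.data);
    await boss.complete(job.id);
  } catch (err) {
    await boss.fail(job.id, err);
  }
}
```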
Internal state job suffixes are now prefixes. The following shows a comparison of completed state jobs for the queue `some-job`.

Before: `some-job__state__completed`
After: `__state__completed__some-job`

This is an internal implementation detail included here in case you happen to have any custom queries written against the job tables. The migration will handle this for the job table (the archive will remain as-is).
Added `retryDelay` (int) and `retryBackoff` (bool) options. `retryBackoff` will use an exponential backoff algorithm with jitter to somewhat randomize the distribution. Inspired by Marc's AWS blog post Exponential Backoff and Jitter.
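An illustrative retry configuration combining these with a retry limit:

```js
await boss.publish('flaky-api-call', data, {
  retryLimit: 5,     // up to 5 retries
  retryDelay: 2,     // start with a 2-second delay
  retryBackoff: true // grow the delay exponentially, with jitter
});
```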
Promises are now supported in `subscribe()`! If your callback returns a promise, it will defer polling and other callbacks until it resolves.
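So an async handler defers further polling until it settles (the queue name and `importRecords` helper are illustrative):

```js
await boss.subscribe('import-batch', async job => {
  await importRecords(job.data); // polling resumes after this resolves
});
```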
- `teamConcurrency` was added, which can be used along with `teamSize` for single job callbacks to control backpressure if a promise is returned.
- `subscribe()` will now return an array of jobs all at once when `batchSize` is specified.
- `fetch()` now returns jobs with a convenience `job.done()` function like `subscribe()`.
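A sketch of both subscription modes (queue names and the `resizeImage`/`sendEmail` helpers are illustrative):

```js
// batchSize: one callback with an array of jobs
await boss.subscribe('resize-image', { batchSize: 10 }, async jobs => {
  await Promise.all(jobs.map(job => resizeImage(job.data)));
});

// teamSize + teamConcurrency: one callback per job, bounded parallelism
await boss.subscribe('send-email', { teamSize: 5, teamConcurrency: 2 },
  job => sendEmail(job.data));
```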
Failed and expired jobs are now routed to `onComplete()` subscriptions. If the job failed, `job.data.failed` will be true; if it expired, `job.data.state` will be `'expired'`.

```js
const { states } = require('pg-boss');

if (job.data.state === states.expired) {
  console.log(`job ${job.data.request.id} in queue ${job.data.request.name} expired`);
  console.log(`createdOn: ${job.data.createdOn}`);
  console.log(`startedOn: ${job.data.startedOn}`);
  console.log(`expiredOn: ${job.data.completedOn}`);
  console.log(`retryCount: ${job.data.retryCount}`);
}
```
Batch completion and failure now create state jobs for `onComplete()`. Previously, if you called complete or fail with an array of job IDs, no state jobs were created.

Added new publish convenience functions:

- `publishThrottled(name, data, options, seconds, key)`
- `publishDebounced(name, data, options, seconds, key)`
- `publishAfter(name, data, options, seconds | ISO date string | Date)`
- `publishOnce(name, data, options, key)`
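Illustrative uses of two of these (queue names and payloads are made up):

```js
// At most one 'refresh-cache' job per 60 seconds for the 'homepage' key
await boss.publishThrottled('refresh-cache', null, null, 60, 'homepage');

// Defer a job by 30 seconds
await boss.publishAfter('send-reminder', { userId: 1 }, null, 30);
```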
Added `deleteQueue()` and `deleteAllQueues()` APIs to clear queues if and when needed.

Removed the `failed`, `expired-job`, and `job` events, as these were all instance-bound and pre-dated the distribution-friendly `onComplete()`.
Removed the `done()` argument in the `subscribe()` callback in favor of consolidating to `job.done()`.

Renamed the `expired-count` event to `expired`.
`subscribe()` with a `batchSize` property now runs the callback only once with an array of jobs. The `teamSize` option still calls back once per job.

Removed `onFail()`, `offFail()`, `onExpire()`, `offExpire()`, `fetchFailed()` and `fetchExpired()`. All job completion subscriptions should now use `onComplete()`, and fetching is consolidated to `fetchCompleted()`. In order to determine how the job completed, additional helpful properties have been added to `data` on completed jobs, such as `state` and `failed`.
The `startIn` option has been renamed to `startAfter` to make its behavior more clear. Previously, this value accepted an integer for the number of seconds of delay, or a PostgreSQL interval string. The interval string has been replaced with a UTC ISO date time string (must end in Z), or you can pass a Date object.
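For example, each of these forms is accepted (queue name and payload are illustrative):

```js
await boss.publish('send-digest', data, { startAfter: 30 });                      // seconds
await boss.publish('send-digest', data, { startAfter: '2030-01-01T00:00:00Z' }); // UTC ISO string
await boss.publish('send-digest', data, { startAfter: new Date() });             // Date object
```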
The `singletonDays` option has been removed.

Archive: `deleteArchivedJobsEvery` and `deleteCheckInterval` settings added for defining job retention.

The `monitor-states` event was changed to add counts by queue, not just totals.