worker threads for javascript
OTHER License
A worker_threads wrapper for node.js. Provides transparent fallback for
pre-11.7.0 node.js (via child_process) as well as browser web workers.
Browserifiable, webpack-able.
const threads = require('bthreads');

if (threads.isMainThread) {
  const worker = new threads.Worker(__filename, {
    workerData: 'foo'
  });

  worker.on('message', console.log);
  worker.on('error', console.error);

  worker.on('exit', (code) => {
    if (code !== 0)
      console.error(`Worker stopped with exit code ${code}.`);
  });
} else {
  threads.parentPort.postMessage(threads.workerData + 'bar');
}
Output (with node@<11.7.0):
$ node --experimental-worker threads.js
foobar
$ node threads.js
foobar
bthreads has 4 backends and a few layers of fallback:

- worker_threads - Uses the still experimental worker_threads module in node.js (before node.js 11.7.0, only when --experimental-worker is passed).
- child_process - Leverages the child_process module in node.js to emulate worker threads.
- web_workers - Web Workers API (browser only).
- polyfill - A polyfill for the web workers API.

The current backend is exposed as threads.backend. Note that the current
backend can be set with the BTHREADS_BACKEND environment variable.
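To make the selection above more concrete, here is a hypothetical sketch of how a backend could be chosen. Only the BTHREADS_BACKEND variable and the four backend names come from this document. The function names, the browser fallback to the polyfill, and the 11.7.0 cutoff (borrowed from the fallback note at the top) are assumptions. bthreads picks whatever it considers stable, so its real rule may well differ.

```javascript
// Hypothetical backend selection; NOT bthreads' actual logic.
const BACKENDS = ['worker_threads', 'child_process', 'web_workers', 'polyfill'];

// Numeric comparison of dotted versions, e.g. '11.10.0' >= '11.7.0'.
function versionAtLeast(version, minimum) {
  const a = version.split('.').map(Number);
  const b = minimum.split('.').map(Number);
  for (let i = 0; i < Math.max(a.length, b.length); i++) {
    const x = a[i] || 0;
    const y = b[i] || 0;
    if (x !== y)
      return x > y;
  }
  return true; // equal versions
}

// nodeVersion is null when running in a browser.
function pickBackend({ env = {}, nodeVersion = null, hasWebWorkers = false } = {}) {
  // An explicit BTHREADS_BACKEND override always wins.
  const forced = env.BTHREADS_BACKEND;
  if (forced) {
    if (!BACKENDS.includes(forced))
      throw new Error(`Unknown backend: ${forced}`);
    return forced;
  }
  // Assumption: native Web Workers if present, otherwise the polyfill.
  if (nodeVersion === null)
    return hasWebWorkers ? 'web_workers' : 'polyfill';
  // Assumed cutoff: worker_threads needs no flag from node.js 11.7.0 on.
  return versionAtLeast(nodeVersion, '11.7.0') ? 'worker_threads' : 'child_process';
}

console.log(pickBackend({ env: process.env, nodeVersion: process.versions.node }));
```

Keeping the override check first mirrors the document's point that the environment variable can force a backend regardless of what would be picked automatically.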
require('bthreads') will automatically pick the backend depending on what is
stable, but in some cases that may not be what you want. Because of this,
there are also more explicit entry points:

- require('bthreads/process') - Always the child_process backend.
- require('bthreads/threads') - Always the worker_threads backend.

Some caveats for the child_process backend:
- options.workerData probably has a limited size depending on platform (the [...]
- SharedArrayBuffer does not work and will throw an error if sent.
- [...] ref() and unref() are noops).
- Proxy objects can be serialized and cloned as they [...]
- SHARE_ENV does not work and will throw an error if passed.

Caveats for the web_workers backend:
- options.workerData possibly has a limited size depending on the browser [...] options.name).
- options.eval requires a "bootstrap" file for code. This is essentially a [...] require('path') works, for example), as well as bthreads itself. By [...] blob: and/or data: set as a Content-Security-Policy [...] require will have limited usability (restricted to only core [...] bthreads itself).
- The close event for MessagePorts only has partial support (if a thread [...] close will not be emitted for any remote ports). [...] close event is not yet a part of the standard Web [...]
- SHARE_ENV does not work and will throw an error if passed.
- workerData is serialized as JSON instead of using the structured clone [...] workerData. This was done to [...]
- The stdio, stdin, and stdout options will throw an error if passed.
- [...] Buffer object in the browser, you [...] bthreads.Buffer like so: bthreads.Buffer = Buffer. Once [...]

Caveats for the polyfill backend:
- importScripts will perform a synchronous XMLHttpRequest and potentially [...] importScripts is not.
- [...] connect-src Content-Security-Policy directive [...] worker-src directive).
- ArrayBuffers behave as if they were SharedArrayBuffers [...]
- [...] error events on worker [...]
- Proxy objects can be serialized and cloned as they cannot be [...]

Caveats for all of the above:
- [...] worker.postMessage(Buffer.prototype)).

Finally, caveats for the worker_threads backend:

- [...] worker_threads is still experimental in node.js!

The low-level node.js API is not very useful on its own. bthreads optionally
provides an API similar to bsock.
Example (for brevity, the async wrapper is not included below):
const threads = require('bthreads');

if (threads.isMainThread) {
  const thread = new threads.Thread(__filename);

  thread.bind('event', (x, y) => {
    console.log(x + y);
  });

  console.log(await thread.call('job', ['hello']));
} else {
  const {parent} = threads;

  parent.hook('job', async (arg) => {
    return arg + ' world';
  });

  parent.fire('event', ['foo', 'bar']);
}
Output:
foobar
hello world
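The bind/fire and hook/call pairs behave like a small RPC layer: fire sends a one-way event to a bound handler, while call sends a request and resolves with whatever the remote hook returns. The sketch below illustrates that pattern over a plain node.js MessageChannel inside a single thread. MiniSocket and its message format are invented for this illustration and are not how bthreads implements Thread or Port.

```javascript
const { MessageChannel } = require('worker_threads');

// Hypothetical mini RPC socket over a MessagePort. Illustrative only;
// it is not how bthreads implements Thread or Port.
class MiniSocket {
  constructor(port) {
    this.port = port;
    this.binds = new Map();   // event name -> handler
    this.hooks = new Map();   // call name -> (async) handler
    this.pending = new Map(); // call id -> { resolve, reject }
    this.nextId = 1;
    port.on('message', (msg) => this.handle(msg));
  }

  bind(name, handler) { this.binds.set(name, handler); }
  hook(name, handler) { this.hooks.set(name, handler); }

  // One-way event: the remote bound handler receives the spread args.
  fire(name, args = []) {
    this.port.postMessage({ type: 'event', name, args });
  }

  // Request/response: resolves with the remote hook's return value.
  call(name, args = []) {
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      this.port.postMessage({ type: 'call', id, name, args });
    });
  }

  // Closing one port closes both ends of the channel.
  close() { this.port.close(); }

  async handle(msg) {
    if (msg.type === 'event') {
      const handler = this.binds.get(msg.name);
      if (handler)
        handler(...msg.args);
    } else if (msg.type === 'call') {
      const hook = this.hooks.get(msg.name);
      try {
        if (!hook)
          throw new Error(`No hook for ${msg.name}`);
        const result = await hook(...msg.args);
        this.port.postMessage({ type: 'result', id: msg.id, result });
      } catch (err) {
        this.port.postMessage({ type: 'result', id: msg.id, error: err.message });
      }
    } else if (msg.type === 'result') {
      const entry = this.pending.get(msg.id);
      if (!entry)
        return;
      this.pending.delete(msg.id);
      if (msg.error !== undefined)
        entry.reject(new Error(msg.error));
      else
        entry.resolve(msg.result);
    }
  }
}

// Usage, mirroring the example above within a single thread.
const { port1, port2 } = new MessageChannel();
const mainSide = new MiniSocket(port1);
const workerSide = new MiniSocket(port2);

mainSide.bind('event', (x, y) => console.log(x + y));
workerSide.hook('job', async (arg) => arg + ' world');
workerSide.fire('event', ['foo', 'bar']);

mainSide.call('job', ['hello']).then((res) => {
  console.log(res);
  mainSide.close();
});
```

Matching each reply to its request by a numeric id lets several calls be in flight on the same port at once.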
You may find yourself wanting to parallelize the same worker jobs. The
high-level API offers a thread pool object (threads.Pool) which will
automatically load balance and scale to the number of CPU cores.
const threads = require('bthreads');

// As above, the async wrapper is omitted for brevity.
if (threads.isMainThread) {
  const pool = new threads.Pool(__filename);

  const results = await Promise.all([
    pool.call('job1'), // Runs on thread 1.
    pool.call('job2'), // Runs on thread 2.
    pool.call('job3')  // Runs on thread 3.
  ]);

  console.log(results);
} else {
  const {parent} = threads;

  Buffer.poolSize = 1; // Make buffers easily transferable.

  parent.hook('job1', async () => {
    const buf = Buffer.from('job1 result');
    return [buf, [buf.buffer]]; // Transfer the array buffer.
  });

  parent.hook('job2', async () => {
    return 'job2 result';
  });

  parent.hook('job3', async () => {
    return 'job3 result';
  });
}
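Beyond load balancing and scaling to the core count, this README does not say how the pool chooses a thread. One simple strategy, assumed here purely for illustration, is round-robin scheduling with lazy spawning up to a size limit that defaults to os.cpus().length. RoundRobinPool and its spawn callback are hypothetical, and bthreads' Pool may balance work differently.

```javascript
const os = require('os');

// Hypothetical round-robin scheduler with lazy spawning; an assumed
// strategy for illustration, not bthreads' actual Pool implementation.
class RoundRobinPool {
  constructor(spawn, size = os.cpus().length) {
    this.spawn = spawn;            // creates one worker-like object
    this.size = Math.max(1, size); // never fewer than one worker
    this.workers = [];
    this.index = 0;
  }

  // Spawn until the pool is full, then rotate through existing workers.
  next() {
    if (this.workers.length < this.size) {
      const fresh = this.spawn(this.workers.length);
      this.workers.push(fresh);
      return fresh;
    }
    const worker = this.workers[this.index];
    this.index = (this.index + 1) % this.workers.length;
    return worker;
  }
}

// Usage with stand-in workers (plain objects carrying an id).
const demo = new RoundRobinPool((id) => ({ id }), 3);
const order = [];
for (let i = 0; i < 5; i++)
  order.push(demo.next().id);
console.log(order.join(',')); // 0,1,2,0,1
```

Spawning lazily means a pool that only ever sees one job never pays for starting a thread per core.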
One of the remarkable features of bthreads is that it allows for static
analysis when bundling. The threads.Pool and threads.Thread objects resolve
their filename argument as if it were a require() from the calling file.

const thread = new threads.Thread('./worker.js');

The above line will resolve to ${__dirname}/worker.js in node.js and
${window.location}/worker.js in the browser. In node.js, it is not relative
to the current working directory! We accomplish this through various forms of
sorcery.
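The difference between caller-relative and cwd-relative resolution can be sketched with node's path module. resolveWorkerPath is a hypothetical helper. bthreads works out the calling file on its own, whereas here the caller's filename is passed in explicitly. POSIX paths keep the output identical on every platform.

```javascript
const path = require('path');

// Hypothetical helper: resolve a relative worker path against the directory
// of the calling file (as require() does), never against process.cwd().
function resolveWorkerPath(callerFilename, specifier) {
  if (!specifier.startsWith('./') && !specifier.startsWith('../'))
    return specifier; // absolute paths and other specifiers pass through
  return path.posix.resolve(path.posix.dirname(callerFilename), specifier);
}

console.log(resolveWorkerPath('/app/src/main.js', './worker.js'));  // /app/src/worker.js
console.log(resolveWorkerPath('/app/src/main.js', '../worker.js')); // /app/worker.js
```

Because the base directory is always absolute, the result does not change when the process is started from a different working directory.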
Why does this matter? Because it allows for browserify and/or webpack to do static analysis on your code and ship your code (including workers) as a single bundled file! Of course, this would require an extra browserify or webpack plugin which adds some more initialization code for choosing the proper entry point.
When statically analyzing the line above, the compiler should replace
'./worker.js' with 'bthreads-worker@[id]'. When initializing the code,
bthreads should be implicitly required. bthreads will set an environment
variable called process.env.BTHREADS_WORKER_INLINE which contains the [id]
you generated previously, allowing you to determine which function to run
inside the worker thread.
In other words, when the compiler comes across:

const thread = new threads.Thread('./worker.js');

./worker.js should be included in the bundle and mapped to an ID (in our
case, we include it in the bundle with an ID of 1).

Our line becomes:

const thread = new threads.Thread('bthreads-worker@1');
The bundle's main entry point should include some initialization code like:

// requireBthreads, requireWorker and requireMain stand in for the
// bundler's own module loaders.
requireBthreads();

if (process.env.BTHREADS_WORKER_INLINE)
  requireWorker(process.env.BTHREADS_WORKER_INLINE);
else
  requireMain();
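The dispatch step can be sketched as plain functions. Everything here is hypothetical bundler-side code: parseInlineId, runEntry and the workers table are invented names. Only the 'bthreads-worker@[id]' specifier format and the BTHREADS_WORKER_INLINE variable come from the description above.

```javascript
// Hypothetical bundler-side entry helpers. Only the 'bthreads-worker@[id]'
// format and BTHREADS_WORKER_INLINE come from the description above.
const INLINE_PREFIX = 'bthreads-worker@';

// Recover the ID from a rewritten specifier, e.g. 'bthreads-worker@1' -> '1'.
function parseInlineId(specifier) {
  if (!specifier.startsWith(INLINE_PREFIX))
    return null;
  return specifier.slice(INLINE_PREFIX.length);
}

// Run the bundled worker whose ID is in the environment, or the main
// module when no inline worker ID is set.
function runEntry(env, workers, mainModule) {
  const id = env.BTHREADS_WORKER_INLINE;
  if (!id)
    return mainModule();
  const worker = workers[id];
  if (!worker)
    throw new Error(`Unknown inline worker ID: ${id}`);
  return worker();
}

// Usage with stand-in modules.
const workers = { 1: () => 'worker 1 running' };
const mainModule = () => 'main running';
console.log(parseInlineId('bthreads-worker@1'));                             // 1
console.log(runEntry({}, workers, mainModule));                              // main running
console.log(runEntry({ BTHREADS_WORKER_INLINE: '1' }, workers, mainModule)); // worker 1 running
```

Throwing on an unknown ID surfaces a mismatch between the rewritten specifiers and the bundled modules immediately, instead of silently running the main module inside a worker.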
In the browser, bthreads exposes a more useful version of importScripts
called threads.require.
const threads = require('bthreads');
const _ = threads.require('https://unpkg.com/underscore/underscore.js');
This should work for any library exposed as UMD or CommonJS. Note that
threads.require behaves more like require in that it caches modules by URL.
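Caching by URL means that a second require of the same URL returns the already-evaluated exports instead of fetching again. The following is a minimal sketch of such a cache. It assumes a CommonJS-style module body and a synchronous fetchSource function standing in for the browser's importScripts/XHR machinery. makeUrlRequire is a hypothetical name, and UMD handling is omitted.

```javascript
// Hypothetical URL-keyed CommonJS loader in the spirit of threads.require.
// `fetchSource(url)` stands in for a synchronous fetch (e.g. importScripts
// or a synchronous XHR in a browser worker); UMD handling is omitted.
function makeUrlRequire(fetchSource) {
  const cache = new Map(); // URL -> module record

  return function requireUrl(url) {
    const cached = cache.get(url);
    if (cached)
      return cached.exports;

    // Evaluate the CommonJS body with its own `module` and `exports`.
    const mod = { exports: {} };
    const body = new Function('module', 'exports', fetchSource(url));
    body(mod, mod.exports);

    // Only successfully evaluated modules are cached.
    cache.set(url, mod);
    return mod.exports;
  };
}

// Usage with a fake fetcher that counts round-trips.
let fetches = 0;
const requireUrl = makeUrlRequire((url) => {
  fetches++;
  return `module.exports = { from: ${JSON.stringify(url)} };`;
});

const first = requireUrl('https://example.com/lib.js');
const second = requireUrl('https://example.com/lib.js');
console.log(first === second, fetches); // true 1
```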
Note that if you are eval'ing some code inside a script you plan to bundle with
browserify or webpack, require may get unintentionally transformed or
overridden. This generally happens when you are calling toString on a defined
function.
const threads = require('bthreads');

function myWorker() {
  const threads = require('bthreads');
  threads.parentPort.postMessage('foo');
}

const code = `(${myWorker})();`;
const worker = new threads.Worker(code, { eval: true });
The solution is to access module.require instead of require.
const threads = require('bthreads');

function myWorker() {
  const threads = module.require('bthreads');
  threads.parentPort.postMessage('foo');
}

const code = `(${myWorker})();`;
const worker = new threads.Worker(code, { eval: true });
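The string-building step in these examples relies on Function.prototype.toString, which returns the function's source text. The evaluated code is therefore exactly the (possibly bundler-rewritten) body, and names such as module are looked up in whatever scope evaluates the string. The sketch below uses node's vm module to evaluate such a string against a stand-in module object. The fake require and 'some-lib' are hypothetical; a real worker supplies its own module.

```javascript
const vm = require('vm');

function myWorker() {
  // Looked up when the string is evaluated, in the evaluating scope.
  const lib = module.require('some-lib');
  return lib.greet();
}

// Function.prototype.toString yields the source text of myWorker.
const code = `(${myWorker})();`;

// Hypothetical stand-in for a worker's scope: a fake `module` whose
// require returns a fake library.
const sandbox = {
  module: {
    require: (name) => ({ greet: () => `hello from ${name}` }),
  },
};

console.log(vm.runInNewContext(code, sandbox)); // hello from some-lib
```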
- threads.isMainThread - See worker_threads documentation.
- threads.parentPort - See worker_threads documentation (worker only).
- threads.threadId - See worker_threads documentation.
- threads.workerData - See worker_threads documentation (worker only).
- threads.MessagePort - See worker_threads documentation.
- threads.MessageChannel - See worker_threads documentation.
- threads.Worker - See worker_threads documentation.
- threads.backend - A string indicating the current backend (worker_threads, child_process, web_workers, or polyfill).
- threads.browser - true if a browser backend is being used.
- threads.location - The current module URL (cross-platform [...] import.meta.url).
- threads.filename - The current module filename (cross-platform [...] __filename).
- threads.dirname - The current module dirname (cross-platform [...] __dirname).
- threads.require(location) - importScripts() wrapper (browser+worker [...]
- threads.resolve(location) - Resolve a URL or path to a filename. This is [...] threads.require calls internally.
- threads.exit(code) - A reference to process.exit.
- threads.cores - Number of CPU cores available.
- threads.Buffer - In the browser, this must be set to the Buffer object [...]
- threads.bufferify - A boolean indicating whether to cast Uint8Arrays [...]
- threads.Thread - Thread class (see below).
- threads.Port - Port class (see below).
- threads.Channel - Channel class (see below).
- threads.Pool - Pool class (see below).
- threads.parent - A reference to the parent Port (worker only, see [...]

- new Socket() - Not meant to be called directly.
- Socket#events (read only) - A reference to the bind EventEmitter.
- Socket#closed (read only) - A boolean representing whether the socket is [...]
- Socket#bind(name, handler) - Bind remote event.
- Socket#unbind(name, handler) - Unbind remote event.
- Socket#hook(name, handler) - Add hook handler.
- Socket#unhook(name) - Remove hook handler.
- Socket#send(msg, [transferList]) - Send a message; it will be emitted as a message event on the other side.
- Socket#read() (async) - Wait for and read the next message event.
- Socket#fire(name, args, [transferList]) - Fire bind event.
- Socket#call(name, args, [transferList], [timeout]) (async) - Call remote [...]
- Socket#hasRef() - Test whether socket has reference.
- Socket#ref() - Reference socket.
- Socket#unref() - Clear socket reference.
- Socket@message(msg) - Emitted on message received.
- Socket@error(err) - Emitted on error.
- Socket@event(event, args) - Emitted on bind event.

- new Thread(filename, [options]) - Instantiate thread with module.
- new Thread(code, [options]) - Instantiate thread with code.
- new Thread(function, [options]) - Instantiate thread with function.
- Thread#online (read only) - A boolean representing whether the thread is [...]
- Thread#stdin (read only) - A writable stream representing stdin (only [...] options.stdin was passed).
- Thread#stdout (read only) - A readable stream representing stdout.
- Thread#stderr (read only) - A readable stream representing stderr.
- Thread#threadId (read only) - An integer representing the thread ID.
- Thread#open() (async) - Wait for the online event to be emitted.
- Thread#close() (async) - Terminate the thread and wait for exit event [...] async version of Thread#terminate).
- Thread#wait() (async) - Wait for thread to exit, but do not invoke close(). Also listen for errors and reject the promise if any occur.
- Thread@online() - Emitted once thread is online.
- Thread@exit(code) - Emitted on exit.

- new Port() - Not meant to be called directly.
- Port#start() - Open and bind port (usually automatic).
- Port#close() (async) - Close the port and wait for close event, but [...]
- Port#wait() (async) - Wait for port to exit, but do not invoke close().
- Port@close() - Emitted on port close.

- new Channel() - Instantiate channel.
- Channel#port1 (read only) - A Port object.
- Channel#port2 (read only) - A Port object.

- new Pool(filename, [options]) - Instantiate pool with module.
- new Pool(code, [options]) - Instantiate pool with code.
- new Pool(function, [options]) - Instantiate pool with function.
- Pool#file (read only) - A reference to the filename, function, or code [...]
- Pool#options (read only) - A reference to the options passed in.
- Pool#size (read only) - Number of threads to spawn.
- Pool#events (read only) - A reference to the bind EventEmitter.
- Pool#threads (read only) - A Set containing all spawned threads.
- Pool#open() (async) - Populate and wait until all threads are online [...]
- Pool#close() (async) - Close all threads in pool, reject on errors.
- Pool#populate() - Populate the pool with this.size threads (otherwise [...]
- Pool#next() - Return the next thread in queue (this may spawn a new [...]
- Pool#bind(name, handler) - Bind remote event for all threads.
- Pool#unbind(name, handler) - Unbind remote event for all threads.
- Pool#hook(name, handler) - Add hook handler for all threads.
- Pool#unhook(name) - Remove hook handler for all threads.
- Pool#send(msg) - Send a message to all threads; it will be emitted as a message event on the other side (this will populate the pool with threads [...]
- Pool#fire(name, args) - Fire bind event to all threads (this will [...]
- Pool#call(name, args, [transferList], [timeout]) (async) - Call remote [...]
- Pool#hasRef() - Test whether pool has reference.
- Pool#ref() - Reference pool.
- Pool#unref() - Clear pool reference.
- Pool@message(msg, thread) - Emitted on message received.
- Pool@error(err, thread) - Emitted on error.
- Pool@event(event, args, thread) - Emitted on bind event.
- Pool@spawn(thread) - Emitted immediately after thread is spawned.
- Pool@online(thread) - Emitted once thread is online.
- Pool@exit(code, thread) - Emitted on thread exit.

The options object accepted by the Thread, Pool, and Worker classes is
nearly identical to the worker_threads worker options with some differences:
- options.type and options.credentials are valid options when using the [...] options.type = 'module' will [...] polyfill backend. If a file extension is .mjs, options.type is automatically set to module for consistency with node.js.
- options.bootstrap is a valid option in the browser when used in combination with options.eval. Its value should be the URL of a compiled bundle file. [...] false to do a raw eval (you must inline your own initialization [...] importScripts).
- The Pool class accepts a size option. This allows you to manually set the [...]
- options.dirname allows you to set the __dirname of an eval'd module. [...] require more predictable in eval'd modules (note this is not [...] Thread and Pool objects; it is done automatically).

In the browser, workerData is serialized as JSON instead of structured data.
To force usage of the structured clone algorithm, it's possible to require
./lib/encoding (note that this will increase your code size greatly).
const threads = require('bthreads');
const encoding = require('bthreads/encoding');

const thread = new threads.Thread('./worker.js', {
  workerData: encoding.stringify({ foo: 'bar' })
});
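To see what a JSON round trip drops, the sketch below sends the same object through JSON and through node's v8.serialize/v8.deserialize. The latter use V8's value serializer, which implements a structured-clone-style format. This only illustrates the difference in node.js. It does not use bthreads/encoding, whose output format is not described here.

```javascript
const v8 = require('v8');

const data = {
  when: new Date(0),
  tags: new Map([['a', 1]]),
  missing: undefined,
};

// JSON round trip, comparable to how workerData travels in the browser.
const viaJson = JSON.parse(JSON.stringify(data));

// Structured-clone-style round trip via V8's value serializer.
const viaClone = v8.deserialize(v8.serialize(data));

console.log(typeof viaJson.when);           // string
console.log(viaJson.tags instanceof Map);   // false
console.log('missing' in viaJson);          // false
console.log(viaClone.when instanceof Date); // true
console.log(viaClone.tags.get('a'));        // 1
console.log('missing' in viaClone);         // true
```

In short, JSON turns Dates into strings, flattens Maps into empty objects and drops undefined properties, which is the kind of loss the encoding module is meant to avoid.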
If you contribute code to this project, you are implicitly allowing your code
to be distributed under the MIT license. You are also implicitly verifying that
all code is your original work. </legalese>
See LICENSE for more info.