[![Dependencies](https://david-dm.org/pgte/skiff-algorithm.png)](https://david-dm.org/pgte/skiff-algorithm) ![Gitter](https://badges.gitter.im/Join%20Chat.svg)
Abstract Node.js implementation of the Raft Consensus Algorithm.
If you're looking for a directly usable module, take a look at skiff (on top of LevelDB + Msgpack).
$ npm install skiff --save
var Node = require('skiff');
var node = Node();
or, with options:
var options = {
  // ...
};
var node = Node(options);
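As a sketch, here is an options object with the documented defaults written out explicitly; the option names and values are the ones described in the list below. The `timingProblems` helper is hypothetical and not part of skiff. It only checks the usual Raft expectation that heartbeats are sent more often than election timeouts fire.

```javascript
// Documented defaults, spelled out for illustration only.
var options = {
  standby: false,
  heartbeatInterval: 50,    // ms
  minElectionTimeout: 150,  // ms
  maxElectionTimeout: 300,  // ms
  commandTimeout: 3000,     // ms
  retainedLogEntries: 50
};

// Hypothetical helper (not part of skiff): returns a list of
// human-readable problems found in the timing options.
function timingProblems(opts) {
  var problems = [];
  if (opts.minElectionTimeout > opts.maxElectionTimeout) {
    problems.push('minElectionTimeout is greater than maxElectionTimeout');
  }
  if (opts.heartbeatInterval >= opts.minElectionTimeout) {
    problems.push('heartbeatInterval should be shorter than minElectionTimeout');
  }
  return problems;
}
```

If the heartbeat interval were as long as the election timeout, followers could time out and start elections even while a healthy leader is present.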
- `id`: id of the node. If not defined, it is self-assigned. Accessible on `node.id`.
- `standby`: if true, the node starts in the `standby` state instead of the `follower` state. In the `standby` state the node only waits for a leader to send commands. Defaults to `false`.
- `cluster`: the id of the cluster this node will be a part of.
- `transport`: the transport used to communicate with peers. See the transport API.
- `persistence`: the node persistence layer. See the persistence API.
- `uuid`: function that generates a UUID. Defaults to using the `cuid` package.
- `heartbeatInterval`: the interval between heartbeats sent from the leader. Defaults to 50 ms.
- `minElectionTimeout`: the minimum election timeout. Defaults to 150 ms.
- `maxElectionTimeout`: the maximum election timeout. Defaults to 300 ms.
- `commandTimeout`: the maximum amount of time you're willing to wait for a command to propagate. Defaults to 3 seconds. You can override this in each command call.
- `retainedLogEntries`: the maximum number of log entries that are committed to the state machine that should remain in memory. Defaults to 50.
- `metadata`: to be used by plugins if necessary.

Makes the peer listen for peer communications. Takes the following arguments:
- `options`: connection options; these depend on the transport provider being used.
- `listener`: a function with the signature `function (peerId, connection)`. The arguments for the listener function are:
  - `peerId`: the identification of the peer.
  - `connection`: a connection with the peer, an object implementing the Connection API (see below).

Joins a peer into the cluster.
node.join(peer, cb);
The peer is a string describing the peer. The description depends on the transport you're using.
Removes a peer from the cluster.
node.leave(peer, cb);
The peer is a string describing the peer. The description depends on the transport you're using.
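When several peers have to be added, it can help to join them one at a time and stop at the first failure. The following is a minimal sketch; `joinAll` is a hypothetical helper, and it assumes that the `cb` passed to `node.join` is a Node-style callback that receives an error as its first argument, which the text above does not spell out.

```javascript
// Hypothetical helper (not part of skiff): joins peers sequentially.
// Assumes node.join(peer, cb) calls cb(err) when it is done.
function joinAll(node, peers, cb) {
  var i = 0;
  function next(err) {
    if (err) return cb(err);               // stop at the first failure
    if (i >= peers.length) return cb(null); // every peer has joined
    node.join(peers[i++], next);
  }
  next();
}
```

Usage would look like `joinAll(leader, ['node1', 'node2'], function (err) { /* ... */ });`, where the peer strings depend on the transport in use.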
Appends a command to the leader log. If node is not the leader, callback gets invoked with an error. Example:
node.command('some command', function(err) {
  if (err) {
    if (err.code == 'ENOTLEADER') {
      // redirect client to err.leader
    }
  } else {
    console.log('cluster agreed on this command');
  }
});
This command times out once `options.commandTimeout` has elapsed, but you can override this by passing in some options:
node.command('some command', {timeout: 5000}, function(err) {
  if (err) {
    if (err.code == 'ENOTLEADER') {
      // redirect client to err.leader
    }
  } else {
    console.log('cluster agreed on this command');
  }
});
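Since both examples repeat the same error handling, that branch can be factored into a small function. This is only a sketch: `commandOutcome` is a hypothetical name, and the only error fields it relies on are `err.code` and `err.leader` as used in the examples above.

```javascript
// Hypothetical helper (not part of skiff): turns the error passed to a
// command callback into a plain description of what happened.
function commandOutcome(err) {
  if (!err) {
    return { ok: true };                        // cluster agreed on the command
  }
  if (err.code === 'ENOTLEADER') {
    return { ok: false, redirectTo: err.leader }; // client should retry on the leader
  }
  return { ok: false, error: err };             // timeout or other failure
}
```

A callback could then be written as `node.command(cmd, function (err) { var outcome = commandOutcome(err); /* ... */ });`.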
Command options are:

- `timeout`: maximum time to wait for replication to a majority. Defaults to the node option `commandTimeout`, which defaults to 3000 (3 seconds).
- `waitForNode`: id of a node that must commit the command before the callback fires. This may be useful to enforce read-your-writes on proxying clients. Defaults to `undefined`.

Returns the peer metadata if the peer is known.
A node emits the following events that may or may not be interesting to you:

- `error(error)`: when an unexpected error occurs.
- `state(stateName)`: when a new state transition occurs. Possible values for `stateName` are: `idle`, `follower`, `candidate`, `leader`.
- `loaded()`: when a node has loaded configuration from the persistence provider.
- `election timeout()`: when an election timeout occurs.
- `applied log(logIndex)`: when a node applies a log entry to the state machine.

Skiff is fairly high-level and doesn't implement the network transport or the persistence layers. Instead, you have to provide an implementation for these.
The node `transport` option accepts a provider object that implements the following interface:

- `connect(localNodeId, options)`: for connecting to the peer. Returns a connection object. The `localNodeId` argument contains the local node id.
- `listen(localNodeId, options, fn)`: for listening to incoming connection requests. The `fn` argument is a function with the signature `function (peerId, connection)` that gets invoked when there is a connection request, passing in a connection object that implements the Connection API (see below). The `localNodeId` argument contains the local node id.

The connection API implements the following interface:

- `send(type, arguments, callback)`: for making a remote call into the peer. The `callback` argument is a function with the signature `function (err, result)`.
- `receive(fn)`: listen for messages from the remote peer. The `fn` argument is a function with the signature `function (type, args, cb)`. `cb` is a function that accepts the reply arguments.
- `close(callback)`: for closing the connection. The `callback` argument is a function with the signature `function (err)`.

The connection object is an EventEmitter, emitting the following events:

- `close`: once the connection closes.

The node `persistence` option accepts a provider object that implements the following interface:

- `saveMeta(nodeId, state, callback)`: saves the raft engine metadata. `nodeId` is a string that represents the current node, `state` is an arbitrary object (hash map) and `callback` is a function with the signature `function callback(err)`.
- `loadMeta(nodeId, callback)`: loads the engine metadata state. `callback` is a function with the signature `function callback(err, state)`.
- `applyCommand(nodeId, commitIndex, command, callback)`: applies a command to the node state machine.
  - Saving the `commitIndex` and applying the log entry to the state machine should succeed or fail together.
  - `callback` is a function with the following signature: `function callback(err)`.
- `lastAppliedCommitIndex(nodeId, callback)`: returns the last `commitIndex` that was successfully applied to the node state machine.
  - `callback` is invoked once the result is ready and has the following signature: `function(err, commitIndex)`. If the operation resulted in an error, `err` contains an error object. Otherwise, `commitIndex` may contain an integer with the index of the latest applied `commitIndex`, if there was one.
- `saveCommitIndex(nodeId, commitIndex, callback)`: saves only the commit index.
- `createReadStream(nodeId)`: returns a read stream that streams all the state machine data.
- `createWriteStream(nodeId)`: resets the state machine and returns a write stream to overwrite all the state machine data.
- `removeAllState(nodeId, callback)`: removes all state for the given node.

Setting up a Skiff cluster can be kind of tricky. To avoid partitions you will need to start with a node that will become leader and then add the followers in standby mode. Mind you that you can only send `join` commands to a leader node (to avoid partitions; this is explained in detail in the Raft paper). Once this is done and persisted you should never need to do this again, since the nodes will know each other and elect a leader at random if the leader goes down.
So typically the bootstrap code for the leader would be something like:
var Node = require('skiff');
var leader = Node({
  transport: transport,
  persistence: persistence
});
leader.listen(address);
// wait for the leader node to actually become leader of its one-node cluster
leader.once('leader', function() {
  leader.join('node1');
  leader.join('node2');
});
leader.on('joined', function(peer) {
  console.log('leader joined %s', peer.id);
});
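The `transport` passed to `Node` above has to be supplied by you. For local experiments, one possibility is an in-memory transport that follows the transport and connection interfaces described earlier. The sketch below makes several assumptions: `MemoryTransport` and `MemoryConnection` are hypothetical names, the `options` argument is treated as a plain address string, and the code has not been checked against how skiff drives a transport internally.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');

// One end of an in-memory connection pair (hypothetical).
function MemoryConnection() {
  EventEmitter.call(this);
  this.remote = null;  // the other end of the pair
  this.handler = null; // function (type, args, cb), set by receive()
  this.closed = false;
}
util.inherits(MemoryConnection, EventEmitter);

// Delivers a call to the handler registered on the other end.
MemoryConnection.prototype.send = function (type, args, callback) {
  var remote = this.remote;
  var handler = !this.closed && remote && !remote.closed && remote.handler;
  process.nextTick(function () {
    if (!handler) return callback(new Error('peer not reachable'));
    // The remote side replies by calling `callback` with its reply arguments.
    handler(type, args, callback);
  });
};

// Registers the function that handles calls coming from the other end.
MemoryConnection.prototype.receive = function (fn) {
  this.handler = fn;
};

// Closes this end, emits 'close' once, then invokes the callback.
MemoryConnection.prototype.close = function (callback) {
  var self = this;
  if (!self.closed) {
    self.closed = true;
    process.nextTick(function () { self.emit('close'); });
  }
  if (callback) process.nextTick(callback);
};

// Keeps a map of address -> listener and wires connection pairs together.
function MemoryTransport() {
  this.listeners = {};
}

// Assumption: `options` is the address string this node listens on.
MemoryTransport.prototype.listen = function (localNodeId, options, fn) {
  this.listeners[options] = fn;
};

// Assumption: `options` is the address string of the peer to reach.
MemoryTransport.prototype.connect = function (localNodeId, options) {
  var local = new MemoryConnection();
  var listener = this.listeners[options];
  if (listener) {
    var remote = new MemoryConnection();
    local.remote = remote;
    remote.remote = local;
    listener(localNodeId, remote); // hand the other end to the listening node
  }
  return local; // sends fail with an error if nobody is listening
};
```

All nodes in one process would share a single `MemoryTransport` instance, which is what makes the addresses resolvable. A real transport would replace the in-process map with sockets or a similar mechanism.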
The follower bootstrapping code would look something like this:
var Node = require('skiff');
var node = Node({
  transport: transport,
  persistence: persistence,
  standby: true // important
});
node.listen(address);
This makes the follower start in the standby mode.
As mentioned, once the cluster enters stationary mode you just need to bootstrap all the nodes in the same way:
var Node = require('skiff');
var node = Node({
  transport: transport,
  persistence: persistence
});
node.listen(address);
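The `persistence` object passed to `Node` in these snippets also has to come from you. As a rough sketch of the persistence interface described earlier, an in-memory provider for tests might look like the following. `MemoryPersistence` is a hypothetical name, the two snapshot stream methods (`createReadStream` and `createWriteStream`) are left out, and how skiff actually calls the provider has not been verified here.

```javascript
// Hypothetical in-memory persistence provider, for tests only.
// Nothing survives a process restart.
function MemoryPersistence() {
  this.meta = {};        // nodeId -> raft engine metadata
  this.commitIndex = {}; // nodeId -> last applied commit index
  this.commands = {};    // nodeId -> list of applied commands
}

MemoryPersistence.prototype.saveMeta = function (nodeId, state, callback) {
  this.meta[nodeId] = state;
  process.nextTick(callback);
};

MemoryPersistence.prototype.loadMeta = function (nodeId, callback) {
  var state = this.meta[nodeId];
  process.nextTick(function () { callback(null, state); });
};

// Records the command and the commit index in one synchronous step,
// so both are stored or neither is.
MemoryPersistence.prototype.applyCommand = function (nodeId, commitIndex, command, callback) {
  if (!this.commands[nodeId]) this.commands[nodeId] = [];
  this.commands[nodeId].push(command);
  this.commitIndex[nodeId] = commitIndex;
  process.nextTick(callback);
};

// Yields undefined as the index when nothing has been applied yet.
MemoryPersistence.prototype.lastAppliedCommitIndex = function (nodeId, callback) {
  var index = this.commitIndex[nodeId];
  process.nextTick(function () { callback(null, index); });
};

MemoryPersistence.prototype.saveCommitIndex = function (nodeId, commitIndex, callback) {
  this.commitIndex[nodeId] = commitIndex;
  process.nextTick(callback);
};

MemoryPersistence.prototype.removeAllState = function (nodeId, callback) {
  delete this.meta[nodeId];
  delete this.commitIndex[nodeId];
  delete this.commands[nodeId];
  process.nextTick(callback);
};
```

A durable provider would need to make `applyCommand` atomic on disk as well, for example by writing the command and the commit index in a single batch.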
ISC
© Pedro Teixeira