# timeq

A file-based priority queue in Go.
Generally speaking, `timeq` can be used for many queue-like systems beyond its original purpose.

Features:

- High throughput backed by `mmap()`.
- Simple interface with the classic `Push()` and `Read()` and only a few other functions.

This implementation should be generally useful, despite the `time` in the name. However, the initial design had timestamps as priority keys in mind. For best performance the following assumptions were made:

- Your OS supports `mmap()` and `mremap()` (i.e. Linux/FreeBSD).

If some of those assumptions do not fit your use case and you still managed to make it work, I would be happy about feedback or even pull requests to improve the general usability.
See the API documentation for examples and the full reference.
My primary use case was an embedded Linux device running several services that generate a stream of data which needs to be sent to the cloud. The data was required to be in ascending order (sorted by time) and also needed to be buffered within tight memory boundaries. A previous attempt based on `sqlite3` worked reasonably well, but was much slower than it had to be (partly due to the heavy cost of `cgo`). This motivated me to write this queue implementation.
To download the library, just do this in your project:

```bash
# Use latest or a specific tag as you like
$ go get github.com/sahib/timeq@latest
```
We also ship a rudimentary command-line client that can be used for experiments. You can install it like this:

```bash
$ go install github.com/sahib/timeq/cmd@latest
```
The included benchmark pushes 2000 items with a payload of 40 bytes per operation.
```
$ make bench
goos: linux
goarch: amd64
pkg: github.com/sahib/timeq
cpu: 12th Gen Intel(R) Core(TM) i7-1270P
BenchmarkPopSyncNone-16     35924  33738 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncData-16     35286  33938 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncIndex-16    34030  34003 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncFull-16     35170  33592 ns/op  240 B/op  5 allocs/op
BenchmarkPushSyncNone-16    20336  56867 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncData-16    20630  58613 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncIndex-16   20684  58782 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncFull-16    19994  59491 ns/op   72 B/op  2 allocs/op
```
`timeq` supports a `Fork()` operation that splits the consuming end of a queue into two halves. You can then consume from each half individually, without modifying the state of the other one. It is even possible to fork a fork again, resulting in a hierarchy of consumers. This is probably best explained by this diagram:

A fork is created by calling `Fork("foo")` and consumed via `fork.Pop()`. This is implemented efficiently (see below) by just duplicating the indexes. It opens up some interesting use cases:
- Having several consumers of the same `timeq`, each `Pop()`'ing at its own pace.
- Implementing a max-age for the queue's items.

Internally, incoming items are sorted into buckets (split according to the `BucketSplitConf`). Since the index is quite small (only one entry per batch) we can easily fit it in memory. On the initial load all bucket indexes are loaded, but no memory is mapped yet.
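The "duplicated indexes" idea behind forks can be illustrated with a toy in-memory model. This is a conceptual sketch only, with made-up types and names; it is not how `timeq` actually stores forks on disk:

```go
package main

import "fmt"

// forkDemo is a toy model: the data log is shared, and each fork only
// keeps its own small index (here reduced to a read offset).
type forkDemo struct {
	data    []string       // shared, append-only "dat.log"
	offsets map[string]int // one cursor per fork ("idx.log" / "forkx.idx.log")
}

func newForkDemo() *forkDemo {
	// "" is the name of the default (unforked) consumer.
	return &forkDemo{offsets: map[string]int{"": 0}}
}

func (q *forkDemo) Push(items ...string) {
	q.data = append(q.data, items...)
}

// Fork duplicates only the index of src, never the data.
func (q *forkDemo) Fork(src, name string) {
	q.offsets[name] = q.offsets[src]
}

// Pop consumes one item from the view of a single fork,
// leaving all other forks untouched.
func (q *forkDemo) Pop(fork string) (string, bool) {
	off := q.offsets[fork]
	if off >= len(q.data) {
		return "", false
	}
	q.offsets[fork] = off + 1
	return q.data[off], true
}

func main() {
	q := newForkDemo()
	q.Push("a", "b", "c")
	q.Fork("", "foo") // "foo" starts where the main consumer currently is

	v, _ := q.Pop("") // the main consumer advances...
	fmt.Println(v)    // a
	v, _ = q.Pop("foo") // ...but the fork still sees every item
	fmt.Println(v)    // a
}
```

Note how forking is O(1) in the size of the data: only the cursor is copied, which mirrors how a fork in `timeq` duplicates index files, not `dat.log`.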
Note that once a queue was created with a certain `BucketSplitConf`, it cannot be changed later; `timeq` will error out in this case and the queue needs to be migrated.

The data is stored on disk in two files per bucket:
- `dat.log`: Stores the entries of each batch.
- `idx.log`: Stores the key and location of batches. Can be regenerated from `dat.log`.

This graphic shows one entry of each:
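As a rough illustration of what an index entry could contain, here is a hypothetical sketch: a batch key plus the offset and length of the batch inside `dat.log`. Field names, sizes, ordering, and endianness are illustrative only; `timeq`'s real on-disk format differs.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// indexEntry is a made-up, fixed-size index record for illustration.
type indexEntry struct {
	Key    uint64 // priority key of the batch (e.g. a timestamp)
	Offset uint64 // byte offset of the batch in dat.log
	Len    uint32 // number of items in the batch
}

// encode serializes the entry into a compact binary form.
func (e indexEntry) encode() []byte {
	buf := &bytes.Buffer{}
	// All fields are fixed-size, so binary.Write cannot fail on a bytes.Buffer.
	binary.Write(buf, binary.BigEndian, e)
	return buf.Bytes()
}

// decodeIndexEntry is the inverse of encode.
func decodeIndexEntry(b []byte) (indexEntry, error) {
	var e indexEntry
	err := binary.Read(bytes.NewReader(b), binary.BigEndian, &e)
	return e, err
}

func main() {
	e := indexEntry{Key: 1700000000, Offset: 4096, Len: 2000}
	raw := e.encode()
	fmt.Println(len(raw)) // 20 bytes: 8 + 8 + 4

	back, _ := decodeIndexEntry(raw)
	fmt.Println(back == e) // true
}
```

Because one entry describes a whole batch, the index stays tiny relative to the data, which is why it can be kept fully in memory.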
Each bucket lives in its own directory called `K<key>`. Example: if you have two buckets, your data looks like this on disk:
```
/path/to/db/
├── split.conf
├── K00000000000000000001
│   ├── dat.log
│   ├── idx.log
│   └── forkx.idx.log
└── K00000000000000000002
    ├── dat.log
    ├── idx.log
    └── forkx.idx.log
```
The actual data is in `dat.log`. This is an append-only log that is memory-mapped by `timeq`. All files ending with `idx.log` are indexes that point to the currently reachable parts of `dat.log`. Each entry in `idx.log` is a batch, so the log will only grow marginally if your batches are big enough. `forkx.idx.log` (and possibly more files like it) are index forks, which work the same way as `idx.log`, but track a different state of the respective bucket.
NOTE: Buckets get cleaned up on open, or during consumption once they are completely empty (i.e. all forks are empty). Do not expect disk usage to decrease automatically whenever you pop something. It does decrease, but in batches.
Performance was achieved mainly with the following points:

- `dat.log` and `idx.log` are append-only, requiring no random seeking for best performance.
- `dat.log` is memory-mapped and resized using `mremap()` in big batches. The bigger the log, the bigger the pre-allocation.
- `Push()` uses binary search for fast sorting.
- `Shovel()` can move whole bucket directories, if possible.

There are no notable places where the key of an item is actually assumed to be a timestamp, except for the default `BucketSplitConf` (which can be configured). If you find a good way to sort your data into buckets you should be good to go. Keep in mind that timestamps were the idea behind the original design, so your mileage may vary - always benchmark your individual use case. You can modify one of the existing benchmarks to test your assumptions.
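The binary-search idea behind a sorted `Push()` can be sketched in a few lines. This operates on a plain in-memory slice of keys, which is of course a simplification of what `timeq` does on its memory-mapped batches:

```go
package main

import (
	"fmt"
	"sort"
)

// pushSorted inserts key into an already-sorted slice, finding the
// insertion position in O(log n) via binary search instead of
// re-sorting the whole slice afterwards.
func pushSorted(keys []uint64, key uint64) []uint64 {
	// sort.Search returns the first index where keys[j] >= key.
	i := sort.Search(len(keys), func(j int) bool { return keys[j] >= key })
	keys = append(keys, 0)      // grow by one
	copy(keys[i+1:], keys[i:])  // shift the tail to make room
	keys[i] = key
	return keys
}

func main() {
	keys := []uint64{10, 20, 40}
	keys = pushSorted(keys, 30)
	fmt.Println(keys) // [10 20 30 40]
}
```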
Most importantly, only buckets that are currently in use are loaded. This allows a very small footprint, especially if the push input is already roughly sorted.

There are also some other reasons:

- With `Shovel()` we can cheaply move buckets if they do not exist in the destination.

How much memory does it use?

It depends on a few things. Consider the following for a worst-case scenario:
As `timeq` uses `mmap(2)` internally, only the pages that were actually accessed are mapped to physical memory. However, when pushing a lot of data, all of it ends up in physical memory, as all accessed pages of a bucket stay open (which is good if you `Pop()` immediately afterwards). So you should be fine if this evaluates to true:

```
BytesPerItem * ItemsPerBucketInWorstCase * MaxOpenParallelBuckets < BytesMemoryAvailable - WiggleRoom
```

You can lower the number of open buckets with `MaxOpenParallelBuckets`.
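As a worked example of the rule of thumb above, here is a small back-of-the-envelope calculation. All concrete numbers are made up for illustration; plug in your own measurements:

```go
package main

import "fmt"

// worstCaseBytes computes the left-hand side of the rule of thumb:
// BytesPerItem * ItemsPerBucketInWorstCase * MaxOpenParallelBuckets.
func worstCaseBytes(bytesPerItem, itemsPerBucketWorstCase, maxOpenParallelBuckets int) int {
	return bytesPerItem * itemsPerBucketWorstCase * maxOpenParallelBuckets
}

func main() {
	const (
		memoryAvailable = 512 << 20 // 512 MiB on the device (assumed)
		wiggleRoom      = 128 << 20 // keep 128 MiB free for everything else (assumed)
	)

	// Assumed: 64 B per item, 100k items per bucket in the worst
	// case, and 4 open buckets in parallel.
	worst := worstCaseBytes(64, 100_000, 4)
	fmt.Println(worst)                              // 25600000 (~24.4 MiB)
	fmt.Println(worst < memoryAvailable-wiggleRoom) // true: fits comfortably
}
```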
Keep in mind that `timeq` is fast and can be memory-efficient if used correctly, but it's not a magic device. In the future I might introduce a feature that does not keep the full bucket mapped if it's only being pushed to. The return on investment for such an optimization would be rather small, though.
Can keys be duplicated?

Yes, no problem. The index may store more than one batch per key. There is a slight allocation overhead on `Queue.Push()`, though. Since `timeq` was mostly optimized for mostly-unique keys (i.e. timestamps) you might see better performance with fewer duplicates. It should not be very significant, though.
If you want to use priority keys in a very narrow range (and thus with many duplicates), you can think about spreading the range a bit wider. For example: you have priority keys from zero to ten for the tasks in your job queue. Instead of using zero to ten as keys directly, you can shift the priority and add the job-id to the key: `(prio << 32) | jobID`.
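The key-spreading trick can be sketched like this. The function name and field widths are illustrative; the point is only that the priority occupies the high bits and a unique job-id the low bits:

```go
package main

import "fmt"

// spreadKey packs a small priority into the high bits and a unique
// job id into the low bits. Keys stay mostly unique even with only a
// handful of distinct priorities, while still sorting by priority first.
func spreadKey(prio uint8, jobID uint32) uint64 {
	return uint64(prio)<<32 | uint64(jobID)
}

func main() {
	// Priority still dominates the ordering:
	fmt.Println(spreadKey(2, 999_999) < spreadKey(3, 0)) // true

	// Two jobs with the same priority now get distinct keys:
	fmt.Println(spreadKey(3, 100) != spreadKey(3, 101)) // true
}
```

Because 32 bits are reserved for the job id, this scheme stays collision-free for up to 2^32 jobs per priority level.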
How battle-tested is `timeq`?

I use it on a big fleet of embedded devices in the field at German Bionic, so it's already quite battle-tested. Design-wise, damaged index files can be regenerated from the data log. There is no error-correction code applied in the data log, and no checksums are currently written. If you need this, I'm happy if a PR comes in that enables it optionally.

For durability, the design is built to survive crashes without data loss (`Push`, `Read`), but in some cases it might result in duplicated data (`Shovel`). My recommendation is to design your application logic in a way that allows duplicate items to be handled gracefully.
This assumes a filesystem with full journaling (`data=journal` for ext4) or some other filesystem that gives you similar guarantees. We do properly call `msync()` and `fsync()` in the relevant cases. For now, crash safety has not been tested much, though. Help here is welcome.
The test suite is currently roughly as big as the codebase itself. The best protection against bugs is a small codebase, so that's not too impressive yet. We are of course working on improving the test suite, which is a never-ending task. Additionally, we have a bunch of benchmarks and fuzzing tests.
Is `timeq` safely usable from several goroutines?

Yes. There is currently no real speed benefit from doing so, though, as the current locking strategy prohibits parallel pushes and reads. Future releases might improve on this.
Source code is available under the MIT License.
Chris Pahl @sahib