The only reason I can come up with would be because it's a read-to-use library you can just plug in which gives OK performance and some handy features because you can use the KV store for other things. But it doesn't scale well and backups with LevelDB are not really easy either (close DB, copy all files).
Message queues when they are ordered (at least on the local node/queue level) usually just need some kind of append-only log file. You don't do random reads or writes into the middle of the queue, you only modify the head and tail.
InfluxDB, albeit being a time series db has similar write patterns to a message queue, learned it the hard way when they first tried to use a LSM Tree database (LevelDB), then switched to a B+Tree (BoltDB/LMDB) but that also doesn't scale once the DB gets big and the tree has quite some depth. They kindly did a nice writeup of their journey: https://influxdb.com/docs/v0.9/concepts/storage_engine.html
Why not do it simple and use append-only files without complex structure and management?
Check out Kafka for a better storage format for message queues of this kind.
PS: every message queue should first clearly explain what guarantees it provides.
The R/W patterns for a message queue are simple:
- Messages are key/value
- key is an autoincrementing id
- Writes are at the end, Reads are from the beginning
- Once a message is processed, it's deleted
So in practice this means that the items are written in an append-only fashion, get merged in bigger chunks, and then get progressively deleted. So at higher levels you don't see the huge latencies due to compaction because all records are deleted. Knowing that keys are only incrementing could also lead to a simple optimization: the compaction phase can be a simple concatenation of files.
So you get an append-only system that progressively removes older entries as they are deleted without resorting to mad science hackery [1]. Why didn't it work for InfluxDB ? All I can guess is that individual entries for each series are all mixed together (InfluxDB wants to be able to manage many series with many tags) and older entries are not deleted as frantically, so you get the latencies we all know with compaction and unpredictable reads.
Now, this is purely theoretical and of course further experimentations are needed to make sure this is correct, but LSM is in my opinion a correct pattern here.
The InfluxDB experience is definitely illuminating. Their problems with LMDB were mainly due to misuse of the API. https://disqus.com/home/discussion/influxdb/benchmarking_lev...
For batched sequential writes, there is no other DB anywhere near as fast as LMDB http://symas.com/mdb/microbench/ (Section E, Batched Writes)
But even so - the reason LMDB can do this so quickly is because for batched sequential writes it cheats - it's just performing Appends, there's no complicated tree construction/balancing/splitting of any kind going on.
If you know that your workload will only be producer/consumer, with sequentially generated data that is sequentially consumed, it's a stupid waste of time to mess with any other structure than a pure linear queue. (Or a circular queue, when you know the upper bounds of how much data is outstanding.)
As for your initial statement - no, an LSM tree is not a correct pattern here. If your consumers are actually running as fast (or faster) than your producer then it should never flush from Level0/memory to Level1/disk. In that case all you've got is an in-memory queue that evaporates on a system crash.
If your consumers are running slower, that means data is accumulating in the DB, which means you will have compaction delays. And the compaction delays will only get slower over time, as more and more levels need to be merged. (Remember that merge operations are O(N). Then remember that there are N of them to do. O(N^2) is a horrible algorithmic complexity.) LSM is never a correct pattern.
Regarding provided guarantees, with simple 'get work_queue' reads it provides at-most-once delivery. With two phase reliable reads 'get work_queue/open', 'get work_queue/close' it provides at-least-once delivery (although message is kept in memory on server during a reliable read and will be lost if you SIGKILL siberite. On SIGTERM and SIGINT siberite will gracefully abort the read and save the message).
Indeed, either Siberite is a queue system which purpose is to dispatch each message to one and only one consumer for further processing and which requires the consumers to acknowledge fully processed messages ;
or Siberite is a journal system (in the spirit of Kafka) which purpose is to replay the full log to any consumer asking for it and which offers the consumer a watermark mechanism to keep track of their progress.
In the former case, the queue system is responsible of what to do in case of a missing or late acknowledgement (choosing between "at least once" or "at most once" message delivering). In the later case, the consumers are responsible of how to maintain an atomic view of message consumption and message processing (for instance using a transaction to persist an offset with a state).
Aside: There used to be a site sometime back which used to distribute compiled binaries of Go code for all platforms? Is it still up any chance?
http://nsq.io/overview/internals.html
The service you are thinking of might be https://github.com/ddollar/godist
You can see the download links on https://github.com/ddollar/forego use it.
But deletes are going to have a big impact still, and (working from my failing memory of LevelDB internals) I think might actually be the pathologically sad case.
But I found the idea of multiple consumer groups per queue very interesting. So basically you would still be able to fetch queue messages as you can do now and it will delete dequeued items, but you would also be able to use something like 'get queue_name:consumer_name' and it will create a consumer group internally with a stored offset and will serve messages using that offset. In case of reliable read failure each consumer group will keep it's own queue of failed deliveries, will check that queue and serve these failed items first. If source queue head has changed and became larger then consumer group offset, then consumer group offset would just start from the source queue head.
This way you can get Kafka-like multiple consumer groups per queue as an additional feature.
Each of these have trade-offs and the way it is architectured here, in the at-least-once case you will have to either remember all the processed messages or be prepared to process a message multiple times, whatever that means in your specific use-case.
So the queue keeps track of the high-watermark on a per consumer basis and all the consumer has to do is show up, tell the queue its deterministic name/id (might be driven by imaging, configuration, or SDN), and the queue will serve up the next new item that consumer hasn't seen yet.
This would be handy for really dynamic transient worker topologies because it keeps the mutable state and state tracking concerns entirely outside the transient worker.
That said, I still wouldn't use LevelDB. Unless I was expecting to do multi-attribute range queries or something (now we're well outside queue territory), but even then you're still folding over the data for knowable start/end markers and a linear scan over a binary term file will be faster than the multiple seeks + segment scans that LevelDB requires.
So it's either unreliable or slow.
Also if you have dynamic transient worker topologies, you have to remember those positions. You are saving data for later use, that may never arrive. How long do you keep this data?
Seems like a pretty messy way of doing things.
Completely agree about LevelDB.
But moving the concern to the consumer to track the cursor doesn't make the protocol any more stable. To keep a stable cursor, the consumer would need to persist that someplace, which just pushes the acknowledgement to that persistence component instead. If a stable cursor is what you're after, then co-locating it with the durable queue provides a simpler solution with a slightly better consistency guarantee.
The garbage collection problem is a real one, but realistically how many consumers is an infrastructure service like this going to have? Tens? Hundreds? Thousands? Millions? Billions?
No matter which one of those you pick it's a trivially small secondary index to maintain even if you never reaped it. I mean it's a K/V problem (consumer_id -> queue_offest) and there's a K/V store already sitting there. If you didn't want it to grow forever then you could establish a TTL policy via configuration.
The problem you would have is consumers that don't have stable or bounded id's. Like a system that assigns a new id every time the consumer makes a request or the consumer is restarted.
https://www.cs.berkeley.edu/~brewer/cs262/Aries.pdf
> Remember that merge operations are O(N). Then remember that there are N of them to do. O(N^2) is a horrible algorithmic complexity.
No. Mountains of actual math refute this. LSM-tree merges are O(N log N). This is an Actual Fact.
Read more, kids.
O(N log N) is still untenable in the long run, nobody has exponentially growing compute resources.
Also, N log(N) is nowhere near exponential. O(2^N) would be exponential, and that's not what you have here.
Stick about 10GB of small entries in it (should be enough to create all the levels) and then see what happens.
Also, you could reserve the persisted [H|T] for controlled shutdown scenarios. Basically anything that isn't complete system failure if you're properly trapping signals.
Calling send just copies the buffer to the kernel/driver. When the call returns you do not know how much of it is actually sent. You might have the situation of the producer thinking it was sent, when it in fact never actually made it onto the network.
The original LSM paper claims that writes to Level 0 are free because that's all in-memory. But that's not really true; if you have a stream of incoming writes then everything that goes into Level 0 must eventually be pushed out to Level 1. Buffering doesn't make writes free, it only displaces their occurrence in time.
So you have a rolling merge every M writes. As far as Big-O goes, that's N log(N) / M == N log(N) because Big-O disregards constant factors!
In the context of an implementation like LevelDB, theory and reality diverge even further. Since it's chunking data into 2MB files and deleting them during each merge operation, and also writing a bunch of bookkeeping into Manifest files and other stuff, the number of actual I/Os is much higher. A lot of wasted activity in allocating and deallocating space - filesystem metadata overhead that's also not transactionally safe.
In LevelDB a single merge reads 26MB and writes 26MB at a time to push 2MB of data from a level L to level L+1. So now instead of a single merge op costing only N, it actually costs 13*N. Again, if you're only talking about Big-O complexity you sweep this under the rug. But in reality, this is a huge cost.
Level 1 is only 10MB, so if it was empty it would fill in the first 1/2 second. For the remaining 1/2 second it would trigger 5 more compactions to Level 2, reading 130MB and writing 130MB. If it started out full then this would be 260MB/260MB respectively.
So for a 20MB/sec input workload you would need a disk subsystem capable of sustaining 498MB/sec of reads concurrent with 498MB/sec of writes. And that's only for a small DB, only Level 0-2 present (smaller than 110MB), and excluding the actual cost of filesystem operations (create/delete/etc).
That's only for the 1st second of load. For every second after that, you're dumping from Level 0 to Level 1 at 280MB read and 280MB write/sec. And dumping from Level 1 to Level 2 at 260/260 as before. 540/540 - so a disk capable of 1080MB/sec I/O is needed to sustain a 20MB/sec workload. And this is supposed to be HDD-optimized? Write-optimized? O(N logN) - what a laugh.
Maybe LSMs in general can be more efficient than this. LevelDB is pretty horrible though.