Streaming Databases in Real-Time with Kafka, Debezium, and MySQL (wecode.wepay.com)
The article here, as well as the Yelp one, seems to be interested in streaming out the MySQL database schema itself: not just data changes but schema changes as well. If you're doing this as part of a bigger MySQL database replication scheme, e.g. where the stream is just consumed by other MySQL databases that agree to share the identical schema, that's great. But the context here seems to be more along the lines of a swarm of microservices, which we would expect are each dealing with their own private set of MySQL data, are not sending INSERT/UPDATE/DELETE statements to each other, and instead are speaking more in terms of the domain model. My 90's-esque sense is that if you've built on lots of services, the messages you want flying around the network should be in terms of the domain models of those services, not your raw SQL / DDL.
Disclaimer: this is an "I'm an idiot" comment because I think I'm missing something
Disclaimer: I'm the author of SQLAlchemy so yes, I get the SQL/DDL/ORM thing
But yeah, I might be missing something too, but by and large I've lived through a lot of code that depends on a specific database schema, rather than a domain model or some other abstraction. Even when database changes don't break things, there is so. much. regression. testing. before anyone is confident.
I think OO is particularly poor at modeling the kinds of problems I've been involved with - all our perspectives are probably path dependent. My domains have involved lots of data transfer, manipulation, ETL, and transmitting tuples from one language to another; the interface between different modules is at the data layer, not the API layer, because the business of our business is working with the customer's data. In so far as there are core objects, they're things like schemas and transformation rules - configuration, metadata, information about the data and its transformations.
I particularly like the way it's possible to express global constraints over sets of data with relational algebra in a declarative style. Databases aren't (usually) built to evaluate these kinds of constraints, and I don't want to build stored procedure APIs or triggers that emulate them, but the conceptual model is powerful, much more powerful and expressive to me than anything I see in OO.
Wirth once wrote: Algorithms + Data Structures = Programs. I'd put Data first; and in fact when I want to understand a big program, it's almost always the data structures and data flow which reveal more than the control flow.
It's unfortunate that, in common usage, "domain model" basically means "OO domain model". A relational model, one which properly reflects your domain, is most certainly as worthy of that name as an OO one.
We can, of course, debate whether simply sharing a relational database among multiple applications is good architecture. It very well might not be, due to the missing intermediate layer that can absorb schema changes, provide caching, etc. Or due to the crappiness of the languages used to write stored procedures, and databases generally not being great application servers (although PostgreSQL seems to be getting there...)
But it is not because the relational model is not a "domain model".
Once we were very close to implementing a very similar scenario (stream of events, some lightweight processing, Kafka for distribution, HBase/Solr for storage). If you go for such a thing, better be sure that your business analysts are on the same page as you. Otherwise, instead of a clean event sourcing model, you may end up with some sort of message protocol that spans the schema/data format/business logic/weird requirements. It is just that this thing will become your new "model".
From my reading of the flow chart, it appears that data jumps through 5 different data layers before it gets written in its final destination. This doesn't appear to be a transformation pipeline, that's just how much it takes to go from user input -> saved in indexed storage.
What unique featureset does WePay provide that justifies this type of complication?
As an industry, there's been an explosion of this massive overcomplication the last few years. We're contorting ourselves pretty bad here, and it seems primarily to be in service of the engineer's desire to, well, engineer. Can you imagine if we let road engineers get away with this kind of stuff?
Kafka Connect is architected around the idea of producers and consumers that either add messages to Kafka, or read messages from Kafka.
The MySQL producer isn't suited for anything other than the most basic of table replication, though; if you need any ETL, you'll be gluing more stuff into your pipeline downstream. And when the producer falls over, the first you'll know about it is when you read the logs or poll status indicators. It didn't give me warm fuzzies about reliability nor visibility nor flexibility. It was very basic stuff.
The Hadoop consumer had an unpleasant surprise: you have zero choice over table name in Hive metastore; your Kafka topic will be the name of your Hive table, no ifs, no buts, no choices. And since Kafka doesn't have any namespaces, either you're going to be running multiple Kafka clusters, or you need global agreement on topics vs Hive metadata (which does have namespaces). We have a multi-tenancy architecture and use namespaces. A non-starter.
Why do I think Kafka doesn't have the right feature set? Because Kafka message expiry has only two policies, as far as I could tell: time or space. Either your message is too old and gets discarded (en bloc, IIRC); or Kafka hits a space limit and starts clearing out old messages. The natural question that arises when you're using Kafka to buffer a consumer vs a producer, then, is flow control / backpressure: how do you get the producer to slow down when the consumer can't catch up? And vice versa? Well, there's knobs you can manually control to throttle the producer, but it's in your own hands. You're dancing at the edge of a cliff if a consumer has died and messages start expiring; there's nothing stopping data loss.
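To make the two expiry policies concrete, here is a rough Python model of time-based and size-based retention and of what a stalled consumer loses. It is a deliberately simplified sketch, not Kafka's actual cleaner: `Segment`, `apply_retention`, and `messages_lost` are hypothetical names, and the `retention_ms` / `retention_bytes` parameters are only named after the `retention.ms` and `retention.bytes` topic settings.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int    # offset of the first message in the segment
    size_bytes: int     # on-disk size of the segment
    newest_ts_ms: int   # timestamp of the newest message in the segment


def apply_retention(segments, now_ms, retention_ms=None, retention_bytes=None):
    """Return the segments that survive. Segments are ordered oldest first
    and are dropped whole; the newest segment is always kept (assumption:
    it stands in for the segment still being written)."""
    kept = list(segments)
    # Time policy: drop old segments whose newest message is past the limit.
    if retention_ms is not None:
        while len(kept) > 1 and now_ms - kept[0].newest_ts_ms > retention_ms:
            kept.pop(0)
    # Size policy: drop old segments while the log is over the byte limit.
    if retention_bytes is not None:
        while len(kept) > 1 and sum(s.size_bytes for s in kept) > retention_bytes:
            kept.pop(0)
    return kept


def messages_lost(kept, consumer_next_offset):
    """Number of unread messages that expired before the consumer got to them."""
    return max(0, kept[0].base_offset - consumer_next_offset)
```

Nothing in this model consults the consumer's position before dropping a segment, which is the point being made: a consumer parked at offset 50 simply finds the log now starts at a later offset.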
The only way you can start to turn this situation into a win is if (a) you have such a big firehose of data that nothing else can cope, or (b) you can take advantage of network effects and use Kafka as a data bus, not just as a pipe with exactly two ends. But it has to overcome the downsides.
Regarding the ETL, the thing I haven't figured out yet is what to do with the data that's not events. We do daily/hourly exports of those tables in-bulk, but it's not real-time in the data warehouse. This is mostly ok, but I'd love a magic bullet that let me stream these updates into BigQuery as well.
In any case -- nice blog post -- nice to see how others are doing it.
This is the first time I've heard of the approach WePay uses and I really like it. Now MySQL is the source of truth and Debezium pipes it into Kafka, allowing you to leverage Kafka while still using MySQL as your primary data store. (Or Postgres, which Debezium includes support for.)
At least how we run Kafka, our logs expire after 7 days, and our alerts go off pretty quickly if consumers fall behind. Additionally, we archive all our messages to S3 via a process based on Pinterest's Secor [1]. If we were to ever run so far behind that we needed to start over, we can just run mapreduce jobs to rebuild datastores and then let consumers catch back up.
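The "alerts go off if consumers fall behind" part boils down to comparing, per partition, the log end offset with the offset the consumer group has committed. A minimal sketch in Python, assuming those two numbers have already been fetched from somewhere; `consumer_lag` and `lagging_partitions` are hypothetical helper names, and treating a missing commit as offset 0 is an assumption of this sketch:

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: messages written but not yet committed by the group.
    Both arguments map partition id -> offset. A partition with no committed
    offset is treated as if nothing had been consumed (assumption)."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


def lagging_partitions(lag, max_lag):
    """Partitions whose lag exceeds the alert threshold, in sorted order."""
    return sorted(p for p, n in lag.items() if n > max_lag)
```

For example, with end offsets `{0: 1000, 1: 500}` and committed offsets `{0: 990, 1: 100}`, only partition 1 would trip a threshold of 50.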
Since Kafka is explicitly a pub/sub replicated+partitioned log, it doesn't make sense to provide backpressure. A single ailing consumer would cascade failure through your system. If you need synchronous or bounded replication, Kafka isn't for you.
Having run Kafka in production for 2 1/2 years now, I can say with certainty that we've never felt like we were lacking in terms of features from Kafka itself, nor have we ever had a consumer fall so far behind it could never catch back up. We do leverage our archives for batch jobs though.
With our use case, we can have unpredictable spikes in volume—which we must consume. Those spikes can be an order of magnitude larger than our baseline average. We put Kafka topics between every stage of our processing pipeline and configure the various Kafka clusters to be
1) Huge. Seriously, way overkill.
2) Able to sustain triple the largest spike we've ever seen without expiring data (size-based expiry)
Since most of our processing stages are essentially consuming, transforming and publishing back to Kafka, we've written them to not ack a message until the result of that stage has been safely published to the next Kafka topic. We require acks from all in-sync replicas. Since the subscriber part of a processing stage doesn't ack until its producer side has received acks from all ISRs, we're pretty confident in our data fidelity. In fact we have other infrastructure that verifies that everything coming out of this Kafka chain is correct and full-fidelity, so we know for certain that this setup can withstand huge spikes in volume without any load shedding.
And then we run all of it redundantly in multiple AWS availability zones to just be sure.
If any stage in our processing pipeline cannot keep up with increased volume, that's fine; it'll catch up eventually because we know that our retention policies are sufficient. And since (almost) every stage is run redundantly, even if one instance somewhere does become slow (or goes down), the redundant pipeline will keep data flowing so we generally have no customer impact. In fact if that does happen but the system as a whole keeps up, we don't even consider it a pageable event. If a machine falls over at 3am but its redundant cousin keeps up, we'll fix it the next day during business hours.
(Redundant pipelines are also great for deploys—take down an entire side while you redeploy and you've now got zero-downtime deployments)
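The "don't ack until the next topic has the result" stage described above can be sketched with in-memory stand-ins. This is an illustration of the pattern only, not the poster's code and not a Kafka client: `InMemoryTopic` and `run_stage` are hypothetical, and `publish` returning normally stands in for a producer send that has been acknowledged by all in-sync replicas.

```python
class InMemoryTopic:
    """Stand-in for the downstream topic. publish() either returns (meaning
    the write was fully acknowledged) or raises ConnectionError."""

    def __init__(self, fail_on_calls=()):
        self.records = []
        self._calls = 0
        self._fail_on = set(fail_on_calls)  # 1-based call numbers that fail

    def publish(self, record):
        self._calls += 1
        if self._calls in self._fail_on:
            raise ConnectionError("publish was not acknowledged")
        self.records.append(record)


def run_stage(source, committed, sink, transform):
    """Consume source[committed:], publish transformed records to sink, and
    return the new committed offset. The offset advances only after
    publish() has returned for that record."""
    offset = committed
    while offset < len(source):
        try:
            sink.publish(transform(source[offset]))
        except ConnectionError:
            break  # no ack: the same record is re-read on the next run
        offset += 1
    return offset
```

Because the offset never moves past an unpublished record, a crash or failed send costs a retry rather than a lost message. The flip side, in a real system, is that a send can succeed while its acknowledgement is lost, so this style gives at-least-once delivery and downstream consumers need to tolerate duplicates.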
You can use log compaction to remove all messages apart from the latest for any given key. This gives you a Kafka queue with a bounded size, that is proportional to the table size that you're replicating.
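What compaction does to a keyed log can be shown in a few lines of Python. This is a toy model of the end result, not of how the broker's cleaner actually runs; `compact` is a hypothetical function, and the `drop_tombstones` flag is a simplification of the fact that records with a null value act as delete markers and are themselves removed eventually.

```python
def compact(log, drop_tombstones=False):
    """log is a list of (key, value) pairs in offset order.
    Keep only the last record for each key, preserving the relative order
    of the survivors. A value of None is treated as a delete marker."""
    last_index = {}
    for i, (key, _value) in enumerate(log):
        last_index[key] = i  # later records overwrite earlier ones
    survivors = [rec for i, rec in enumerate(log) if last_index[rec[0]] == i]
    if drop_tombstones:
        survivors = [rec for rec in survivors if rec[1] is not None]
    return survivors
```

The compacted log has at most one record per key, which is why its size tracks the number of rows in the table rather than the number of changes ever made to it.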
Only sharing in case someone wants a production-ready solution in that area with service monitoring, guaranteed E1P for every event, stream-level permissions, high availability etc.
You can check it out here:
The first flow chart [1] is simple. It shows a user, an app, and three separated data volumes that each serve a separate use case (db, cache, and index [assuming for OLAP workloads]). The chart is headlined with the adamant imperative "Stop doing this".
Instead, Confluent suggests that we start doing this [2]: user -> app -> DB -> extraction -> Kafka -> (Index <-> Cache <-> HDFS) -> monitoring -> samza. Ehhh, no thanks. I like the other option.
We need to understand that good engineering is not about making more work for ourselves. It's about simplicity and elegance, and being able to accomplish complex tasks WITHOUT wrapping ourselves up into some intractable mega-contortion. More moving parts means more fragility and more waste. Simplicity means beauty, power, and flexibility.
Now, I'm not suggesting that such architectures are never justified. I just want to highlight that the complexity should be eschewed, not celebrated.
If you find yourself writing a blog post that converts a simple 3-step process into a complex 5-step, 9-destination process, alarm bells should be ringing, and you should be talking about why your organization (see Conway's Law) and/or the state of computer science sucks so bad that the 3-step process isn't good enough.
[0] https://www.confluent.io/blog/bottled-water-real-time-integr...
[1] https://cdn2.hubspot.net/hub/540072/file-3062873213-png/blog...
[2] https://cdn2.hubspot.net/hub/540072/file-3062873223-png/blog...
The problem of "I have to get a large portion of the DB into service X" is one I've worked on, and the initial solution is more fragile. It doesn't deal with back pressure. If a service goes down, it "loses" writes and must be resynced from a good state. If for whatever reason data science sets up an HDFS cluster, I need to push writes there from my app.
With the second method - I don't have to use all those services - and while I'm not given the same latency guarantees I can be more sure that a user's given change will eventually end up in every service that cares about that given change.
Sure, if you only need to write to one DB, the Confluent method is overkill. However, if that solution works for you, I'd imagine you haven't hit the volume and the latency requirements that would require you to seek out a solution like Confluent's anyway.
In the second example you handle it once per entity and datastore and the "web app" doesn't need to be aware of the secondary stores.
It's just the listener pattern applied at infra level. You trade code complexity for infra complexity.
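As a rough illustration of "the listener pattern applied at infra level", here is an in-process Python sketch where the app writes only to the primary store and the secondary stores update themselves from a change log. Everything here is hypothetical (`ChangeLog`, `PrimaryStore`, the listener factories, the `name` field), and delivery is synchronous for simplicity, whereas a Kafka-based setup would deliver asynchronously.

```python
class ChangeLog:
    """Stand-in for the change stream: records events and fans them out."""

    def __init__(self):
        self.events = []
        self._listeners = []

    def subscribe(self, listener):
        self._listeners.append(listener)

    def append(self, event):
        self.events.append(event)
        for listener in self._listeners:
            listener(event)


class PrimaryStore:
    """The only store the web app talks to; every write emits a change event."""

    def __init__(self, changelog):
        self.rows = {}
        self._changelog = changelog

    def upsert(self, key, row):
        self.rows[key] = row
        self._changelog.append({"op": "upsert", "key": key, "row": row})


def make_cache_listener(cache):
    def on_change(event):
        cache[event["key"]] = event["row"]
    return on_change


def make_index_listener(index):
    # Sketch only: indexes by lowercased name and never removes stale entries.
    def on_change(event):
        index.setdefault(event["row"]["name"].lower(), set()).add(event["key"])
    return on_change
```

Adding another secondary store means writing one more listener and subscribing it; `PrimaryStore.upsert` and its callers do not change, which is the trade of code complexity for infra complexity.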
It's hard for me to discuss this because the terms are loosely defined, but my feeling is that you may be making implicit false assumptions around the necessary design of the architecture.
>Sure, if you only need to write to one DB, the Confluent method is overkill. However, if that solution works for you, I'd imagine you haven't hit the volume and the latency requirements that would require you to seek out a solution like Confluent's anyway.
This explanation is probably the reason for the explosion in overengineering. People hear "Hey, if you're not making things really hard, you're just not important enough!"
Well, everyone thinks they're important, so of course, they must make things hard! If they don't, they're not important.
I work with an organization where most people insist we are at this scale. It's totally false. Our load could easily be managed by one well-tuned database replication setup per app and probably 3-4 app servers per app. But this isn't good enough, because, you see, we are very important.
That means that we have dozens of different types of data storage solutions scattered all over the place (including Mongo, Riak, and Dynamo in addition to a variety of SQL DBs), we have dozens of "microservices", and we have hundreds of app servers, even though the technical requirements could be fulfilled with much, much less.
So why do we have all that? Well, because we're "at scale", which is to say, we want to be important. We have a bunch of people sitting around an office all day who appreciate the feeling of importance more than the feeling of a well-engineered system.
Again, I'm not saying complicated architectures are never justified, but I think that in many if not most cases, complication arises due to organizational and personal psychology much more than any technical constraint that truly mandates it.
That is much easier to track, trace, understand, and debug than a request that flows through 7 servers, 5 data layers (that have their own configs, nuances, snafus, caveats, etc.), and 2 proxies, each of which introduces a point of potential corruption/breakage, before it finally reaches the place it's trying to go.
Anyway, like I said in the grandparent, I'm sure there are times when this is the best way to accomplish something. It's hard to nitpick an unfortunately-potentially-appropriate specific solution in the general sense, except to say that its potential applicability is unfortunate.
The problem is that people do not see such complexity for the unfortunate byproduct of poor technical and/or organizational architecture that it is, but rather as evidence of their own expertise. That is exactly backwards. We must fix that false impression to restore sanity to the profession.