Hi Arvid,

> If you are not rereading the topics, why do you compact them?

We are rereading the topics; at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases.
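For concreteness, each new job starts by reading the full, current table state back out of the CDC topic, roughly like this (a minimal Flink SQL sketch; the topic, columns, and server names are made up):

CREATE TABLE users (
  user_id BIGINT,
  state   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc.public.users',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

That only reconstructs the correct current state if the topic still holds at least one (possibly compacted) record per key, which is exactly why we compact rather than expire.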
> correctness depends on compaction < downtime

I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.

> Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs

Is there a cloud-native key/value store which can read from a Postgres WAL or MySQL binlog and then keep an up-to-date read marker for any materialization consumers downstream *besides* Kafka + Debezium?

Appreciate all the feedback; hopefully we can get closer to the same mental model. If there's really a better alternative here, I'm all for it!

On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Rex,
>
> Your initial question was about the impact of compaction on your CDC application logic. I have been (unsuccessfully) trying to tell you that you do not need compaction and that it's counterproductive.
>
> If you are not rereading the topics, why do you compact them? It's lost compute time and I/O on the Kafka brokers (which are both very valuable) and does not give you anything that an appropriate retention time wouldn't give you (= lower SSD usage). It makes the mental model more complicated. An aggressive compaction combined with a larger backlog (compaction time < application failure/restart/upgrade time) would lead to incorrect results (in the same way an inappropriate retention period may cause data loss, for the same reason).
>
> The only use case for log compaction is if you're using a Kafka topic as a key/value store to serve a web application (in which case it's usually better to use a real key/value store), but then you don't need retractions anymore; you'd simply overwrite the actual values or use tombstone records for deletions.
>
> If you consume the same topic both for web applications and Flink and don't want to use another technology for the key/value store, then log compaction of retractions kinda makes sense to kill two birds with one stone. However, you have to live with the downsides on the Flink side (correctness depends on compaction < downtime) and on the web application side (dealing with retractions even though they do not make any sense at that level). Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs, and it would solve all issues on the Flink side (final note: this is independent of the technology; any stream processor will encounter the same issue, as it's a conceptual mismatch).
>
> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Hi Arvid,
>>
>> I really appreciate the thorough response, but I don't think this contradicts our use case. In servicing web applications we're doing nothing more than taking data from the giant databases we use, performing joins and denormalizing aggregations strictly for performance reasons (joining across a lot of tables at query time is slow), and putting the specified results into another database connected to the specified web server. Our Flink jobs are purely used for up-to-date materialized views. We don't care about historical analysis; we only care about what the exact current state of the world is.
>>
>> This is why every row has a primary key, from beginning to end of the job (even though Flink's Table API can't seem to detect that after a lot of joins in our plan, it's logically true, since the join key will then be the pk).
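>> As a sketch, our jobs are essentially chains of joins of this shape (hypothetical Flink SQL; the table and column names are made up):
>>
>> -- output rows stay keyed by u.user_id, the source primary key
>> INSERT INTO user_schools
>> SELECT u.user_id, u.name, s.school_name
>> FROM users u
>> JOIN schools s ON s.id = u.school_id;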
>> This is also why all we need to do is retract the current row from the Kafka source on the existing primary key that's being overwritten, have that retract propagate downstream to throw away any data transformed from that row, and then process the new row. We don't care what other data changes may have happened in between; they're not applicable to our use case.
>>
>> We're using CDC for nothing more than a way to get the latest rows in real time into Kafka so they can be read by the various Flink jobs we hope to build (starting with the one we're currently working on, which has ~35 stateful operators), which then just transform and forward to another database.
>>
>> ----
>>
>> Reading the Upsert Kafka docs [1]: "In the physical operator, we will use state to know whether the key is the first time to be seen. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values." This is how we thought the regular Kafka source actually worked, that it had state on PKs it could retract on, because we weren't even thinking of any other use case until it hit me that that may not be true. Luckily the doc also provides an example of simply forwarding from DBZ Kafka to Upsert Kafka; even if the DBZ Kafka source data is compacted, it won't matter, since everything in the actual job reading from Upsert Kafka should then function by PK like we need. On that note, I think it may be helpful to edit the documentation to indicate that if you need stateful PK-based Kafka consumption, it must be via Upsert Kafka.
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
>>
>> Again, thanks for the thorough reply, this really helped my understanding!
>>
>> On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rex,
>>>
>>> IMHO, log compaction and CDC for historic processes are incompatible on a conceptual level. Let's take this example:
>>>
>>> topic: party membership
>>> +(1, Dem, 2000)
>>> -(1, Dem, 2009)
>>> +(1, Gop, 2009)
>>>
>>> where 1 is the id of a real person.
>>>
>>> Now, let's consider you want to count memberships retroactively for each year. You'd get 1 Dem and 0 GOP for 2000-2009, and 1 GOP and 0 Dem for 2009+.
>>>
>>> Now, consider you have log compaction with a compaction period < 1 year. You'd get 0 Dem and 0 GOP for 2000-2009, and only the real result for 2009+ (or in general, for the time of the latest change).
>>>
>>> Let's take another example:
>>>
>>> +(2, Dem, 2000)
>>> -(2, Dem, 2009)
>>>
>>> With log compaction, you'd get -1/0/1 Dem and 0 GOP for 2009+, depending on how well your application can deal with incomplete logs. Let's say your application is simply adding and subtracting retractions: you'd get -1. If your application is ignoring deletions without insertions (which needs to be tracked for each person), you'd get 0. If your application is not looking at the retraction type, you'd get 1.
>>>
>>> As you can see, you need to be really careful to craft your application correctly. The correct result will only be achieved through the most complex application (aggregating state for each person and dealing with incomplete information). This is completely independent of Kafka, Debezium, or Flink.
>>>
>>> ---
>>>
>>> However, as Jan pointed out: if you don't process data before compaction, then your application is correct.
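>>> As a minimal Flink SQL sketch of the simplest application above (hypothetical names), a count like the following just adds and subtracts retractions, so on the compacted log the lone surviving retraction for person 2 drives Dem to -1:
>>>
>>> -- memberships is the CDC/retract stream from the examples above
>>> SELECT party, COUNT(*) AS members
>>> FROM memberships
>>> GROUP BY party;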
>>> Now the question is: what's the benefit of having data in the topic that is older than the compaction? The value is close to 0, as you can't really use it for CDC processing (again, independent of Flink).
>>>
>>> Consequently, instead of compaction, I'd go with a lower retention policy and offload the data to S3 for historic (re)processing (AFAIK the cloud offering of Confluent finally has automatic offloading, but you can also build it yourself). Then you only need to ensure that your application never accesses data that has been deleted because of the retention time. In general, it's better to choose a technology such as Pulsar with tiered storage, which gives you exactly what you want with low overhead: unlimited retention without compaction, but without holding much data in expensive storage (SSD), by offloading automatically to cold storage.
>>>
>>> If this is not working for you, then please share with me your requirements for why you'd need compaction + a different retention for source/intermediate topics.
>>>
>>> For the final topic, from my experience, a real key/value store works much better than log-compacted topics for serving web applications. Confluent's marketing is strongly pushing the idea that Kafka can be used as a database and as a key/value store, while in reality it's "just" a good distributed log. I can provide pointers that discuss the limitations if there is interest. Also note that the final topic should not be in CDC format anymore (so no retractions); it should just contain the current state. For both examples together, that would be
>>>
>>> 1, Gop, 2009
>>>
>>> and no record for person 2.
>>>
>>> On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Digging around, it looks like Upsert Kafka, which requires a primary key, will actually do what I want and uses compaction, but it doesn't look compatible with the Debezium format? Is this on the roadmap?
>>>>
>>>> In the meantime, we're considering consuming from Debezium Kafka (still compacted) and then writing directly to an Upsert Kafka sink and then reading right back out of a corresponding Upsert Kafka source. Since that little roundabout will key all changes by primary key, it should give us a compacted topic to start with initially. Once we get that working, we can probably do the same thing with intermediate Flink jobs too.
>>>>
>>>> Would appreciate any feedback on this approach, thanks!
>>>>
>>>> On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Does this also imply that it's not safe to compact the initial topic where data is coming from Debezium? I'd think that Flink's Kafka source would emit retractions on any existing data with a primary key as new data with the same pk arrived (in our case all data has primary keys). I guess that goes back to my original question still, however: is this not what the Kafka source does? Is there no way to make that happen?
>>>>>
>>>>> We really can't live with the record amplification; it's sometimes nonlinear and randomly kills RocksDB performance.
>>>>>
>>>>> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Just to clarify: intermediate topics should in most cases not be compacted, for exactly these reasons, if your application depends on all intermediate data. For the final topic, it makes sense.
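>>>>>> For example, the final topic could be written through an upsert sink, which Kafka can then safely compact down to current state per key (a hypothetical Flink SQL sketch; names are made up):
>>>>>>
>>>>>> -- downstream web apps then read only the latest record per key
>>>>>> CREATE TABLE final_user_state (
>>>>>>   user_id BIGINT,
>>>>>>   state   STRING,
>>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>   'connector' = 'upsert-kafka',
>>>>>>   'topic' = 'user-state-compacted',
>>>>>>   'properties.bootstrap.servers' = 'kafka:9092',
>>>>>>   'key.format' = 'json',
>>>>>>   'value.format' = 'json'
>>>>>> );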
>>>>>> If you also consume intermediate topics for a web application, one solution is to split them into two topics (like topic-raw for Flink and topic-compacted for applications) and live with some amplification.
>>>>>>
>>>>>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> All of our Flink jobs are (currently) used for web applications at the end of the day. We see a lot of latency spikes from record amplification, and we were at first hoping we could pass intermediate results through Kafka and compact them to lower the record amplification, but then it hit me that this might be an issue.
>>>>>>>
>>>>>>> Thanks for the detailed explanation, though it seems like we'll need to look for a different solution or only compact records we know will never mutate.
>>>>>>>
>>>>>>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Jan's response is correct, but I'd like to emphasize the impact on a Flink application.
>>>>>>>>
>>>>>>>> If the compaction happens before the data arrives in Flink, the intermediate updates are lost and just the final result appears. Also, if you restart your Flink application and reprocess older data, it will naturally only see the compacted data, save for the active segment.
>>>>>>>>
>>>>>>>> So how do you make it deterministic? Simply drop topic compaction. If the data is coming from CDC and you want to process and produce changelog streams over several applications, you probably don't want to use log compaction anyway.
>>>>>>>>
>>>>>>>> Log compaction only makes sense for the snapshot topic that displays the current state (KTable), where you don't think in CDC updates anymore but just in final records, like
>>>>>>>>
>>>>>>>> (user_id: 1, state: "california")
>>>>>>>> (user_id: 1, state: "ohio")
>>>>>>>>
>>>>>>>> Usually, if you use CDC in your company, each application is responsible for building its own current model by tapping into the relevant changes. Log-compacted topics would then only appear at the end of processing, when you hand the data over to non-analytical applications, such as web apps.
>>>>>>>>
>>>>>>>> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>>> Hi Rex,
>>>>>>>>>
>>>>>>>>> If I understand correctly, you are concerned about the behavior of the Kafka source in the case of a compacted topic, right? If that is the case, then this is not directly related to Flink; Flink will expose the behavior defined by Kafka. You can read about it, for instance, here [1]. TL;DR: your pipeline is guaranteed to see every record written to the topic (every single update, be it later "overwritten" or not) if it processes each record with latency at most 'delete.retention.ms'. This is configurable per topic; the default is 24 hours. If you want to reprocess the data later, your consumer might see only the resulting compacted ("retracted") stream, and not every record actually written to the topic.
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>> [1] https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
>>>>>>>>>
>>>>>>>>> On 2/24/21 3:14 AM, Rex Fenley wrote:
>>>>>>>>>
>>>>>>>>> Apologies, forgot to finish. If the Kafka source performs its own retractions of old data on key (user_id) for every append it receives, it should resolve this discrepancy, I think.
>>>>>>>>>
>>>>>>>>> Again, is this true? Anything else I'm missing?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm concerned about the impact of Kafka's compaction when sending data between running Flink jobs.
>>>>>>>>>>
>>>>>>>>>> For example, one job produces retract stream records in the sequence
>>>>>>>>>>
>>>>>>>>>> (false, (user_id: 1, state: "california")) -- retract
>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append
>>>>>>>>>>
>>>>>>>>>> When this is consumed by Kafka and keyed by user_id, it could end up compacting to just
>>>>>>>>>>
>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append
>>>>>>>>>>
>>>>>>>>>> If some other downstream Flink job has a filter on state == "california" and reads from the Kafka stream, I assume it will miss the retract message altogether and produce incorrect results.
>>>>>>>>>>
>>>>>>>>>> Is this true? How do we prevent this from happening? We need to use compaction since all our jobs are based on CDC and we can't just drop data after x number of days.
>>>>>>>>>>
>>>>>>>>>> Thanks

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>