Hi Arvid,

>If you are not rereading the topics, why do you compact them?
We are rereading the topics; at any time we might want a completely
different materialized view for a different web service or some new
application feature. Other jobs, and new jobs, need to read all the
up-to-date rows from the databases.

>correctness depends on compaction < downtime
I still don't see how this is the case if everything just needs to be
overwritten by primary key. To re-emphasize, we do not care about
historical data.

>Again, a cloud-native key/value store would perform much better and be
much cheaper with better SLAs
Is there a cloud-native key/value store, *besides* Kafka + Debezium, that
can read from a Postgres WAL or MySQL binlog and then keep an up-to-date
read marker for any materialization consumers downstream?

Appreciate all the feedback; hopefully we can get closer to the same
mental model. If there's really a better alternative here, I'm all for it!


On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Rex,
>
> Your initial question was about the impact of compaction on your CDC
> application logic. I have been (unsuccessfully) trying to tell you that
> you do not need compaction and that it's counterproductive.
>
> If you are not rereading the topics, why do you compact them? It wastes
> compute time and I/O on the Kafka brokers (both of which are very
> valuable) and does not give you anything that an appropriate retention
> time wouldn't give you (= lower SSD usage). It makes the mental model more
> complicated. Aggressive compaction combined with a larger backlog
> (compaction time < application failure/restart/upgrade time) would lead
> to incorrect results (in the same way an inappropriate retention period
> may cause data loss for the same reason).
>
> The only use case for log compaction is using a Kafka topic as a
> key/value store to serve a web application (in which case it's usually
> better to use a real key/value store), but then you don't need
> retractions anymore; you'd simply overwrite the actual values or use
> tombstone records for deletions.
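>
> To make that concrete, a minimal Flink SQL sketch (topic and field names
> are made up, not from this thread): an upsert-kafka sink keyed on the
> primary key writes one plain value per key and encodes deletions as
> tombstones, so the served topic holds only current values, no retractions:
>
> CREATE TABLE current_state (
>   user_id BIGINT,
>   state   STRING,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'current-state',  -- assumed topic name
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );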
>
> If you consume the same topic both for web applications and for Flink and
> don't want to use another technology as the key/value store, then log
> compaction of retractions kinda makes sense to kill two birds with one
> stone. However, you have to live with the downsides on the Flink side
> (correctness depends on compaction < downtime) and on the web application
> side (dealing with retractions even though they make no sense at that
> level). Again, a cloud-native key/value store would perform much better,
> be much cheaper with better SLAs, and solve all the issues on the Flink
> side (final note: this is independent of the technology; any stream
> processor will encounter the same issue, as it's a conceptual mismatch).
>
> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Hi Arvid,
>>
>> I really appreciate the thorough response, but I don't think this
>> contradicts our use case. In servicing web applications we're doing
>> nothing more than taking data from the giant databases we use, performing
>> joins and denormalizing aggs strictly for performance reasons (joining
>> across a lot of stuff at query time is slow), and putting the specified
>> results into another database connected to the specified web server. Our
>> Flink jobs are purely used for up-to-date materialized views. We don't
>> care about historical analysis; we only care about the exact current
>> state of the world.
>>
>> This is why every row has a primary key, from the beginning to the end of
>> the job (even though Flink's Table API can't seem to detect that after a
>> lot of joins in our plan, it's logically true, since the join key will
>> then be the pk). This is also why all we need to do is retract the
>> current row from the Kafka source on the existing primary key that's
>> being overwritten, have that retract propagate downstream to throw away
>> any data transformed from that row, and then process the new row. We
>> don't care what other data changes may have happened in between; they're
>> not applicable to our use case.
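>>
>> As a hedged sketch of what I mean (schema and all names invented, and no
>> guarantee the planner always derives this): declaring the primary key on
>> each source at least gives Flink a chance to carry a unique key through a
>> pk-to-pk join:
>>
>> CREATE TABLE users (
>>   id    BIGINT,
>>   state STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'dbz.public.users',  -- assumed Debezium topic
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'format' = 'debezium-json'
>> );
>>
>> CREATE TABLE profiles (
>>   id   BIGINT,
>>   plan STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'dbz.public.profiles',  -- assumed Debezium topic
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'format' = 'debezium-json'
>> );
>>
>> -- Both sides are keyed by the same id, so the join output stays unique
>> -- per id, and a retraction by pk can discard everything derived from the
>> -- old row before the new row is processed.
>> SELECT u.id, u.state, p.plan
>> FROM users u
>> JOIN profiles p ON p.id = u.id;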
>>
>> We're using CDC as nothing more than a way to get the latest rows in real
>> time into Kafka so they can be read by the various Flink jobs we hope to
>> build (starting with the one we're currently working on, which has ~35
>> stateful operators), which then just transform and forward to another
>> database.
>>
>> ----
>>
>> Reading the Upsert Kafka docs [1]: "In the physical operator, we will use
>> state to know whether the key is the first time to be seen. The operator
>> will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for
>> the previous image, or produce DELETE rows with all columns filled with
>> values." This is how we thought the regular Kafka source actually worked,
>> that it had state on PKs it could retract on; we weren't even thinking of
>> any other use case until it hit me that this may not be true. Luckily,
>> the doc also provides an example of simply forwarding from DBZ Kafka to
>> Upsert Kafka; even if the DBZ Kafka source data is compacted, it won't
>> matter, since everything in the actual job reading from Upsert Kafka
>> should then function by PK like we need. On that note, I think it may be
>> helpful to edit the documentation to indicate that if you need stateful
>> PK-based Kafka consumption, it must be via Upsert Kafka.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
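>>
>> In case it helps, here's roughly what that forwarding looks like in Flink
>> SQL, modeled on the FLIP-149 example (all topic and field names here are
>> assumptions, not from our actual jobs):
>>
>> CREATE TABLE users_dbz (
>>   id    BIGINT,
>>   state STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'dbz.public.users',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'dbz-forwarder',
>>   'scan.startup.mode' = 'earliest-offset',
>>   'format' = 'debezium-json'
>> );
>>
>> CREATE TABLE users_upsert (
>>   id    BIGINT,
>>   state STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'upsert-kafka',
>>   'topic' = 'users-upsert',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'key.format' = 'json',
>>   'value.format' = 'json'
>> );
>>
>> -- Re-key the changelog by pk; the sink writes upserts and tombstones,
>> -- so compacting 'users-upsert' is safe for downstream consumers.
>> INSERT INTO users_upsert SELECT id, state FROM users_dbz;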
>>
>> Again, thanks for the thorough reply; this really helped my understanding!
>>
>> On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rex,
>>>
>>> imho log compaction and CDC for historic processing are incompatible on
>>> a conceptual level. Let's take this example:
>>>
>>> topic: party membership
>>> +(1, Dem, 2000)
>>> -(1, Dem, 2009)
>>> +(1, Gop, 2009)
>>> Where 1 is the id of a real person.
>>>
>>> Now, let's consider you want to count memberships retroactively for each
>>> year. You'd get 1 Dem and 0 GOP for 2000-2009, and 1 GOP and 0 Dem for
>>> 2009+.
>>>
>>> Now, consider you have log compaction with a compaction period < 1 year.
>>> You'd get 0 Dem and 0 GOP for 2000-2009, and the real result only for
>>> 2009+ (or in general, only from the time of the latest change).
>>>
>>> Let's take another example:
>>> +(2, Dem, 2000)
>>> -(2, Dem, 2009)
>>>
>>> With log compaction, you'd get -1, 0, or 1 Dem and 0 GOP for 2009+,
>>> depending on how well your application deals with incomplete logs. Let's
>>> say your application simply adds and subtracts retractions: you'd get
>>> -1. If your application ignores deletions that have no matching
>>> insertion (which needs to be tracked per person), you'd get 0. If your
>>> application doesn't look at the retraction type at all, you'd get 1.
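>>>
>>> For instance, a plain count, like this hedged Flink SQL sketch (schema
>>> and names invented), would fall into the first category of blindly
>>> adding and subtracting:
>>>
>>> CREATE TABLE memberships (
>>>   person_id  BIGINT,
>>>   party      STRING,
>>>   since_year INT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'memberships',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'format' = 'debezium-json'
>>> );
>>>
>>> -- With +(2, Dem, 2000) compacted away, the unmatched deletion
>>> -- -(2, Dem, 2009) would be retracted from an empty count, giving -1.
>>> SELECT party, COUNT(*) AS members
>>> FROM memberships
>>> GROUP BY party;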
>>>
>>> As you can see, you need to be really careful to craft your application
>>> correctly. The correct result will only be achieved through the most
>>> complex application (aggregating state for each person and dealing with
>>> incomplete information). This is completely independent of Kafka, Debezium,
>>> or Flink.
>>>
>>> ---
>>>
>>> However, as Jan pointed out: if you never process data from before the
>>> compaction point (i.e., data that has already been compacted), then your
>>> application is correct. But then the question is what the benefit is of
>>> having data in the topic that is older than the compaction point. Its
>>> value is close to 0, as you can't really use it for CDC processing
>>> (again, independent of Flink).
>>>
>>> Consequently, instead of compaction, I'd go with a lower retention
>>> policy and offload the data to S3 for historic (re)processing (afaik the
>>> cloud offering of Confluent finally has automatic offloading, but you
>>> can also build it yourself). Then you only need to ensure that your
>>> application never accesses data that has been deleted because of the
>>> retention time. In general, it's better to choose a technology such as
>>> Pulsar with tiered storage, which gives you exactly what you want with
>>> low overhead: unlimited retention without compaction, without holding
>>> much data in expensive storage (SSD), by offloading automatically to
>>> cold storage.
>>>
>>> If this is not working for you, then please share the requirements that
>>> make you need compaction plus a different retention for
>>> source/intermediate topics.
>>>
>>> For the final topic, from my experience, a real key/value store works
>>> much better than log-compacted topics for serving web applications.
>>> Confluent's marketing strongly pushes Kafka as a database and a
>>> key/value store, while in reality it's "just" a good distributed log. I
>>> can provide pointers that discuss the limitations if there is interest.
>>> Also note that the final topic should not be in CDC format anymore (so
>>> no retractions); it should just contain the current state. For both
>>> examples together, that would be
>>> 1, Gop, 2009
>>> and no record for person 2.
>>>
>>>
>>> On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Digging around, it looks like Upsert Kafka, which requires a primary
>>>> key, will actually do what I want and uses compaction, but it doesn't
>>>> look compatible with the Debezium format? Is this on the roadmap?
>>>>
>>>> In the meantime, we're considering consuming from Debezium Kafka (still
>>>> compacted), writing directly to an Upsert Kafka sink, and then reading
>>>> right back out of a corresponding Upsert Kafka source. Since that
>>>> little roundabout will key all changes by primary key, it should give
>>>> us a compacted topic to start with. Once we get that working, we can
>>>> probably do the same thing with intermediate Flink jobs too.
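>>>>
>>>> A hedged sketch of the read-back side (all names assumed): the
>>>> upsert-kafka source keeps per-key state and generates the retraction
>>>> for the previous image itself, so the downstream job sees a proper
>>>> changelog by pk even though the topic is compacted:
>>>>
>>>> CREATE TABLE users_by_pk (
>>>>   id    BIGINT,
>>>>   state STRING,
>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>>   'connector' = 'upsert-kafka',
>>>>   'topic' = 'users-by-pk',
>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>>   'key.format' = 'json',
>>>>   'value.format' = 'json'
>>>> );
>>>>
>>>> -- Updates per id retract the old row before the new one is counted.
>>>> SELECT state, COUNT(*) AS user_count
>>>> FROM users_by_pk
>>>> GROUP BY state;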
>>>>
>>>> Would appreciate any feedback on this approach, thanks!
>>>>
>>>> On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Does this also imply that it's not safe to compact the initial topic
>>>>> where data is coming from Debezium? I'd have thought that Flink's
>>>>> Kafka source would emit retractions on any existing data with a
>>>>> primary key as new data with the same pk arrived (in our case all data
>>>>> has primary keys). I guess that goes back to my original question: is
>>>>> this not what the Kafka source does? Is there no way to make that
>>>>> happen?
>>>>>
>>>>> We really can't live with the record amplification; it's sometimes
>>>>> nonlinear and randomly kills RocksDB performance.
>>>>>
>>>>> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Just to clarify: intermediate topics should in most cases not be
>>>>>> compacted, for exactly these reasons, if your application depends on
>>>>>> all intermediate data. For the final topic, it makes sense. If you
>>>>>> also consume intermediate topics for a web application, one solution
>>>>>> is to split them into two topics (like topic-raw for Flink and
>>>>>> topic-compacted for applications) and live with some amplification.
>>>>>>
>>>>>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley <r...@remind101.com>
>>>>>> wrote:
>>>>>>
>>>>>>> All of our Flink jobs are (currently) used for web applications at
>>>>>>> the end of the day. We see a lot of latency spikes from record
>>>>>>> amplification, and we were at first hoping we could pass
>>>>>>> intermediate results through Kafka and compact them to lower the
>>>>>>> record amplification, but then it hit me that this might be an
>>>>>>> issue.
>>>>>>>
>>>>>>> Thanks for the detailed explanation, though it seems like we'll
>>>>>>> need to look for a different solution, or only compact records we
>>>>>>> know will never mutate.
>>>>>>>
>>>>>>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Jan's response is correct, but I'd like to emphasize the impact on
>>>>>>>> a Flink application.
>>>>>>>>
>>>>>>>> If the compaction happens before the data arrives in Flink, the
>>>>>>>> intermediate updates are lost and just the final result appears.
>>>>>>>> Also, if you restart your Flink application and reprocess older
>>>>>>>> data, it will naturally see only the compacted data, save for the
>>>>>>>> active segment.
>>>>>>>>
>>>>>>>> So how do you make it deterministic? Simply drop topic compaction.
>>>>>>>> If the data is coming from CDC and you want to process and produce
>>>>>>>> changelog streams over several applications, you probably don't
>>>>>>>> want to use log compaction anyway.
>>>>>>>>
>>>>>>>> Log compaction only makes sense for the snapshot topic that holds
>>>>>>>> the current state (a KTable), where you don't think in CDC updates
>>>>>>>> anymore but just in final records, like
>>>>>>>> (user_id: 1, state: "california")
>>>>>>>> (user_id: 1, state: "ohio")
>>>>>>>>
>>>>>>>> Usually, if you use CDC in your company, each application is
>>>>>>>> responsible for building its own current model by tapping into the
>>>>>>>> relevant changes. Log-compacted topics would then only appear at
>>>>>>>> the end of processing, when you hand data over to non-analytical
>>>>>>>> applications such as web apps.
>>>>>>>>
>>>>>>>> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Rex,
>>>>>>>>>
>>>>>>>>> If I understand correctly, you are concerned about the behavior of
>>>>>>>>> the Kafka source in the case of a compacted topic, right? If that
>>>>>>>>> is the case, then this is not directly related to Flink; Flink
>>>>>>>>> will expose the behavior defined by Kafka. You can read about it
>>>>>>>>> for instance here [1]. TL;DR: your pipeline is guaranteed to see
>>>>>>>>> every record written to the topic (every single update, be it
>>>>>>>>> later "overwritten" or not) if it processes the record with a
>>>>>>>>> latency of at most 'delete.retention.ms'. This is configurable per
>>>>>>>>> topic; the default is 24 hours. If you want to reprocess the data
>>>>>>>>> later, your consumer might see only the resulting compacted
>>>>>>>>> ("retracted") stream, and not every record actually written to the
>>>>>>>>> topic.
>>>>>>>>>
>>>>>>>>>  Jan
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
>>>>>>>>> On 2/24/21 3:14 AM, Rex Fenley wrote:
>>>>>>>>>
>>>>>>>>> Apologies, forgot to finish. If the Kafka source performed its own
>>>>>>>>> retraction of old data on the key (user_id) for every append it
>>>>>>>>> receives, that should resolve this discrepancy, I think.
>>>>>>>>>
>>>>>>>>> Again, is this true? Anything else I'm missing?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley <r...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm concerned about the impact of Kafka's compaction when
>>>>>>>>>> sending data between running Flink jobs.
>>>>>>>>>>
>>>>>>>>>> For example, one job produces retract stream records in the
>>>>>>>>>> sequence
>>>>>>>>>> (false, (user_id: 1, state: "california")) -- retract
>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append
>>>>>>>>>> When this is consumed by Kafka and keyed by user_id, it could
>>>>>>>>>> end up compacting to just
>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append
>>>>>>>>>> If some other downstream Flink job has a filter on state ==
>>>>>>>>>> "california" and reads from the Kafka stream, I assume it will miss 
>>>>>>>>>> the
>>>>>>>>>> retract message altogether and produce incorrect results.
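>>>>>>>>>>
>>>>>>>>>> A sketch of that downstream filter (table name and format
>>>>>>>>>> assumed): with the retraction compacted away upstream, this job
>>>>>>>>>> would never see the delete for the stale "california" row it
>>>>>>>>>> already emitted:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE users (
>>>>>>>>>>   user_id BIGINT,
>>>>>>>>>>   state   STRING
>>>>>>>>>> ) WITH (
>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>   'topic' = 'users',  -- the compacted topic above
>>>>>>>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>>>>>   'format' = 'debezium-json'
>>>>>>>>>> );
>>>>>>>>>>
>>>>>>>>>> -- Never receives the retract, so its output stays stale forever.
>>>>>>>>>> SELECT user_id, state FROM users WHERE state = 'california';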
>>>>>>>>>>
>>>>>>>>>> Is this true? How do we prevent this from happening? We need to
>>>>>>>>>> use compaction, since all our jobs are based on CDC and we can't
>>>>>>>>>> just drop data after x number of days.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
