Jan,

> - if I don't send any data to a Kafka partition for a period longer
> than the data retention interval, then all data from the partition is
> wiped out
If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
the topic that you read into a KTable. You should configure this topic to
use log compaction, which will ensure that the latest value for a given
key is never wiped. So even if you don't send any data to a Kafka
partition of this (now log-compacted) "state topic" for a long period of
time, you'd always have access to (at least) the latest value for every
key.

Would that help?

-Michael

On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Matthias,
>
> first of all, thanks for your answer. Sorry if I didn't explain the
> problem well; I didn't want to dig too much into detail, in order to
> focus on what's important, and maybe the result was not clear. My
> fault, I will try to explain it again.
>
> I have two KafkaConsumers in two separate threads consuming from two
> topics - let's call the first one the "stream topic" (processed like a
> KStream) and the second one the "state topic" (processed like a
> KTable). The state topic carries persistent data that I need in order
> to process the stream topic, so I need to cache the state topic locally
> before starting consumption of the stream topic. When the application
> is running normally, there seems to be no issue with this, because the
> state topic is updated asynchronously and I use internal locks to
> synchronize the processing inside the application. So far, everything
> is fine.
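Michael's log-compaction suggestion can be applied with the stock Kafka
tooling. A hedged sketch, assuming Kafka 0.10.x tooling, a local
ZooKeeper, and a topic named `state-topic` (all of these are
placeholders, not names from the thread):

```shell
# Switch the (hypothetical) state topic to log compaction, so the latest
# value per key survives even when time-based retention would have
# deleted it.
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --alter --entity-type topics --entity-name state-topic \
  --add-config cleanup.policy=compact
```

Note that compaction only guarantees the latest value per key; older
values for the same key may still be removed.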
> The problem might arise when the application starts - then I do the
> following:
>
> - lock processing of the stream topic (because I don't have the state
>   topic cached)
>
> - read the current end offset N from the state topic (which gives me
>   the offset of the message that should be expected next, i.e. a
>   message that has not yet been written)
>
> - reset the offset of the state topic to the beginning and read until I
>   read offset N - 1, which tells me that I have cached all the data I
>   need to process the stream topic, so I unlock the stream processing
>   and continue
>
> All this works well, except for some very rare situations, when the
> following happens (as I understand it; maybe I am making a mistake
> here):
>
> - for a long period of time there is no update to (at least a single
>   partition of) the state topic
>
> - when I try to cache the state topic during startup as explained
>   above, it might never finish, because I will never get a message with
>   offset N - 1 - that is because I will not get any message at all,
>   since all of the data has been wiped out
>
> - because I don't know whether I got all the data from the state topic,
>   I cannot start processing the stream topic, and the whole application
>   is stuck until the first message arrives in all partitions of the
>   state topic (which might never happen)
>
> - I might use some sort of timeout to handle this, but this could be
>   dangerous; relying on KafkaConsumer.poll() returning empty records
>   sounds a little fragile too (because this might also indicate that no
>   records could be fetched within the timeout, am I right?). What would
>   totally solve my issue would be that during data retention the last
>   message would always be kept, and therefore I would always get the
>   message with offset N - 1, and the whole issue would vanish.
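The catch-up loop described above can detect completion without ever
seeing the record at offset N - 1: compare the consumer's position
against the end offsets captured at startup, since position == endOffset
means "nothing left to replay" even for an empty or fully-expired
partition. A minimal sketch, with plain maps standing in for the values
returned by `KafkaConsumer.position()` and `KafkaConsumer.endOffsets()`
(no broker involved; partition numbers and offsets are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class CatchUpCheck {
    // Returns true when every partition's position has reached its end
    // offset. No record with offset N - 1 needs to be observed, so a
    // partition wiped by retention does not block startup.
    static boolean caughtUp(Map<Integer, Long> positions,
                            Map<Integer, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> positions.getOrDefault(e.getKey(), 0L)
                               >= e.getValue());
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 42L); // partition 0: broker will assign offset 42 next
        end.put(1, 17L); // partition 1: fully wiped by retention

        Map<Integer, Long> pos = new HashMap<>();
        pos.put(0, 42L); // read everything up to offset 41
        pos.put(1, 17L); // seekToBeginning() lands here; nothing to poll

        System.out.println(caughtUp(pos, end)); // prints "true"
    }
}
```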
> The situation when a partition of the state topic gets no updates for a
> long time happens mostly in development environments (where there is
> little to no traffic), but I sense that this could be an issue in
> production too, for example due to some repartitioning of topics.
>
> Does that make any sense to you now?
>
> Thanks again for your response,
>
> Jan
>
>
> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>
>> Jan,
>>
>> your scenario is quite complex and I am not sure if I understood every
>> part of it. I'll try to break it down:
>>
>>> In my scenario on startup, I want to read all data from a topic (or
>>> a subset of its partitions), wait until all the old data has been
>>> cached, and then start processing of a different stream
>>
>> That is hard to accomplish in general. Kafka Streams internally uses
>> KafkaConsumer (one instance per StreamThread) and thus relies on the
>> consumer's behavior with regard to poll(). Hence, Streams cannot
>> control in detail what data will be fetched from the brokers.
>>
>> Furthermore, Streams follows its own internal strategy to pick a
>> record (from the available ones returned from poll()) and you cannot
>> control in your code (at least not directly) which record will be
>> picked.
>>
>> Basically, Streams tries to process records in "timestamp order",
>> i.e., based on the timestamp returned by the TimestampExtractor. So
>> you can "influence" the processing order by record timestamps (as far
>> as you can influence them) and/or by providing a custom
>> TimestampExtractor.
>>
>> In your example, you might want the records you want to process first
>> (KTable) to have smaller timestamps (i.e., be earlier) than the
>> records from your KStream. But even this will only give you "best
>> effort" behavior, and it can happen that a KStream record is processed
>> before all KTable records are processed. It's a known issue, but hard
>> to resolve.
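The custom-TimestampExtractor route Matthias mentions could look roughly
like the following non-runnable sketch against the 0.10.x Streams API
(single-argument `extract()`); the topic name and the constant timestamp
are placeholders, and as noted above this only yields best-effort
ordering:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Records from the (hypothetical) "state-topic" report an artificially
// small timestamp, so Streams' best-effort timestamp ordering tends to
// process them before records from the stream topic.
public class StateFirstExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.topic().equals("state-topic")) {
            return 0L; // sort state records before all stream records
        }
        return record.timestamp(); // embedded record timestamp otherwise
    }
}
```

The extractor would be registered via the
`StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` property.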
>>> when the specific partition doesn't get any message within the
>>> retention period, then I end up stuck trying to prefetch data to the
>>> "KTable" - this is because I get the offset of the last message
>>> (plus 1) from the broker, but I don't get any data ever (until I
>>> send a message to the partition)
>>
>> Cannot follow here: if there is no data, then you can of course not
>> process any data -- so why do you end up being stuck?
>>
>>> The problem I see here is that Kafka tells me what the last offset
>>> in a partition is, but there is no upper bound on when a first
>>> message will arrive,
>>
>> In general, the latency between data append at the broker and data
>> receipt at a consumer is rather small. So even if there is strictly
>> no upper bound on when a message gets delivered, this should not be
>> an issue in practice. Or do I misunderstand something?
>>
>>> even though I reset the offset and start reading from the beginning
>>> of a partition.
>>
>> How does this relate? Cannot follow.
>>
>>> My question is, is it a possibility not to clear the whole
>>> partition, but to always keep at least the last message?
>>
>> Not with the regular retention policy -- not sure if log compaction
>> can help here.
>>
>>> That way, the client would always get at least the last message, can
>>> therefore figure out it is at the end of the partition (reading the
>>> old data) and start processing.
>>
>> Why is this required? If the client's offset is the same as
>> "endOfLog" for each partition, you can figure out that there is
>> nothing to read. So why would you need the last old message to figure
>> this out?
>>
>>
>> -Matthias
>>
>>
>> On 2/7/17 3:46 AM, Jan Lukavský wrote:
>>
>>> Hi all,
>>>
>>> I have a question about how to do correct caching in a KTable-like
>>> structure on application startup. I'm not sure if this belongs to
>>> the user or dev mailing list, so sorry if I've chosen the wrong one.
>>> What is my observation so far:
>>>
>>> - if I don't send any data to a Kafka partition for a period longer
>>>   than the data retention interval, then all data from the partition
>>>   is wiped out
>>>
>>> - the index file is not cleared (which is obvious, it has to keep
>>>   track of the next offset to assign to a new message)
>>>
>>> In my scenario on startup, I want to read all data from a topic (or
>>> a subset of its partitions), wait until all the old data has been
>>> cached, and then start processing of a different stream (basically
>>> I'm doing a join of a KStream and a KTable, but I have implemented
>>> it manually due to some special behavior). Now, what is the issue
>>> here - when the specific partition doesn't get any message within
>>> the retention period, I end up stuck trying to prefetch data to the
>>> "KTable" - this is because I get the offset of the last message
>>> (plus 1) from the broker, but I don't get any data ever (until I
>>> send a message to the partition). The problem I see here is that
>>> Kafka tells me what the last offset in a partition is, but there is
>>> no upper bound on when a first message will arrive, even though I
>>> reset the offset and start reading from the beginning of a
>>> partition. My question is, is there a possibility not to clear the
>>> whole partition, but to always keep at least the last message? That
>>> way, the client would always get at least the last message, could
>>> therefore figure out it is at the end of the partition (reading the
>>> old data) and start processing. I believe that the KTable
>>> implementation could have a very similar issue. Or is there any
>>> other way around this? I could add a timeout, but this seems a
>>> little fragile.
>>>
>>> Thanks in advance for any suggestions and opinions,
>>>
>>> Jan
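One more signal worth noting for the wiped-partition case above: when
retention deletes everything, the partition's beginning offset advances
to its end offset, and `KafkaConsumer.beginningOffsets()` /
`KafkaConsumer.endOffsets()` (available since 0.10.1) expose both. A
minimal sketch with plain longs standing in for those broker-reported
values (the numbers are made up):

```java
public class EmptyPartitionCheck {
    // A partition has nothing to replay when its beginning offset has
    // caught up with its end offset -- true both for a never-written
    // partition (0 == 0) and for one whose data was fully expired by
    // retention (the log start offset advances past deleted records).
    static boolean nothingToReplay(long beginningOffset, long endOffset) {
        return beginningOffset >= endOffset;
    }

    public static void main(String[] args) {
        // Partition written up to offset 99, then fully wiped: broker
        // reports beginning = end = 100, so the consumer need not wait
        // for a message that will never arrive.
        System.out.println(nothingToReplay(100L, 100L)); // prints "true"
        // Partition with live data from offset 40 to 99:
        System.out.println(nothingToReplay(40L, 100L));  // prints "false"
    }
}
```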