Jan,

>  - if I don't send any data to a Kafka partition for a period longer than
> the data retention interval, then all data from the partition is wiped out

If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
the topic that you read into a KTable.  You should configure this topic to
use log compaction, which will ensure that the latest value for a given key
will never be wiped.  So even if you don't send any data to a Kafka
partition of this (now log-compacted) "state topic" for a long period of
time, you'd always have access to (at least) the latest value for every key.
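
For illustration, here is a rough sketch of how the existing state topic could
be switched to compaction via the Java AdminClient (note: the AdminClient only
ships with newer client releases, "state-topic" and the bootstrap address are
placeholders, and the same change can also be made with the kafka-configs.sh
tool):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // Turn on log compaction for the (placeholder) state topic so that the
    // latest value per key survives retention indefinitely.
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "state-topic");
    Config compaction = new Config(Collections.singletonList(
            new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT)));
    // checked exceptions from get() omitted for brevity
    admin.alterConfigs(Collections.singletonMap(topic, compaction)).all().get();
}

The key setting is cleanup.policy=compact; keep in mind that compaction retains
(at least) the latest record per key, so the records in that topic must carry
non-null keys.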

Would that help?

-Michael





On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Matthias,
>
> first of all, thanks for your answer. Sorry if I didn't explain the
> problem well; I didn't want to go into too much detail so that I could focus
> on the important part, and maybe the result was not clear.
>
> My fault, I will try to explain it again. I have two KafkaConsumers in two
> separate threads consuming from two topics - let's call the first one
> "stream topic" (processed like KStream)
>
> and the second one "state topic" (processed like KTable). The state topic
> carries persistent data that I need in order to process the stream topic,
> so I need to cache the state topic
>
> locally before starting consumption of the stream topic. When the
> application is running normally, there seems to be no issue with this,
>
> because the state topic is updated asynchronously and I use internal locks
> to synchronize the processing inside the application. So far, everything is
> fine.
>
>
> The problem might arise when the application starts - then I do the
> following:
>
>  - lock processing of the stream topic (because I don't have the state
> topic cached)
>
>  - read the current end offset N of the state topic (which gives me the
> offset of the message that should be expected next, i.e., a message that has
> not yet been written)
>
>  - reset the offset of the state topic to the beginning and read it until I
> reach offset N - 1, which tells me that I have cached all the data I need to
> process the stream topic, so I unlock the stream processing and continue
> (sketched in code below)
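>
> In code, the startup sequence looks roughly like this (a simplified sketch
> with placeholder names and without error handling; the final loop is the part
> that can hang forever once a partition has been completely wiped):
>
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "state-topic-cacher");
> props.put("enable.auto.commit", "false");
>
> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
>         props, new StringDeserializer(), new StringDeserializer());
> TopicPartition tp = new TopicPartition("state-topic", 0);
> consumer.assign(Collections.singletonList(tp));
>
> Map<String, String> stateCache = new HashMap<>();
>
> // N = end offset = offset of the message that would be written next
> long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
>
> consumer.seekToBeginning(Collections.singletonList(tp));
> long lastSeen = -1L;
> while (lastSeen < endOffset - 1) {              // wait until offset N - 1 was read
>     for (ConsumerRecord<String, String> record : consumer.poll(500)) {
>         stateCache.put(record.key(), record.value());
>         lastSeen = record.offset();
>     }
> }
> // state topic cached -> unlock processing of the stream topic
>
> When retention has removed everything from the partition, poll() never returns
> a record, lastSeen stays at -1, and the loop above never terminates.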
>
> All this works well, except for a very rare situation in which the
> following happens (as I understand it; maybe I am making some mistake here):
>
>  - for a long period of time there is no update to (at least one partition
> of) the state topic
>
>  - when I try to cache the state topic during startup as explained above, it
> might never finish, because I will never get a message with offset N - 1; in
> fact, I will not get any message at all, because all of the data has been
> wiped out
>
>  - because I don't know whether I have received all the data from the state
> topic, I cannot start processing the stream topic, and the whole application
> is stuck until the first message arrives in every partition of the state
> topic (which might never happen)
>
>  - I might use some sort of timeout to handle this, but that could be
> dangerous, and relying on KafkaConsumer.poll() returning empty records sounds
> a little fragile to me as well (because it might also just mean that no
> records could be fetched within the timeout, am I right?). What would
> completely solve my issue is if retention always kept at least the last
> message; then I would always get the message with offset N - 1, and the
> whole issue would vanish.
>
> The situation in which a partition of the state topic gets no updates for a
> long time happens mostly in a development environment (where there is little
> to no traffic), but I suspect this could be an issue in production too,
> for example due to some repartitioning of topics.
>
> Does that make any sense to you now?
>
> Thanks again for your response,
>
>  Jan
>
>
>
> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>
>> Jan,
>>
>> your scenario is quite complex and I am not sure if I understood every
>> part of it. I'll try to break it down:
>>
>>> In my scenario on startup, I want to read all data from a topic (or a
>>> subset of its partitions),
>>> wait until all the old data has been cached and then start processing of
>>> a different stream
>>>
>> That is hard to accomplish in general. Kafka Streams internally uses
>> KafkaConsumer (one instance per StreamThread) and thus relies on the
>> consumer's behavior with regard to poll(). Hence, Streams cannot control
>> in detail what data will be fetched from the brokers.
>>
>> Furthermore, Streams follows its own internal strategy to pick a record
>> (from the available ones returned from poll()) and you cannot control in
>> your code (at least not directly) what record will be picked.
>>
>> Basically, Streams tries to process records in "timestamp order", i.e.,
>> based on the timestamp returned from the TimestampExtractor. So you can
>> "influence" the processing order by record timestamps (as far as you can
>> influence them) and/or by providing a custom TimestampExtractor.
>>
>> In your example, you might want the records you want to process first
>> (the KTable records) to have smaller timestamps (i.e., be earlier) than the
>> records from your KStream. But even this will only give you "best effort"
>> behavior, and it can happen that a KStream record is processed before
>> all KTable records are processed. It's a known issue but hard to resolve.
>>
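>> Just to illustrate the idea (a sketch only; the topic name is a placeholder,
>> and the exact TimestampExtractor interface differs slightly between versions
>> -- this uses the newer two-argument form):
>>
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.streams.processor.TimestampExtractor;
>>
>> // Shifts records of the (placeholder) state topic slightly into the past so
>> // that Streams tends to pick them before stream-topic records that carry the
>> // same timestamp. Best effort only, as described above.
>> public class StateFirstTimestampExtractor implements TimestampExtractor {
>>
>>     private static final long STATE_TOPIC_BIAS_MS = 1L;
>>
>>     @Override
>>     public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
>>         long ts = record.timestamp();              // producer/broker timestamp
>>         if ("state-topic".equals(record.topic())) {
>>             ts -= STATE_TOPIC_BIAS_MS;             // make the KTable records sort earlier
>>         }
>>         return ts >= 0 ? ts : previousTimestamp;   // fall back on invalid timestamps
>>     }
>> }
>>
>> You would register it via the timestamp extractor setting in StreamsConfig
>> (the exact config constant depends on the Streams version).
>>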
>>> when the specific partition doesn't get any message within the retention
>>> period,
>>> then I end up stuck trying to prefetch data to the "KTable" - this is
>>> because I get
>>> the offset of the last message (plus 1) from the broker, but I don't get
>>> any data
>>> ever (until I send a message to the partition)
>>>
>> Cannot follow here: if there is no data, then you can of course not
>> process any data -- so why do you end up being stuck?
>>
>>> The problem I see here is that Kafka tells me what the last offset in a
>>> partition is,
>>> but there is no upper bound on when a first message will arrive,
>>>
>> In general, the latency between appending data at the broker and receiving
>> it at a consumer is rather small. So even if there is strictly no upper
>> bound on when a message gets delivered, this should not be an issue
>> in practice. Or do I misunderstand something?
>>
>>> even though I reset the offset and start reading from the beginning of a
>>> partition.
>>>
>> How does this relate? Cannot follow.
>>
>>> My question is: is it possible not to clear the whole partition, but
>>> to always keep at least the last message?
>>>
>> Not with the regular retention policy -- not sure if log compaction can help
>> here.
>>
>>> That way, the client would always get at least the last message, and can
>>> therefore figure out that it is at the end of the partition (i.e., done
>>> reading the old data) and start processing.
>>>
>> Why is this required? If the client's offset is the same as "endOfLog"
>> for each partition, you can figure out that there is nothing to read. So
>> why would you need the last old message to figure this out?
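>>
>> Something along these lines should be sufficient (a rough sketch, assuming a
>> consumer, a TopicPartition tp, and a stateCache map set up as in your caching
>> code; after seekToBeginning(), position() reflects the actual log start
>> offset, so on a fully wiped partition it already equals the end offset and no
>> record has to arrive at all):
>>
>> consumer.seekToBeginning(Collections.singletonList(tp));
>> long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
>>
>> while (consumer.position(tp) < endOffset) {
>>     for (ConsumerRecord<String, String> record : consumer.poll(500)) {
>>         stateCache.put(record.key(), record.value());
>>     }
>> }
>> // position() caught up with the end offset -> the partition is fully cached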
>>
>>
>> -Matthias
>>
>>
>>
>> On 2/7/17 3:46 AM, Jan Lukavský wrote:
>>
>>> Hi all,
>>>
>>> I have a question about how to do correct caching in a KTable-like
>>> structure on application startup. I'm not sure whether this belongs on the
>>> user or dev mailing list, so sorry if I've chosen the wrong one. These are
>>> my observations so far:
>>>
>>>   - if I don't send any data to a Kafka partition for a period longer
>>> than the data retention interval, then all data from the partition is
>>> wiped out
>>>
>>>   - the index file is not cleared (which is obvious, it has to keep track
>>> of the next offset to assign to a new message)
>>>
>>> In my scenario on startup, I want to read all data from a topic (or a
>>> subset of its partitions), wait until all the old data has been cached
>>> and then start processing of a different stream (basically I'm doing a
>>> join of a KStream and a KTable, but I have implemented it manually due to
>>> some special behavior).
>>>
>>> Now, what is the issue here: when the specific partition doesn't get any
>>> message within the retention period, then I end up stuck trying to prefetch
>>> data to the "KTable" - this is because I get the offset of the last message
>>> (plus 1) from the broker, but I don't get any data ever (until I send a
>>> message to the partition). The problem I see here is that Kafka tells me
>>> what the last offset in a partition is, but there is no upper bound on when
>>> a first message will arrive, even though I reset the offset and start
>>> reading from the beginning of a partition.
>>>
>>> My question is: is it possible not to clear the whole partition, but to
>>> always keep at least the last message? That way, the client would always
>>> get at least the last message, and can therefore figure out that it is at
>>> the end of the partition (i.e., done reading the old data) and start
>>> processing. I believe that the KTable implementation could have a very
>>> similar issue. Or is there any other way around this? I could add a
>>> timeout, but this seems a little fragile.
>>>
>>> Thanks in advance for any suggestions and opinions,
>>>
>>>   Jan
>>>
>>>
>
