Hi Bruno, John and Sophie,

thank you very much for the quick responses, you are the best. After
thinking about it a little more, something does seem fishy.

From the logs, I can see that it does not happen while the application is
running normally.

I checked the timestamps (windowStartTimestamp) by connecting a local
instance in debug mode to the Kafka cluster, and they are out of order. Not
always: there can be a day with a good sequence and then a time interval
with mixed-up timestamps, like these (store retention is 20.6 minutes):
StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z

Picking a few of these, the stack trace looked like this:
put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
lambda$init$0:112, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
restore:-1, 69348804 (org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
lambda$adapt$1:47, StateRestoreCallbackAdapter (org.apache.kafka.streams.processor.internals)
restoreBatch:-1, 791473363 (org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
restoreBatch:89, CompositeRestoreListener (org.apache.kafka.streams.processor.internals)
restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
processNext:349, StoreChangelogReader (org.apache.kafka.streams.processor.internals)
restore:93, StoreChangelogReader (org.apache.kafka.streams.processor.internals)
updateNewAndRestoringTasks:389, TaskManager (org.apache.kafka.streams.processor.internals)
runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
run:671, StreamThread (org.apache.kafka.streams.processor.internals)

So I believe it happens during the state restoration phase, while the state
is being restored from the internal changelog topic. All the entries are for
task 1_57, so I expect that they come from a single partition.

Thinking about it, I don't understand how such a case can happen even in
theory. I expect that a window, in order to be written to the changelog
topic, first has to go through "put"; so even if the records are out of
order on the input side, an expired window (relative to observedStreamTime)
should be skipped at the moment of "put", and on restoration everything
should be fine.
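
To make that expectation concrete, here is a minimal sketch of the check as
I understand it from Bruno's description. It is a simplified model, not the
actual InMemoryWindowStore code: the class and method names are made up for
illustration, and treating a window exactly at the boundary as expired is an
assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the expiry check; not the Kafka Streams implementation. */
public class ExpiredWindowModel {

    /** Replays window start timestamps in arrival order and returns the ones that would be skipped. */
    public static List<Instant> skipped(List<Instant> windowStarts, Duration retention) {
        long retentionMs = retention.toMillis();
        long observedStreamTime = -1L; // highest window start seen so far
        List<Instant> result = new ArrayList<>();
        for (Instant start : windowStarts) {
            long ts = start.toEpochMilli();
            observedStreamTime = Math.max(observedStreamTime, ts);
            // Assumption: a window is expired once its start is at or below
            // (stream time - retention); such a window is not inserted.
            if (ts <= observedStreamTime - retentionMs) {
                result.add(start);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three of the timestamps listed above, in the order they were observed.
        List<Instant> starts = List.of(
            Instant.parse("2020-02-07T13:13:08.651Z"),
            Instant.parse("2020-02-07T12:55:53.328Z"),
            Instant.parse("2020-02-07T12:44:51.912Z"));
        // Store retention of 20.6 minutes = 1,236,000 ms.
        Duration retention = Duration.ofMillis(1_236_000);
        System.out.println(skipped(starts, retention));
    }
}
```

With these three inputs the threshold is 13:13:08.651 minus 20.6 minutes,
i.e. 12:52:32.651, so only the 12:44:51.912 window is reported as skipped.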

As the next step, I would like to list and inspect the records and their
timestamps in the given partition of the changelog topic with a command-line
tool (or in some other way), to confirm whether they are really stored in
this order. If you have a tip on how to do it, please let me know.
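
One detail that may matter for this inspection: as far as I understand, the
window start is carried in the changelog key rather than in the record
timestamp. The sketch below assumes the key layout is the serialized record
key followed by an 8-byte big-endian window start timestamp and a 4-byte
sequence number. That layout is an assumption about Kafka Streams internals
and should be verified against the version in use; the class and method
names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/** Hypothetical helper; the key layout is an assumption, not a documented contract. */
public class ChangelogKeyDecoder {
    private static final int TIMESTAMP_SIZE = Long.BYTES;  // assumed 8-byte window start
    private static final int SEQNUM_SIZE = Integer.BYTES;  // assumed 4-byte sequence number

    /** Extracts the window start from a raw changelog key of the assumed layout. */
    public static Instant windowStart(byte[] changelogKey) {
        if (changelogKey.length < TIMESTAMP_SIZE + SEQNUM_SIZE) {
            throw new IllegalArgumentException("key too short: " + changelogKey.length + " bytes");
        }
        int offset = changelogKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
        // ByteBuffer reads big-endian by default; getLong(int) is an absolute read.
        long timestampMs = ByteBuffer.wrap(changelogKey).getLong(offset);
        return Instant.ofEpochMilli(timestampMs);
    }

    public static void main(String[] args) {
        // Build a key of the assumed layout by hand to exercise the decoder.
        byte[] recordKey = "some-key".getBytes(StandardCharsets.UTF_8);
        long start = Instant.parse("2020-02-07T12:44:51.912Z").toEpochMilli();
        byte[] changelogKey = ByteBuffer
            .allocate(recordKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
            .put(recordKey)
            .putLong(start)
            .putInt(0)
            .array();
        System.out.println(windowStart(changelogKey));
    }
}
```

The raw key bytes could come from a plain consumer with a byte-array key
deserializer assigned to the changelog partition behind task 1_57 (partition
57, if the task id maps to the partition number as I expect).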

That is all I have for now. I would like to get to the bottom of this, and I
will post here if I come up with something new.

Thank you
Jiri



On Mon, Feb 10, 2020 at 10:14 PM John Roesler <vvcep...@apache.org> wrote:
>
> Hey all,
>
> Sorry for the confusion. Bruno set me straight offline.
>
> Previously, we had metrics for each reason for skipping records, and the
> rationale was that you would monitor the metrics and only turn to the logs
> if you needed to *debug* unexpected record skipping. Note that skipping
> records by itself isn't a cause for concern, since this is exactly what
> Streams is designed to do in a number of situations.
>
> However, during the KIP-444 discussion, the decision was reversed, and we
> decided to just log one "roll-up" metric for all skips and increase the
> log messages to warning level for debuggability. This particularly makes sense
> because you otherwise would have to restart the application to change the
> log level if you needed to figure out why the single skipped-record metric
> is non-zero. And then you may not even observe it again.
>
> I either missed the memo on that discussion, or participated in it and
> then forgot it even happened. I'm not sure I want to look back at the thread to
> find out.
>
> Anyway, I've closed the PR I opened to move it back to debug. We should
> still try to help figure out the root cause of this particular email
> thread, though.
>
> Thanks,
> -John
>
> On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> > While I agree that seems like it was probably a refactoring mistake, I'm
> > not convinced it isn't the right thing to do. John, can you reiterate the
> > argument for setting it to debug way back when?
> >
> > I would actually present this exact situation as an argument for
> > keeping it as warn, since something indeed seems fishy here that was only
> > surfaced through this warning. That said, maybe the metric is the more
> > appropriate way to bring attention to this: not sure if it's info or debug
> > level though, or how likely it is that anyone really pays attention to it?
> >
> > On Mon, Feb 10, 2020 at 9:53 AM John Roesler <j...@vvcephei.org> wrote:
> >
> > > Hi,
> > >
> > > I’m sorry for the trouble. It looks like it was a mistake during
> > >
> > > https://github.com/apache/kafka/pull/6521
> > >
> > > Specifically, while addressing code review comments to change a bunch
> > > of other logs from debugs to warnings, that one seems to have been
> > > included by accident:
> > >
> > > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> > >
> > > I’ll see if I can fix it today.
> > >
> > > Regarding Bruno's thoughts, there was a pretty old decision to
> > > capture the "skipped records" as a metric for visibility and log it at the
> > > debug level for debuggability. We decided that "warning" wasn't the right
> > > level because Streams is operating completely as specified.
> > >
> > > However, I do agree that it doesn't seem right to see more skipped
> > > records during start-up; I would expect to see exactly the same records
> > > skipped during start-up as during regular processing, since the skipping
> > > logic is completely deterministic and based on the sequence of timestamps
> > > your records have in the topic. Maybe you just notice it more during
> > > start-up? I.e., if there are 1000 warning logs spread over a few months,
> > > then you don't notice it, but when you see them all together at start-up,
> > > it's more concerning?
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > > Hi,
> > > >
> > > > I am pretty sure this was intentional. All skipped records log
> > > > messages are on WARN level.
> > > >
> > > > If a lot of your records are skipped on app restart with this log
> > > > message on WARN-level, they were also skipped with the log message
> > > > on DEBUG-level. You simply did not know about it before. With an
> > > > in-memory window store, this message is logged when a window with a
> > > > start time older than the current stream time minus the retention
> > > > period is put into the window store, i.e., the window is NOT inserted
> > > > into the window store. If you get a lot of them on app restart, you
> > > > should have a look at the timestamps of your records and the retention
> > > > of your window store. If those values do not explain the behavior,
> > > > please try to find a minimal example that shows the issue and post it
> > > > here on the mailing list.
> > > >
> > > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří <sa...@avast.com.invalid> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > in
> > > > >
> > >
> > > > > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > > >
> > > > > log level of "Skipping record for expired segment" was changed
> > > > > from debug to warn. Was it an intentional change? Should it be handled
> > > > > somehow by the user? How can the user handle it? I am getting a lot of
> > > > > these on app restart.
> > > >
> > >
> >



-- 

Jiří Samek | Software Developer

AVAST Software s.r.o. | Pikrtova 1737/1a | 140 00  Praha 4

M +420 734 524 549 | E sa...@avast.com | W www.avast.com
