[ https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124850#comment-17124850 ]
Yun Tang commented on FLINK-17800:
----------------------------------

[~sewen], the root cause is the {{NoopTransform}} prefix extractor introduced by {{optimizeForPointLookup}}. Given this lesson, we should treat the javadoc of {{#optimizeForPointLookup}} not as a suggestion but as a hard limit:
{noformat}
Use this if you don't need to keep the data sorted, i.e. you'll never use an iterator, only Put() and Get() API calls.
{noformat}
However, there is no way to check from the Java side whether a prefix extractor has ever been configured, so we cannot guard against this misuse by disabling the {{optimizeForPointLookup}} option on the Flink side. The workaround is to call {{setTotalOrderSeek(true)}} on the {{ReadOptions}} used when creating iterators, which ensures the iteration order is correct (see the RocksJava sketch after the quoted issue below). I have tested the performance of the iterator-like operations of map state below; there is no obvious performance change.

||Iterator-like operations||original ops/ms||with setTotalOrderSeek ops/ms||performance change||
|MapStateBenchmark.mapEntries|322.306|320.794|-0.5%|
|MapStateBenchmark.mapIsEmpty|42.012|42.042|0.1%|
|MapStateBenchmark.mapIterator|313.1|319.3|2.0%|
|MapStateBenchmark.mapKeys|324.088|311.491|-3.9%|
|MapStateBenchmark.mapValues|321.387|328.309|2.2%|

> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Assignee: Yun Tang
>            Priority: Blocker
>             Fix For: 1.11.0, 1.10.2
>
>         Attachments: MissingWindows.scala, MyMissingWindows.scala,
> MyMissingWindows.scala
>
>
> +My Setup:+
> We have been using the _RocksDB_ option _optimizeForPointLookup_ and
> running version 1.7 for years. Upon upgrading to Flink 1.10 we started
> observing strange behavior: missing time windows on a streaming Flink job.
> For the purpose of testing I experimented with previous Flink versions
> (1.8, 1.9 and 1.9.3), and none of them showed the problem.
>
> A sample of the code demonstrating the problem is here:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements,
>     List(config.kafkaBootstrapServer)))
> val result = datastream
>   .keyBy( _ => 1)
>   .timeWindow(Time.milliseconds(1))
>   .print()
> {code}
>
> The source consists of 3 streams (being either 3 Kafka partitions or 3 Kafka
> topics); the elements in each of the streams are separately increasing. The
> elements carry increasing event-time timestamps, starting from 1 and
> increasing by 1. The first partition would consist of timestamps 1, 2, 10,
> 15..., the second of 4, 5, 6, 11..., the third of 3, 7, 8, 9...
>
> +What I observe:+
> The time windows open as I expect for the first 127 timestamps. Then there
> is a huge gap with no opened windows; if the source has many elements, the
> next opened window would have a timestamp in the thousands. A gap of
> hundreds of elements would be created, with what appear to be 'lost'
> elements. Those elements are not reported as late (if tested with the
> _sideOutputLateData_ operator). The way we have been using the option is by
> setting it inside the config like so:
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
> We have been using it for performance reasons as we have a huge RocksDB
> state backend.
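For illustration, here is a minimal standalone RocksJava sketch of the workaround described in the comment above: forcing {{setTotalOrderSeek(true)}} on the {{ReadOptions}} used to create an iterator over a database opened with {{optimizeForPointLookup}}. The class name and DB path are placeholders; the actual fix applies the same {{ReadOptions}} change inside Flink's RocksDB state backend wherever it creates iterators.
{code:java}
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class TotalOrderSeekExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        // optimizeForPointLookup installs a NoopTransform prefix extractor,
        // which breaks the ordering of plain iterators.
        try (Options options = new Options()
                 .setCreateIfMissing(true)
                 .optimizeForPointLookup(256); // block cache size in MB
             RocksDB db = RocksDB.open(options, "/tmp/total-order-seek-demo")) {

            db.put("b".getBytes(), "2".getBytes());
            db.put("a".getBytes(), "1".getBytes());
            db.put("c".getBytes(), "3".getBytes());

            // The workaround: request total-order iteration on the ReadOptions
            // used to create the iterator, so keys come back fully sorted.
            try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);
                 RocksIterator it = db.newIterator(readOptions)) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    System.out.println(new String(it.key()) + " -> " + new String(it.value()));
                }
            }
        }
    }
}
{code}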
--
This message was sent by Atlassian Jira
(v8.3.4#803005)