[ https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124850#comment-17124850 ]

Yun Tang commented on FLINK-17800:
----------------------------------

[~sewen], the root cause is the prefix extractor of {{NoopTransform}} 
introduced by {{optimizeForPointLookup}}.

From this lesson, we should treat the javadoc of {{#optimizeForPointLookup}} 
not as a suggestion but as a hard limit:
{noformat}
Use this if you don't need to keep the data sorted, i.e. you'll never use an 
iterator, only Put() and Get() API calls.
{noformat}
However, I cannot check from the Java side whether a prefix extractor has ever 
been configured, so we cannot guard against user misuse; that is to say, we 
cannot disable the {{optimizeForPointLookup}} option from the Flink side.

The workaround is to call {{setTotalOrderSeek(true)}} on the {{ReadOptions}} 
used when creating iterators, which ensures the iteration order stays correct. 
I have benchmarked the iterator-like operations of map state below; there is 
no obvious performance change.
||Iterator-like operations||original ops/ms||with setTotalOrderSeek ops/ms||performance change||
|MapStateBenchmark.mapEntries|322.306|320.794|-0.5%|
|MapStateBenchmark.mapIsEmpty|42.012|42.042|0.1%|
|MapStateBenchmark.mapIterator|313.1|319.3|2.0%|
|MapStateBenchmark.mapKeys|324.088|311.491|-3.9%|
|MapStateBenchmark.mapValues|321.387|328.309|2.2%|
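A minimal sketch of the workaround with the RocksDB Java API (the class name, sample keys, and 64 MB block cache size are illustrative, not Flink's actual configuration):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class TotalOrderSeekSketch {

    // Opens a DB with optimizeForPointLookup enabled (which installs the
    // NoopTransform prefix extractor mentioned above) and reads keys back
    // with total_order_seek so the iterator still observes the full,
    // correctly sorted key space.
    public static List<String> sortedKeys(String dbPath) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true)) {
            options.optimizeForPointLookup(64); // illustrative 64 MB block cache
            try (RocksDB db = RocksDB.open(options, dbPath)) {
                db.put("b".getBytes(StandardCharsets.UTF_8), new byte[0]);
                db.put("a".getBytes(StandardCharsets.UTF_8), new byte[0]);
                db.put("c".getBytes(StandardCharsets.UTF_8), new byte[0]);

                List<String> keys = new ArrayList<>();
                // Without setTotalOrderSeek(true), the noop prefix extractor
                // can make iteration skip or mis-order keys (the bug in this
                // ticket); with it, iteration is total-order sorted.
                try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);
                     RocksIterator it = db.newIterator(readOptions)) {
                    for (it.seekToFirst(); it.isValid(); it.next()) {
                        keys.add(new String(it.key(), StandardCharsets.UTF_8));
                    }
                }
                return keys;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String path = Files.createTempDirectory("rocksdb-total-order").toString();
        System.out.println(sortedKeys(path));
    }
}
```

Since {{setTotalOrderSeek}} only affects the {{ReadOptions}} passed to {{newIterator}}, point lookups via {{get()}} keep the benefit of {{optimizeForPointLookup}} untouched.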

> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Assignee: Yun Tang
>            Priority: Blocker
>             Fix For: 1.11.0, 1.10.2
>
>         Attachments: MissingWindows.scala, MyMissingWindows.scala, 
> MyMissingWindows.scala
>
>
> +My Setup:+
> We have been using the _RocksDb_ option _optimizeForPointLookup_ and 
> running version 1.7 for years. Upon upgrading to Flink 1.10 we started 
> seeing strange behavior: missing time windows in a streaming Flink job. 
> For testing purposes I experimented with previous Flink versions 
> (1.8, 1.9, 1.9.3) and none of them showed the problem.
>  
> A sample of the code demonstrating the problem is here:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements,
>     List(config.kafkaBootstrapServer)))
> val result = datastream
>   .keyBy(_ => 1)
>   .timeWindow(Time.milliseconds(1))
>   .print()
> {code}
>  
>  
> The source consists of 3 streams (either 3 Kafka partitions or 3 Kafka 
> topics), and the elements in each stream increase separately. The elements 
> carry event-time timestamps that start from 1 and increase by 1. The first 
> partition might consist of timestamps 1, 2, 10, 15..., the second of 
> 4, 5, 6, 11..., the third of 3, 7, 8, 9...
>  
> +What I observe:+
> The time windows would open as I expect for the first 127 timestamps. Then 
> there would be a huge gap with no opened windows; if the source has many 
> elements, the next open window would have a timestamp in the thousands. 
> A gap of hundreds of elements would be created, with what appear to be 
> 'lost' elements. Those elements are not reported as late (when tested with 
> the ._sideOutputLateData_ operator). The way we have been using the option 
> is by setting it inside the config like so:
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
> We have been using it for performance reasons as we have a huge RocksDB 
> state backend.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)