Re: Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-18 Thread Hangxiang Yu
Hi, Lei. It's indeed a bit confusing. Could you share the related RocksDB log, which may contain more detailed info? On Fri, Apr 12, 2024 at 12:49 PM Lei Wang wrote: > > I enable RocksDB native metrics and do some performance tuning. > > state.backend.rocksdb.block.cache-size is set to 128m,4 sl

Re: Flink job unable to restore from savepoint

2024-03-27 Thread Hangxiang Yu
Hi, Prashant. Which Flink version did you use? And did you modify your job logic or configurations? If yes, could you share what you changed? On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane wrote: > Hello, > > We have been facing this weird issue of not being able to restore from > savepoint,

Re: Advice on checkpoint interval best practices

2023-12-05 Thread Hangxiang Yu
Hi, Oscar. Just sharing my thoughts: Benefits of more aggressive checkpointing: 1. less recovery time, as you mentioned (which also depends on how much data Flink has to roll back and reprocess) 2. lower end-to-end latency for checkpoint-bound sinks in exactly-once mode. Costs of more aggressive checkpointing: 1. more

Re: Queryable state feature in latest version!!

2023-11-06 Thread Hangxiang Yu
Hi, Puneet. Queryable State has been deprecated in the latest version and will be removed in Flink 2.0. The interface/usage is frozen in 1.x, so you can still refer to the documentation of previous versions to use it. BTW, could you also share something about your scenarios for using it? That wi

Re: Checkpoints are not triggering when S3 is unavailable

2023-11-05 Thread Hangxiang Yu
Hi, do you mean your checkpoint failures stop the normal running of your job? What's your sink type? If it relies on completed checkpoints to commit, this is expected. On Tue, Oct 31, 2023 at 12:03 AM Evgeniy Lyutikov wrote: > Hi team! > I came across strange behavior in Flink 1.17.1. If

Re: Clear the State Backends in Flink

2023-11-05 Thread Hangxiang Yu
Hi, Arjun. Do you mean clearing all states stored in a user-defined state? IIUC, it could be done for operator state. But it cannot be done for keyed state by users, because every operation on it is currently bound to a specific key. BTW, could you also share your business scenario? It could

Re: Cannot find metata file metadats in directory

2023-09-30 Thread Hangxiang Yu
Hi, how did you specify the checkpoint path you restored from? It seems that you are trying to restore from an incomplete or failed checkpoint. On Thu, Sep 28, 2023 at 6:09 PM rui chen wrote: > When we use 1.13.2,we have the following error: > FileNotFoundException: Cannot find metata file me

Re: Flink Kafka offset commit issues

2023-09-30 Thread Hangxiang Yu
Hi, Elakiya. I think you could check: 1. The TaskManager log, to figure out whether the job is restoring from an existing checkpoint and which checkpoint path it is restoring from. 2. Or the checkpoint ID when you restart your job (if not restoring from a checkpoint, it starts from

Re: Re: Re: How to read flinkSQL job state

2023-09-24 Thread Hangxiang Yu
> Hi Hangxiang, > > I still have one question about this problem, when using datastream api I > know the key and value type I use in state because I > defined ValueStateDescriptor, but how can I get the ValueStateDescriptor in > flinksql? > > Thanks, > Yifan > > On

Re: Checkpoint jitter?

2023-09-12 Thread Hangxiang Yu
Hi, Matyas. Do you mean something like dynamically adjusting the checkpoint interval, or the frequency of uploading files, according to the pressure on the durable storage? On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás wrote: > Hey folks, > > Is it possible to add some sort of jitter to the checkpointin
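
For reference, "jitter" here would mean that each job draws its next checkpoint interval from a small random window around a base interval, so that many jobs writing to the same durable storage do not all upload at the same moment. The sketch below is a minimal, hypothetical Python helper (`jittered_interval_ms` is not a Flink option or API) that assumes a uniform offset of at most plus or minus `max_jitter_ratio` of the base interval.

```python
import random


def jittered_interval_ms(base_interval_ms: int, max_jitter_ratio: float,
                         rng: random.Random) -> int:
    """Hypothetical helper: base interval shifted by a uniform random offset.

    The result lies in [base * (1 - ratio), base * (1 + ratio)].
    """
    if base_interval_ms <= 0:
        raise ValueError("base_interval_ms must be positive")
    if not 0.0 <= max_jitter_ratio < 1.0:
        raise ValueError("max_jitter_ratio must be in [0, 1)")
    offset = rng.uniform(-max_jitter_ratio, max_jitter_ratio) * base_interval_ms
    return round(base_interval_ms + offset)


rng = random.Random(7)  # seeded only to make the sketch reproducible
print([jittered_interval_ms(60_000, 0.1, rng) for _ in range(3)])
```

Seeding each job's generator differently keeps the offsets spread out across jobs.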

Re: Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. Which Flink version are you using? IIUC, you are using filesystem instead of RocksDB, so your checkpoints may not be incremental. On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user wrote: > Hi Shammon, > > We are using RocksDB,and the configuration is below: > execution.checkpo

Re: Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
tal size keeps > increasing. We didn't add any custom checkpoint configuration in flink sql > jobs, where can I see the log of > StreamGraphHasherV2.generateDeterministicHash? And is there a default state > name? > > Thanks, > Yifan > > On 2023/09/06 07:12:05 Hang

Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. Unfortunately, the State Processor API currently only supports DataStream. But you can still use it to read your SQL job state. The most difficult part is that you have to get the operator ID, which you can get from the log of StreamGraphHasherV2.generateDeterministicHash, and state n

Re: Check points are discarded with reason NULL

2023-07-23 Thread Hangxiang Yu
Hi, this exception is thrown because the number of checkpoint failures exceeds execution.checkpointing.tolerable-failed-checkpoints; see [1] for more details. The root causes of the checkpoint failures should be in your JM/TM logs. You could check or share them. [1] https://nightlie
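
As a rough model of that threshold: the Flink documentation describes `execution.checkpointing.tolerable-failed-checkpoints` as the number of consecutive checkpoint failures that are tolerated. The sketch below assumes that a completed checkpoint resets the count and that the job fails over once the count exceeds the configured value (so 0 tolerates no failure at all). `CheckpointFailureCounter` is a hypothetical class for illustration, not Flink's own implementation.

```python
class CheckpointFailureCounter:
    """Hypothetical, simplified model of a consecutive-failure threshold."""

    def __init__(self, tolerable_failed_checkpoints: int) -> None:
        if tolerable_failed_checkpoints < 0:
            raise ValueError("threshold must be >= 0")
        self.tolerable = tolerable_failed_checkpoints
        self.consecutive_failures = 0

    def on_success(self) -> None:
        # A completed checkpoint ends the current run of failures.
        self.consecutive_failures = 0

    def on_failure(self) -> bool:
        """Record a failed checkpoint; return True if the job should fail over."""
        self.consecutive_failures += 1
        return self.consecutive_failures > self.tolerable


counter = CheckpointFailureCounter(tolerable_failed_checkpoints=2)
print([counter.on_failure() for _ in range(3)])  # [False, False, True]
```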

Re: Max parallelism

2023-07-11 Thread Hangxiang Yu
Hi, if you are using HashMapStateBackend, there may be some noticeable overhead. If you are using RocksDBStateBackend, I think the overhead may be minor. As we know, Flink writes the key group as the prefix of the key to speed up rescaling. So the format will be like: key group | key len | key | .. You
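
One way to see why the key-group prefix helps rescaling: each parallel subtask owns a contiguous range of key groups, so on rescaling a subtask can pick up whole ranges instead of re-hashing individual keys. The sketch below reproduces, from memory, the range arithmetic used by Flink's `KeyGroupRangeAssignment`; treat the exact formulas as an assumption and check them against the Flink sources.

```python
def operator_index_for_key_group(key_group: int, max_parallelism: int,
                                 parallelism: int) -> int:
    """Subtask that owns a given key group (contiguous assignment)."""
    return key_group * parallelism // max_parallelism


def key_group_range(operator_index: int, max_parallelism: int,
                    parallelism: int) -> range:
    """Key groups owned by one subtask, as a Python range (end exclusive)."""
    start = (operator_index * max_parallelism + parallelism - 1) // parallelism
    end_inclusive = ((operator_index + 1) * max_parallelism - 1) // parallelism
    return range(start, end_inclusive + 1)


# 128 key groups spread over 3 subtasks, then rescaled to 4.
for p in (3, 4):
    print(p, [(r.start, r.stop - 1) for r in (key_group_range(i, 128, p) for i in range(p))])
# 3 [(0, 42), (43, 85), (86, 127)]
# 4 [(0, 31), (32, 63), (64, 95), (96, 127)]
```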

Re: the new state serializer can not be incompatible

2023-07-11 Thread Hangxiang Yu
, Jul 10, 2023 at 12:23 PM 湘晗刚 <1016465...@qq.com> wrote: > Hi , > i am using pojo type , > flink version is 1.10, > but the pojo type is the same as before, > What do you mean by saying “modify schema of kryo types”? > > ---Original--- > *From:* "Hangxiang Yu"

Re: the new state serializer can not be incompatible

2023-07-09 Thread Hangxiang Yu
Hi, could you share the detailed exception stack? Or did you modify any job logic or parameters? Currently, Flink only supports simple schema evolution (e.g. adding or removing fields of POJO types) for DataStream jobs [1]. Other modifications may cause this exception, for example: 1. modify some sche

Re: Disable hostname verification in Opensearch Connector

2023-06-24 Thread Hangxiang Yu
Hi, Eugenio. Maybe you could configure the RestClientBuilder manually to set the SSL hostname verifier (setSSLHostnameVerifier)? On Fri, Jun 23, 2023 at 4:06 PM Eugenio Marotti < ing.eugenio.maro...@gmail.com> wrote: > Hi, > > I’m currently using the Opensearch Connector for the Table API. For > testing I need to disable the host

Re: table.exec.state.ttl not working as expected

2023-06-24 Thread Hangxiang Yu
Hi, Neha. Could you share more information: 1. Which state backend are you using? If it's RocksDB, is incremental checkpointing enabled? 2. Which specific operator is experiencing an increase in checkpoint data size? (You can check the checkpoint size changes of different subtasks

Re: Savepoint Failing - Flink 1.16.2 - Scala12

2023-06-24 Thread Hangxiang Yu
Hi, Shrihari. It seems related to https://issues.apache.org/jira/browse/FLINK-28758, which is still unresolved. It should only occur with FlinkKafkaConsumer, so you could migrate to KafkaSource to avoid this issue. On Sat, Jun 24, 2023 at 2:46 AM Shrihari R wrote: > I am trying to stop the job by t

Re: RocksDB State Backend GET returns null intermittently

2023-06-24 Thread Hangxiang Yu
Hi, Prabhu. This is a correctness issue. IIUC, it should not be related to the size of the block cache, the write buffer, or whether the bloom filter is enabled. Is your job a DataStream job? Does the job contain a custom serializer? You could check or share the logic of the serializer, as this is on

Re: CleanUpInRocksDbCompactFilter

2023-06-15 Thread Hangxiang Yu
Hi, Patricia. In my opinion, this parameter balances the trade-off between read/write performance and storage space utilization (of course, smaller state also means better performance in the future). I think the right value of longtimeNumberOfQueries depends on several factors, such as the si

Re: Why does ClosureCleaner ignore checkSerialization=false on recursion?

2023-06-11 Thread Hangxiang Yu
Hi, Logan. I guess your program uses an inner process function whose closure has checkSerialization set to true (actually, this is the setting for most functions). When a closure object is created in Flink, it may hold references to other non-serializable objects, which may cause serialization error

Re: changing serializer affects resuming from checkpoint

2023-06-11 Thread Hangxiang Yu
ctKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) > > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) > > at > org.apache.flink.runtime.state.DefaultKeyedStateS

Re: changing serializer affects resuming from checkpoint

2023-06-06 Thread Hangxiang Yu
Hi, Peng. Do these two jobs have any dependency? Or could you please share the specific logic of the two jobs, if convenient? Could you also share the failure message of the producer job? In my opinion, if the two jobs have no other association, as you said, the consumer job will fail due to unsu

Re: Network Buffers

2023-06-06 Thread Hangxiang Yu
> > I have a total of 6000 tasks with 16 TM , 4 cores each with > jobmanger/taskmanger.momry.process.size = 8 gb . > > > Thanks & Regards, > Pritam > > > > On Tue, Jun 6, 2023 at 9:02 AM Hangxiang Yu wrote: > >> Hi, Pritam. >> This error messag

Re: Network Buffers

2023-06-05 Thread Hangxiang Yu
Hi, Pritam. This error message indicates that the current network buffer configuration is not enough to handle the current workload. > What is the meaning of this exception (The total number of network buffers > is currently set to 22773 of 32768 bytes each)? > This just provides some infor
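
For a sense of scale, the two numbers in that message multiply out to the total network memory configured for the process that reported the error. The arithmetic below uses only the values quoted in the exception; how that memory is sized (for example through the `taskmanager.memory.network.*` options) depends on the Flink version and configuration.

```python
def network_memory_bytes(num_buffers: int, buffer_size_bytes: int) -> int:
    """Total memory held by the network buffer pool."""
    return num_buffers * buffer_size_bytes


total = network_memory_bytes(22773, 32768)
print(total, total / 2**20)  # 746225664 711.65625  (bytes, then MiB)
```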

Re: High Start-Delay And Aligned Checkpointing Causing Timeout.

2023-06-04 Thread Hangxiang Yu
Hi, Pritam. I think the definition applies to both aligned and unaligned checkpoints: "The alignment duration, which is defined as the time between receiving first and the last checkpoint barrier." I
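
Applying the quoted definition literally: the alignment duration is the gap between the first and the last barrier of the same checkpoint arriving at a subtask over its input channels. A minimal sketch with made-up channel names and millisecond timestamps (both hypothetical):

```python
from typing import Mapping


def alignment_duration_ms(barrier_arrival_ms: Mapping[str, int]) -> int:
    """Time between receiving the first and the last checkpoint barrier."""
    if not barrier_arrival_ms:
        raise ValueError("need at least one input channel")
    arrivals = list(barrier_arrival_ms.values())
    return max(arrivals) - min(arrivals)


# Barriers of one checkpoint reaching a subtask on three input channels.
print(alignment_duration_ms({"input-0": 1_000, "input-1": 1_250, "input-2": 4_000}))  # 3000
```

The slowest channel (here `input-2`) determines the value, so a single slow or backpressured input typically shows up as a long alignment.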

Re: RocksDB segfault on state restore

2023-06-01 Thread Hangxiang Yu
Hi, Gyula. It seems related to https://issues.apache.org/jira/browse/FLINK-23346. We also saw a core dump while using list state after triggering state migration with the TTL compaction filter. Have you triggered schema evolution? It seems to be a bug in the RocksDB list state together with the TTL compaction

Re: Flink checkpoint timeout

2023-06-01 Thread Hangxiang Yu
Hi, Ivan. Could you provide more information about it: 1. Which operator subtask is stuck? Or is it random? 2. Could you share the stack or flame graph of the stuck subtask? On Wed, May 31, 2023 at 12:45 PM Ethan T Yang wrote: > Hello all, > > We recently start to experience Checkpoint timeou

Re: (no subject)

2023-05-14 Thread Hangxiang Yu
Hi, it's related to FLINK-11695, which has not been resolved yet. You could increase the HDFS limit to alleviate this problem. BTW, you could also check or share a few things before modifying the configuration: From the logic of your jo

Re: Flink Job Failure for version 1.16

2023-05-14 Thread Hangxiang Yu
Hi, I may have missed something, so could you share more: I have recently migrated from 1.13.6 to 1.16.1, I can see there is a > performance degradation... Are you referring to a decrease in checkpoint performance when you mention performance decline? Does it only happen when you upgrade from 1.13.

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hangxiang Yu
Hi, I guess you used a fixed JOB_ID and configured the same checkpoint dir as before? And you may also have started the job without the previous state? The new job cannot know anything about the previous checkpoints; that's why the new job fails when it tries to generate a new checkpoint. I'd like to suggest y

Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-07 Thread Hangxiang Yu
Hi, Tony. "be detrimental to performance" means that the extra space overhead of the key-group field may influence performance. As we know, Flink writes the key group as the prefix of the key to speed up rescaling. So the format will be like: key group | key len | key | .. You could
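
To put the "extra space overhead" in concrete terms, the sketch below builds the simplified layout `key group | key len | key` from the message. It assumes, as I recall from Flink's RocksDB key serialization, that the key-group prefix takes 1 byte when there are at most 128 key groups and 2 bytes otherwise, so a max parallelism of 32768 costs one extra byte per stored entry compared with 128. The 4-byte big-endian length field is an illustrative assumption, not Flink's exact encoding.

```python
import struct


def key_group_prefix_bytes(max_parallelism: int) -> int:
    """Width of the key-group prefix (assumed: 1 byte up to 128 key groups, else 2)."""
    return 2 if max_parallelism > 128 else 1


def composite_key(key_group: int, key: bytes, max_parallelism: int) -> bytes:
    """Simplified 'key group | key len | key' layout; the length encoding is illustrative."""
    if not 0 <= key_group < max_parallelism:
        raise ValueError("key_group must be in [0, max_parallelism)")
    prefix = key_group.to_bytes(key_group_prefix_bytes(max_parallelism), "big")
    return prefix + struct.pack(">i", len(key)) + key


print(len(composite_key(5, b"user-1", 128)))    # 11
print(len(composite_key(5, b"user-1", 32768)))  # 12
```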

Re: Savepoint a failing job

2022-12-21 Thread Hangxiang Yu
Hi Tim. > Is the only solution to just use the DataStream API? As Martijn mentioned, if the execution plan has changed, it's difficult to reuse the original state to restore. Only if you are dropping some operators can you use --allowNonRestoredState to restore without droppin

Re: Parse checkpoint _metadata file

2022-12-21 Thread Hangxiang Yu
Hi, > Is there some way to deserialize the checkpoint _metadata file? You could use methods like SavepointLoader#loadSavepointMetadata in the State Processor API to load it. > If i try to process the file with regular expressions, then approximately 90% of S3 paths of objects are actually mis

Re: Rocksdb Incremental checkpoint

2022-12-19 Thread Hangxiang Yu
Hi, IIUC, numRetainedCheckpoints only influences the space overhead of the checkpoint dir, not the incremental size. RocksDB executes incremental checkpoints based on the shared directory, which always retains SST files as much as possible (maybe from the last checkpoint, or maybe from lo
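
A simplified model of the point above, assuming each checkpoint is just the set of SST files RocksDB holds at that moment (file names are made up, and private files such as the MANIFEST are ignored): the incremental size is the set of SST files not uploaded before, while the number of retained checkpoints only decides how many shared files must stay on storage.

```python
from typing import Iterable, Set


def incremental_upload(current_ssts: Set[str], already_uploaded: Set[str]) -> Set[str]:
    """SST files this checkpoint has to upload (what shows up as incremental size)."""
    return current_ssts - already_uploaded


def shared_files_kept(retained_checkpoints: Iterable[Set[str]]) -> Set[str]:
    """Shared files referenced by at least one retained checkpoint stay on storage."""
    kept: Set[str] = set()
    for ssts in retained_checkpoints:
        kept |= ssts
    return kept


chk1 = {"000010.sst", "000011.sst"}
chk2 = {"000011.sst", "000012.sst"}  # e.g. after a compaction: 000010 gone, 000012 new
chk3 = {"000012.sst", "000013.sst"}

print(sorted(incremental_upload(chk3, chk1 | chk2)))  # ['000013.sst']
print(len(shared_files_kept([chk3])), len(shared_files_kept([chk1, chk2, chk3])))  # 2 4
```

Retaining one or three checkpoints changes only the second number; the upload for `chk3` is the same in both cases.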

Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-19 Thread Hangxiang Yu
checkpointing duration? > > Thanks again, > Robin > > On Fri, Dec 16, 2022 at 13:48, Hangxiang Yu wrote: > >> Hi, Robin. >> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I >> guess your version of Flink was below 1.16 and you adapt

Re: Getting S3 client metrics from the flink-s3-fs-presto library

2022-12-18 Thread Hangxiang Yu
Hi, I think it's reasonable to support it. I am thinking of taking it one step further: client-side metrics, or job-level metrics for the filesystem, could help us monitor the filesystem more precisely. Some metrics (like request rate, throughput, latency, retry count, etc.) are useful to monitor the ne

Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-16 Thread Hangxiang Yu
Hi, Robin. From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I guess your version of Flink was below 1.16 and you kept the default config of 'state.backend.incremental'. In versions below 1.16, RocksDBStateBackend uses the savepoint format for its full snapshots [1]. So it will

Re: How to use the local repositories Jar instead of pulling remote snapshots when building modules?

2022-12-12 Thread Hangxiang Yu
Hi, hjw. I think [1] & [2] may help you. [1] https://stackoverflow.com/questions/16866978/maven-cant-find-my-local-artifacts [2] https://stackoverflow.com/questions/32571400/remote-repositories-prevents-maven-from-resolving-remote-parent On Fri, Dec 2, 2022 at 1:44 AM hjw wrote: > Hi, team. > M

Re: Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-12 Thread Hangxiang Yu
Hi Alexis. IIUC, by default, the job ID of the new job should be different if you restore from a stopped job? Whether to clean up depends on the savepoint restore mode. Only in the case of failover should the job ID stay the same, and everything in the checkpoint dir will be claimed as you said.

Re: Exceeded Checkpoint tolerable failure

2022-12-11 Thread Hangxiang Yu
ttached images for reference. > > > > Regards, > Madan > > On Thursday, 8 December 2022 at 06:29:49 pm GMT-8, Hangxiang Yu < > master...@gmail.com> wrote: > > > Hi, Madan. > I think there is a root cause of the exception, could you share it ? > BTW, If you don't

Re: Exceeded Checkpoint tolerable failure

2022-12-08 Thread Hangxiang Yu
Hi, Madan. I think there is a root cause for the exception; could you share it? BTW, if you haven't set a value for execution.checkpointing.tolerable-failed-checkpoints, I'd recommend setting it, which could avoid job restarts due to recoverable temporary problems. [1] https://nightlies.apache

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Hangxiang Yu
Hi, Lars. Could you check whether you have configured the lifecycle of Google Cloud Storage [1], which is not recommended for Flink checkpoint usage? [1] https://cloud.google.com/storage/docs/lifecycle On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven wrote: > Hello, > We had an incident today with

Re: Savepoint restore mode for the Kubernetes operator

2022-11-27 Thread Hangxiang Yu
Hi, Alexis. IIUC, there is no conflict between savepoint history and restore mode. Restore mode cares about whether/how we manage the savepoint of the old job. Savepoint management in the operator only cares about the savepoint history of the new job. In other words, savepoint cleanup should not clean the savepoin

Re: Safe way to clear old checkpoint data

2022-11-27 Thread Hangxiang Yu
Hi, As Martijn mentioned, snapshot ownership in 1.15 is the best way. You say there are just 24000/10 references in a shared directory in a job. Is your case in the scope of [1]? If so, I think it works if you check the _metadata and find the files that are not referenced. And I suggest you

Re: Flink falls back on to kryo serializer for GenericTypes

2022-10-11 Thread Hangxiang Yu
Hi Sucheth. It's related to how you defined your GenericTypes. You may still need to give Flink some hints if you are using complicated generic types, so what you tried may not be enough. Could you share your generic type object? BTW, maybe you could refer to [1], which I think is similar to yo

Re: Window state size with global window and custom trigger

2022-10-09 Thread Hangxiang Yu
Hi, Alexis. I think you are right. It also applies to a global window with a custom trigger. If you apply a ReduceFunction or AggregateFunction, the window state is usually smaller than with a ProcessWindowFunction, because only the aggregated value is kept. It also works for global windows. Of course, th
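
A toy model of the difference, not Flink's actual window operator: with incremental aggregation the state per key and window is a single accumulator, whereas a ProcessWindowFunction on its own has to buffer every element until the trigger fires. Nothing in the model depends on the window assigner, which is why the same reasoning holds for a global window with a custom trigger.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def reduced_window_state(events: Iterable[T], reduce_fn: Callable[[T, T], T]) -> List[T]:
    """Like a ReduceFunction: keep one running value per window."""
    state: List[T] = []
    for event in events:
        state = [event] if not state else [reduce_fn(state[0], event)]
    return state


def buffered_window_state(events: Iterable[T]) -> List[T]:
    """Like a ProcessWindowFunction alone: keep every element until firing."""
    state: List[T] = []
    for event in events:
        state.append(event)
    return state


events = range(1, 1001)
print(reduced_window_state(events, lambda a, b: a + b))  # [500500]
print(len(buffered_window_state(events)))  # 1000
```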

Re: Difference between Checkpoint and Savepoint

2022-09-25 Thread Hangxiang Yu
Hi, > Regarding externalized checkpoint, is the checkpoint written to persistent storage only if the job is failed or suspended? What about cancelled or killed by the user? The checkpoint will be retained on cancellation and failure if you configure RETAIN_ON_CANCELLATION. > What information is wr

Re: How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Hangxiang Yu
Hi, I think you could try to create a Function that implements WithMasterCheckpointHook. These hooks will be called by the checkpoint coordinator when triggering / restoring a checkpoint. You could find more details in [1]. [1] https://github.com/apache/flink/blob/master/flink-streaming-java

Re: Flink upgrade path

2022-09-06 Thread Hangxiang Yu
Hi, Alexey. You could check the state compatibility in the compatibility table. The page describes how to upgrade and whether state is compatible across different versions. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/upgrading/#compatibility-table On Wed, Sep 7, 2022 at 7:0

Re: Out of memory in heap memory when working with state

2022-09-05 Thread Hangxiang Yu
Hi, lan. I guess you are using an old version of Flink. You could use RocksDBStateBackend [1] in the new version. It stores the state on disk, which avoids using too much memory when the state is large. BTW, in the current internal mechanism, the state on external storage like S3 is jus

Re: Doubt about the RUNNING state of the job

2022-08-23 Thread Hangxiang Yu
Actually, every operator/subtask has its own status behind the 'RUNNING' status on the Flink UI. The status may be 'CREATED', 'DEPLOYING', 'INITIALIZING', 'RUNNING' and so on, as you can see in [1]. Different operators/subtasks may have different statuses. After all operators/subta

Re: Pojo state schema evolution not working correctly

2022-08-07 Thread Hangxiang Yu
Hi, IIUC, the conditions to reproduce it are: 1. using RocksDBStateBackend with the incremental strategy 2. using ListState in the stateful operator 3. enabling TTL with cleanupInRocksdbCompactFilter 4. adding a field to make the job trigger schema evolution. Then the exception will be thrown, right? As fo

Re: Can FIFO compaction with RocksDB result in data loss?

2022-07-04 Thread Hangxiang Yu
Hi, Vishal. IIUC, 1. FIFO compaction drops old data once the configured size is exceeded in L0, so old data may be dropped without our knowing. That's why "it's basically a TTL compaction style and It is suited for keeping event log data with very low overhead (query log for example)". If it's the user
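
To illustrate why this can silently lose data: in RocksDB's FIFO style, once the total size of the table files exceeds the configured limit (`max_table_files_size` in RocksDB's FIFO compaction options), the oldest files are deleted, whether or not the entries in them are still logically needed. The sketch below models only that size-based rule, with hypothetical file names and sizes in arbitrary units, and ignores any other FIFO options.

```python
from collections import deque
from typing import Deque, List, Tuple


def fifo_compact(files: Deque[Tuple[str, int]], max_total_bytes: int) -> List[str]:
    """Drop the oldest files until the total size fits; return what was dropped.

    `files` is ordered oldest first and is modified in place.
    """
    dropped: List[str] = []
    total = sum(size for _, size in files)
    while files and total > max_total_bytes:
        name, size = files.popleft()
        total -= size
        dropped.append(name)
    return dropped


l0 = deque([("000001.sst", 40), ("000002.sst", 40), ("000003.sst", 40)])
print(fifo_compact(l0, 100))  # ['000001.sst']
print([name for name, _ in l0])  # ['000002.sst', '000003.sst']
```

For Flink state, a dropped file may hold the only copy of some key's latest value, which is why this behaves like data loss rather than a controlled expiry.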

Re: how to connect to the flink-state store and use it as cache to serve APIs.

2022-06-28 Thread Hangxiang Yu
Hi, laxmi. There are currently two ways for users to access the state store: 1. Queryable state [1], with which you can access state at runtime. 2. State Processor API [2], with which you can access state (snapshots) offline. But we have marked Queryable state as "Reaching End-of-Life". We are also

Re: Spike in checkpoint start delay every 15 minutes

2022-06-16 Thread Hangxiang Yu
Are the "checkpointed size" and "checkpoint duration" of the 4th checkpoint bigger than the others? If so, I guess it's related to the RocksDB flush, which may delay the next checkpoint. Best, Hangxiang. On Fri, Jun 17, 2022 at 2:31 PM Hangxiang Yu wrote: > Is the 4th "c

Re: Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Hangxiang Yu
Hi, Jai. Could you share your checkpoint configuration (interval, min-pause, and so on) and the checkpoint details in the Flink UI? I guess the checkpoint delay may be related to the last checkpoint's completion time, as you could see in CheckpointRequestDecider#chooseRequestToE
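
A simplified model of that scheduling constraint, assuming the next periodic checkpoint can start no earlier than one interval after the previous trigger and no earlier than the minimum pause after the previous checkpoint completed. This is a sketch of the idea, not the actual logic of CheckpointRequestDecider; the function name is hypothetical.

```python
def earliest_next_trigger_ms(last_trigger_ms: int, last_completion_ms: int,
                             interval_ms: int, min_pause_ms: int) -> int:
    """Earliest time the next periodic checkpoint may be triggered."""
    return max(last_trigger_ms + interval_ms, last_completion_ms + min_pause_ms)


# interval 60 s, min pause 30 s
print(earliest_next_trigger_ms(0, 10_000, 60_000, 30_000))  # 60000: the interval dominates
print(earliest_next_trigger_ms(0, 50_000, 60_000, 30_000))  # 80000: a slow checkpoint pushes the next one back
```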

Re: TolerableCheckpointFailureNumber not always applying

2022-05-23 Thread Hangxiang Yu
; W - www.datadome.co > > > > On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu wrote: > >> Hi, Gaël Renoux. >> As you could see in [1], There are some descriptions about the config: >> "This only applies to the following failure reasons: IOException on the >

Re: TolerableCheckpointFailureNumber not always applying

2022-05-23 Thread Hangxiang Yu
Hi, Gaël Renoux. As you can see in [1], the config is described as follows: "This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the syn
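
Read as a filter, the quoted description says that only some failure reasons count toward the tolerable limit. The sketch below encodes just the three reasons listed in the quote; the enum names are hypothetical labels rather than Flink's own CheckpointFailureReason values, and every other reason is simply not counted in this sketch (see [1] for how Flink actually treats them).

```python
from enum import Enum, auto
from typing import Iterable


class FailureReason(Enum):
    # Hypothetical labels, not Flink's CheckpointFailureReason constants.
    JOB_MANAGER_IO_EXCEPTION = auto()
    TASK_MANAGER_ASYNC_PHASE = auto()
    CHECKPOINT_EXPIRED = auto()
    TASK_MANAGER_SYNC_PHASE = auto()  # mentioned after the quoted list; not counted here
    OTHER = auto()


# The three reasons named in the quoted description.
COUNTED_REASONS = frozenset({
    FailureReason.JOB_MANAGER_IO_EXCEPTION,
    FailureReason.TASK_MANAGER_ASYNC_PHASE,
    FailureReason.CHECKPOINT_EXPIRED,
})


def counts_toward_limit(reason: FailureReason) -> bool:
    return reason in COUNTED_REASONS


def counted_failures(reasons: Iterable[FailureReason]) -> int:
    """How many failures in a sequence count toward the tolerable limit."""
    return sum(1 for reason in reasons if counts_toward_limit(reason))
```

This filtering is one reason the configured limit may appear not to apply to every checkpoint error.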

Re: Schema Evolution of POJOs fails on Field Removal

2022-05-18 Thread Hangxiang Yu
Hi, David. Removing a field from a POJO should work, as you said. But I think we need more information. What version of Flink are you using? Do you have any other modifications? Could you share your code segments and the JM error log, if convenient? On Wed, May 18, 2022 at 9:07 PM David Jost wrote

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Hangxiang Yu
Hi, James. I may not understand what the problem is. All checkpoints will be stored at the address you set. IIUC, TMs write some checkpoint data to their local dir, upload it to that address, and then delete the local copy. JM will write some checkpoint metadata to the address and also do the entir

Re: Random incorrect checkpoint existence check

2022-04-25 Thread Hangxiang Yu
Hi, Chen-che, I think it may be similar to FLINK-12381. You could adopt the suggestion of setting the job ID, as you can see in the comments below the ticket. I think you could also share your environment in this ticket to let us know more informat

Re: Savepoint and cancel questions

2022-04-22 Thread Hangxiang Yu
Hi, Dan. 1. Do you mean putting the option into the savepoint command? If so, I think it will not work well. This option describes how checkpoints will be cleaned up in different job statuses, e.g. FAILED/CANCELED. It cannot be covered by the savepoint command. 2. Which Flink version do you use? I work on 1.14 a