Re: Why is task manager shutting down?

2022-09-30 Thread Congxian Qiu
hu., Sep. 29, 2022, 7:02 a.m. John Smith, > wrote: > >> Is there a way to increase the 30 seconds to 60? Where is that 30 second >> timeout set? >> >> I have jdbc query timeout but at some point at night the insert takes a >> bit longer cause of index rebuilding.

Re: Why is task manager shutting down?

2022-09-28 Thread Congxian Qiu
Hi John, yes, the whole TaskManager exited because the task did not react to the cancellation signal in time: ``` 2022-08-30 09:14:22,138 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds. org.apache.flink.util.FlinkRuntimeException
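The behaviour described above (the task receives a cancellation signal and, if it does not exit within a bounded time, the whole TaskManager goes down) can be modelled with plain Java threads. This is a simplified sketch, not Flink's implementation: `Thread.interrupt()` stands in for the cancellation signal, and all class and method names are hypothetical. As far as I know, the 180-second limit in the log corresponds to Flink's `task.cancellation.timeout` option.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of "cancel, wait a bounded time, escalate"; not Flink code. */
public class CancellationWatchdog {

    /** Interrupts the worker, waits up to timeoutMillis, and reports whether it exited in time. */
    public static boolean cancelAndWait(Thread worker, long timeoutMillis) {
        worker.interrupt(); // stand-in for the cancellation signal
        try {
            worker.join(timeoutMillis); // bounded wait for the task to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive(); // false: the caller would have to escalate
    }

    /** A task that checks its interrupt flag and exits promptly. */
    public static Thread cooperativeTask() {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.onSpinWait();
            }
        });
        t.setDaemon(true);
        return t;
    }

    /** A task that swallows interrupts for durationMillis, like a blocking call that ignores them. */
    public static Thread stuckTask(long durationMillis) {
        Thread t = new Thread(() -> {
            long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
            while (System.nanoTime() < end) {
                Thread.interrupted(); // clears the flag and keeps running
            }
        });
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        Thread ok = cooperativeTask();
        Thread stuck = stuckTask(2_000);
        ok.start();
        stuck.start();
        System.out.println("cooperative task exited: " + cancelAndWait(ok, 500));
        System.out.println("stuck task exited: " + cancelAndWait(stuck, 500));
    }
}
```

In the real system the `false` case is where, as the log above suggests, the stuck task is treated as fatal for the whole TaskManager.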

Re: Flink upgrade path

2022-09-07 Thread Congxian Qiu
In addition to the state compatibility mentioned above, the interfaces provided by Flink are stable if they carry the `@Public` annotation[1]. [1] https://github.com/apache/flink/blob/master/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java Best, Congxian Hangxiang Yu wrote on Wednesday, September 7, 2022

Re: With the same checkpoint interval, checkpoints of the Flink 1.12 job take longer and fail in production, while the Flink 1.9 job runs normally

2021-03-24 Thread Congxian Qiu
Hi, from the description, checkpoints take longer to complete on 1.12. Could you share more details about how the checkpoint time breaks down when running the job on 1.9 and on 1.12? Best, Congxian Haihang Jing wrote on Tuesday, March 23, 2021 at 7:22 PM: > 【Appearance】For jobs with the same configuration (checkpoint in

Re: Read the metadata files (got from savepoints)

2021-03-21 Thread Congxian Qiu
Hi, maybe this test[1] can serve as a reference. [1] https://github.com/apache/flink/blob/a33e6bd390a9935c3e25b6913bed0ff6b4a78818/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java#L55 Best, Congxian Abdullah bin Omar wrote on Monday, March 22, 2021 at 11:25 AM

Re: flink checkpoints adjustment strategy

2021-01-29 Thread Congxian Qiu
Hi Marco, you need to figure out why the checkpoint timed out (the UI shows how long each phase of a checkpoint took). If the checkpoint genuinely needs that long to complete, configure a longer timeout. If there are some checkpoint errors, we need firs

Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Congxian Qiu
Hi Bajaj, in Flink 1.10 a savepoint contains both the metadata and the data; it does not reference any checkpoint data. Best, Congxian Bajaj, Abhinav wrote on Tuesday, November 17, 2020 at 8:58 AM: > Hi, > > > > I am trying to understand the Flink 1.10 savepoints related documentation >

Re: checkpoint interval and hdfs file capacity

2020-11-11 Thread Congxian Qiu
elete old files? > > Congxian Qiu wrote on Tuesday, November 10, 2020 at 2:16 PM: > >> Hi >> No matter what interval you set, Flink will take care of the >> checkpoints (remove the useless checkpoint when it can), but when you set a >> very small checkpoint interval, there may be much

Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread Congxian Qiu
Hi, no matter what interval you set, Flink manages the checkpoints (removing checkpoints that are no longer needed when it can), but a very small checkpoint interval can put much higher pressure on the storage system (here, RPC pressure on the HDFS NameNode). Best, Congxian lec ssmi 于2020年1

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Congxian Qiu
Hi Partha, the exception says that some operator exists in the checkpoint/savepoint but not in the new program. Since you said both runs use the same user program binary, this seems a little strange to me. Did you set a uid for all the operators? Best, Congxian Parth

Re: Feature request: Removing state from operators

2020-10-29 Thread Congxian Qiu
Hi Peter, could applyToAllKeys[1] in KeyedStateBackend help you here? However, it is currently not exposed to users. [1] https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65 Best, Co

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi, as others said, state is different from a checkpoint. A checkpoint is just a **snapshot** of the state, and you can restore from the previous checkpoint if the job crashes. State is for stateful computation, and checkpoints are for fault tolerance[1]. The state keeps the information you'l

Re: restoring from externalized incremental rocksdb checkpoint?

2020-10-12 Thread Congxian Qiu
xpect a metadata file when using > incremental checkpoints? > > On Mon, Sep 14, 2020 at 10:46 PM Congxian Qiu > wrote: > >> Hi Jeff >>You can restore from retained checkpoint such as[1] `bin/flink run -s >> :checkpointMetaDataPath [:runArgs]` , you may find the m

Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-26 Thread Congxian Qiu
Hi Eleanore, which `CheckpointRetentionPolicy`[1] did you set for your job? If `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, the checkpoint is kept when the job is cancelled. PS: the image did not show up. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/stat

Re: flink checkpoint timeout

2020-09-14 Thread Congxian Qiu
Hi, you can try to find out whether there is some hot method, or whether the snapshot thread is waiting on some lock. And maybe Best, Congxian Deshpande, Omkar wrote on Tuesday, September 15, 2020 at 12:30 PM: > Few of the subtasks fail. I cannot upgrade to 1.11 yet. But I can still > get the thread dump. What should I be lookin

Re: restoring from externalized incremental rocksdb checkpoint?

2020-09-14 Thread Congxian Qiu
Hi Jeff, you can restore from a retained checkpoint like this[1]: `bin/flink run -s :checkpointMetaDataPath [:runArgs]`; you can find the metadata in the `chk-xxx` directory[2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#resuming-from-a-retained-check

Re: OOM error for heap state backend.

2020-08-27 Thread Congxian Qiu
Hi, the stack trace shows that the job failed while restoring from a checkpoint/savepoint. If you hit this during a failover, try to find the root cause of the failover. As for the stack trace, it happens because when restoring `HeapPriorityQueue`, it would ensure there are enough siz

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Congxian Qiu
Congratulations Dian! Best, Congxian Xintong Song wrote on Thursday, August 27, 2020 at 7:50 PM: > Congratulations Dian~! > > Thank you~ > > Xintong Song > > > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu wrote: > > > Congratulations Dian! > > > > Best, > > Jark > > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu wrote: >

Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 Thread Congxian Qiu
Thanks Zhu Zhu for managing this release, and thanks to everyone else who contributed to it! Best, Congxian Xingbo Huang wrote on Wednesday, August 26, 2020 at 1:53 PM: > Thanks Zhu for the great work and everyone who contributed to this release! > > Best, > Xingbo > > Guowei Ma wrote on Wednesday, August 26, 2020 at 12:43 PM: > >> Hi, >> >>

Re: allowNonRestoredState: metadata file in checkpoint dir missing

2020-08-01 Thread Congxian Qiu
Hi Omkar, `--allowNonRestoredState` does not affect checkpointing; it only affects the restore logic. As for the missing _metadata file, could you please check 1) whether the job has checkpointing enabled, and 2) whether any checkpoint has completed successfully? Best, Congxian Des

Re: Unable to recover from checkpoint

2020-07-30 Thread Congxian Qiu
Hi Sivaprasanna, with RocksDBStateBackend incremental checkpoints, the latest checkpoint may reference files of previous checkpoints (the files in the shared directory), so deleting files that belong to a previous checkpoint may lead to a FileNotFoundException. Currently, we can only parse the
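Why deleting an older checkpoint's files can break the latest one can be illustrated with a toy reference-counting registry: a shared file may only be removed once no retained checkpoint references it. This is a simplified, hypothetical model (class and method names are made up), not Flink's implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy reference-counting registry for files shared by incremental checkpoints (hypothetical). */
public class SharedFileRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<Long, Set<String>> filesByCheckpoint = new HashMap<>();

    /** Registers a completed checkpoint together with every shared file it references. */
    public void register(long checkpointId, Set<String> files) {
        filesByCheckpoint.put(checkpointId, new HashSet<>(files));
        for (String f : files) {
            refCounts.merge(f, 1, Integer::sum);
        }
    }

    /** Discards a checkpoint and returns the files that no remaining checkpoint references. */
    public Set<String> discard(long checkpointId) {
        Set<String> deletable = new TreeSet<>();
        Set<String> files = filesByCheckpoint.remove(checkpointId);
        if (files == null) {
            return deletable;
        }
        for (String f : files) {
            int remaining = refCounts.merge(f, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(f);
                deletable.add(f);
            }
        }
        return deletable;
    }

    public static void main(String[] args) {
        SharedFileRegistry registry = new SharedFileRegistry();
        registry.register(1, Set.of("000001.sst", "000002.sst"));
        registry.register(2, Set.of("000002.sst", "000003.sst")); // chk-2 reuses 000002.sst
        System.out.println(registry.discard(1)); // [000001.sst]
    }
}
```

Deleting `000002.sst` by hand together with chk-1 would leave chk-2 pointing at a missing file, which is the FileNotFoundException scenario described above.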

Re: unsubscribe

2020-07-29 Thread Congxian Qiu
Hi, please send an email to user-unsubscr...@flink.apache.org to unsubscribe. You can get more info here[1]. [1] https://flink.apache.org/community.html#mailing-lists Best, Congxian Maatary Okouya wrote on Thursday, July 30, 2020 at 12:09 AM: > >

Re: Flink 1.11 job stop with save point timeout error

2020-07-25 Thread Congxian Qiu
Hi Ivan, from the JM log, the savepoint completed within 1 second, while the timeout exception says that stop-with-savepoint could not complete within 60 s (computed as 20 attempts, RestOptions#RETRY_MAX_ATTEMPTS, times a 3 s delay, RestOptions#RETRY_DELAY; you can check the logic here[1]). I'm not sure what
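The 60 s figure is simply the retry count multiplied by the retry delay. A minimal sketch of that arithmetic, assuming the values quoted above (20 attempts, 3 s delay); as far as I know these correspond to the `rest.retry.max-attempts` and `rest.retry.delay` options, so raising either should lengthen the client-side wait. The class name is hypothetical.

```java
import java.time.Duration;

public class StopWithSavepointTimeout {

    /** Total client-side wait when polling maxAttempts times with a fixed delay between attempts. */
    public static Duration effectiveTimeout(int maxAttempts, Duration retryDelay) {
        return retryDelay.multipliedBy(maxAttempts);
    }

    public static void main(String[] args) {
        // 20 attempts x 3 s, the values quoted above
        System.out.println(effectiveTimeout(20, Duration.ofSeconds(3))); // PT1M
    }
}
```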

Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Congxian Qiu
Hi Sivaprasanna, I think state schema evolution works with incremental checkpoints; I tried it with a simple POJO schema and it worked. Maybe you need to check the schema: from the exception stack, the schemas before and after are incompatible. Best, Congxian Sivaprasanna wrote on Friday, July 24, 2020 at 12

Re: Flink failed to resume from checkpoint stored on S3

2020-07-22 Thread Congxian Qiu
Hi Xiaolong, from the log, it seems there is no `_metadata` file in the checkpoint directory s3:///flink/checkpoint_dir/65786c3307a10e79a52b4de478cfe996/chk-7853. Did you ever set the retained checkpoint configuration[1]? If it is not configured, the checkpoint will be deleted if job

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Congxian Qiu
Thanks Dian for the great work and thanks to everyone who made this release possible! Best, Congxian Rui Li wrote on Thursday, July 23, 2020 at 10:48 AM: > Thanks Dian for the great work! > > On Thu, Jul 23, 2020 at 10:22 AM Jingsong Li > wrote: > > > Thanks for being the release manager for the 1.11.1 release,

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
, in this case, was writing one line into the logger (I was writing 8 > GB in total), and that makes more sense. So nothing wrong with the > Flink/Savepoint behaviour. > > Thanks, > David > > On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu > wrote: > >> Hi David

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
>>> use did it. >>> >>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A >>> retained checkpoint isn't supported in REST API and even if it was, I think >>> it doesn't fit my scenario (stop a job, and start the new one fro

Re: How to get flink JobId in runtime

2020-07-20 Thread Congxian Qiu
Hi Sili, I'm not sure whether there is a cleaner way to get this value. Maybe you can try `RuntimeContext.getMetricGroup().getAllVariables().get("")`. Best, Congxian Si-li Liu wrote on Monday, July 20, 2020 at 7:38 PM: > Hi > > I want to retrieve flink JobId in runtime, for example, during > RichFunction's

Re: How to debug window states

2020-07-19 Thread Congxian Qiu
Hi, there is an issue[1] that aims to provide an easy way to read/bootstrap window state using the State Processor API, but its PR is currently closed. [1] https://issues.apache.org/jira/browse/FLINK-13095 Best, Congxian Paul Lam wrote on Wednesday, July 15, 2020 at 4:21 PM: > It turns out to be a bug of StateTTL [1]

Re: Timeout when using RocksDB to handle large state in a stream app

2020-07-11 Thread Congxian Qiu
Hi Felipe, I'm just wondering whether increasing heartbeat.timeout works for you with RocksDBStateBackend. If not, what does the GC log say? Thanks. Best, Congxian Felipe Gutierrez wrote on Tuesday, July 7, 2020 at 10:02 PM: > I figured out that for my stream job the best was just to use the > default MemoryStateB

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Congxian Qiu
Hi Ori, AFAIK the 2GB limit is still there. As a workaround, maybe you can reduce the state size. If this cannot be done with the window operator, would a KeyedProcessFunction[1] work for you? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-11 Thread Congxian Qiu
; Sorry > > I can't reproduce it with reduce/aggregate/fold/apply and due to some > limitations in my working environment, I can't use flink 1.10 or 1.11. > > Congxian Qiu wrote on Sunday, July 5, 2020 at 6:21 PM: > >> Hi >> >> First, Could you please try this problem still

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-11 Thread Congxian Qiu
Hi David, since you say the savepoint uses local disk, I assume you are using RocksDBStateBackend. Which Flink version are you using? What do you mean by "The task manager did not clean up the state"? Does that mean the local disk space was not cleaned up? Did the task encounter a failover in this p

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Congxian Qiu
Thanks Zhijiang and Piotr for the great work as release managers, and thanks to everyone who made the release possible! Best, Congxian Benchao Li wrote on Wednesday, July 8, 2020 at 12:39 PM: > Congratulations! Thanks Zhijiang & Piotr for the great work as release > managers. > > Rui Li wrote on Wednesday, July 8, 2020 at 11:38 AM: > >>

Re: Does savepoint reset the base for incremental checkpoint

2020-07-06 Thread Congxian Qiu
Hi, the checkpoint base is only used for incremental checkpoints; the answer to the first question is checkpoint x. After restoring from a savepoint, the first checkpoint has no base. You can refer to the code[1][2] for more information. [1] https://github.com/apache/flink/blob/c14f9d2f9f6
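The effect of a missing base can be sketched as a set difference: an incremental checkpoint uploads only the files its base checkpoint did not already have, so with no base every file is uploaded. The names are hypothetical, and the sketch ignores details such as non-SST files that are uploaded on every checkpoint.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Toy planner for an incremental checkpoint's upload set (hypothetical names). */
public class IncrementalUploadPlanner {

    /**
     * Returns the files that must be uploaded: everything in the local snapshot that the
     * base checkpoint does not already contain. With no base, every file is uploaded.
     */
    public static Set<String> filesToUpload(Set<String> localFiles, Optional<Set<String>> baseFiles) {
        Set<String> result = new TreeSet<>(localFiles);
        baseFiles.ifPresent(result::removeAll);
        return result;
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("1.sst", "2.sst", "3.sst");
        // Regular incremental checkpoint: only the new file is uploaded.
        System.out.println(filesToUpload(local, Optional.of(Set.of("1.sst", "2.sst")))); // [3.sst]
        // First checkpoint after restoring from a savepoint: no base, so everything is uploaded.
        System.out.println(filesToUpload(local, Optional.empty())); // [1.sst, 2.sst, 3.sst]
    }
}
```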

Re: Checkpoint is disabled, will history data in rocksdb be leaked when job restart?

2020-07-05 Thread Congxian Qiu
Hi SmileSmile, as for the OOM problem, maybe you can try to take a memory dump before the OOM; with the dump, you can see what consumes more memory than expected. Best, Congxian Yun Tang wrote on Friday, July 3, 2020 at 3:04 PM: > Hi > > If you do not enable checkpoint and have you ever restored checkpoint

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-05 Thread Congxian Qiu
Hi, first, could you please check whether this problem still occurs with Flink 1.10 or 1.11? It seems strange: from the error message, an error occurs when trying to convert a non-window state (VoidNamespace) to a window state (the serializer is the serializer of the window state, but the state is a non-window state

Re: MapState bad performance

2020-06-20 Thread Congxian Qiu
Hi Nick, sorry for jumping in late. I'm just wondering why you call putAll on RocksDBMapState followed by RocksDBMapState#clear(); it seems the state will always be empty after processing. Best, Congxian Yun Tang wrote on Tuesday, June 16, 2020 at 7:42 PM: > Hi Nick > > From my experience, it's not easy to tune

Re: Trouble with large state

2020-06-20 Thread Congxian Qiu
Hi, sorry to jump in late. After reading the previous emails, I have the following assumptions; please correct me if I'm wrong: - RocksDBStateBackend with incremental checkpoints - at-least-once mode - the parallelism of the stateful operator is 8 - checkpoints may take too long to complete - has fix rate in

Re: Shared state between two process functions

2020-06-17 Thread Congxian Qiu
Hi, maybe you can take a look at broadcast state[1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html Best, Congxian Robert Metzger wrote on Tuesday, June 16, 2020 at 2:18 AM: > Thanks for sharing some details on the use case: Are you able to move the > common

Re: Improved performance when using incremental checkpoints

2020-06-17 Thread Congxian Qiu
Hi Nick, the result is a bit weird. Did you compare the disk utilization/performance before and after enabling checkpointing? Best, Congxian Yun Tang wrote on Wednesday, June 17, 2020 at 8:56 PM: > Hi Nick > > I think this thread use the same program as thread "MapState bad > performance" talked. > Please provide a simple pr

Re: Is State TTL possible with event-time characteristics ?

2020-06-17 Thread Congxian Qiu
Hi, currently Flink does not support event-time TTL for state; there is an issue[1] tracking this. [1] https://issues.apache.org/jira/browse/FLINK-12005 Best, Congxian Arti Pande wrote on Wednesday, June 17, 2020 at 7:37 PM: > With Flink 1.9 is state TTL supported for event-time characteristics? This > part >

Re: Incremental state

2020-06-14 Thread Congxian Qiu
Hi, can a process function[1] meet your needs here? You can implement the TTL logic using timers in the process function. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html Best, Congxian Timo Walther wrote on Wednesday, June 10, 2020 at 9:36 PM: > Hi Annemarie, > >
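The idea of implementing TTL with timers can be sketched without Flink: every update records its time and registers a cleanup timer at update time + TTL; when a timer fires, the state is cleared only if no newer update arrived in the meantime. This is a plain-Java simulation with hypothetical names; in a real job the timer would come from the process function's TimerService and the cleanup would happen in onTimer.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Plain-Java simulation of "TTL via timers" (hypothetical, not Flink's API). */
public class TimerTtlSimulation {
    private static final class Timer {
        final String key;
        final long time;
        Timer(String key, long time) { this.key = key; this.time = time; }
    }

    private final long ttl;
    private final Map<String, Long> lastUpdate = new HashMap<>(); // key -> time of last update
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));

    public TimerTtlSimulation(long ttl) { this.ttl = ttl; }

    /** Like processElement: update the state and register a cleanup timer. */
    public void update(String key, long now) {
        lastUpdate.put(key, now);
        timers.add(new Timer(key, now + ttl));
    }

    /** Like time advancing: fire due timers; clear state only if it was not updated since. */
    public void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer t = timers.poll();
            Long last = lastUpdate.get(t.key);
            if (last != null && last + ttl <= t.time) {
                lastUpdate.remove(t.key); // the onTimer-style cleanup
            }
        }
    }

    public boolean contains(String key) { return lastUpdate.containsKey(key); }

    public static void main(String[] args) {
        TimerTtlSimulation sim = new TimerTtlSimulation(10);
        sim.update("a", 0);
        sim.update("a", 5); // refreshes "a"; the timer at 10 will no longer clear it
        sim.advanceTo(10);
        System.out.println(sim.contains("a")); // true
        sim.advanceTo(15);
        System.out.println(sim.contains("a")); // false
    }
}
```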

Re: MapState in flink

2020-06-14 Thread Congxian Qiu
Hi, could you please share why you need `MapState` instead of `MapState>`? Best, Congxian Oytun Tez wrote on Sunday, June 14, 2020 at 3:39 AM: > Correct me @everyone if I'm wrong, but you cannot keep State inside State > in that way. So change your signature to: MapState> > > The underlying mechanism wouldn't mak

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Congxian Qiu
@Gordon thanks a lot for the release and for being the release manager. Also thanks to everyone who made this release possible! Best, Congxian Oytun Tez wrote on Tuesday, June 9, 2020 at 7:08 PM: > Thank you, Gordon and everyone. > > On Tue, Jun 9, 2020 at 5:56 AM Tzu-Li (Gordon) Tai > wrote: > >> The Apache Fli

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Congxian Qiu
nally >> configured to that and the checkpoints were taking too long to complete >> before the next checkpoint interval began. >> >> >> >> Our tm’s are normally 3 slots (3_slots.png), we wanted to try running >> with 1 slot (1_slot.png) and noticed the checkpoi

Re: Custom trigger to trigger for late events

2020-05-29 Thread Congxian Qiu
Hi Poornapragna, I'll try to answer your questions: 1. You don't need to delete the timer manually (it is removed once it fires), but you can delete timers manually if you want. 2. AFAIK, triggers are not included in snapshots, but timers are. 3. delete timer that was not regi

Re: Auto adjusting watermarks?

2020-05-29 Thread Congxian Qiu
Hi, would it work for your case to store histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` and adjust `maxOutOfOrderness` according to that histogram? (Be careful: such histogram data would not be included in checkpoints.) Best, Congxian Theo Diefenthal wrote on Saturday, May 30, 2020
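One way to read this suggestion: keep a bounded sample of observed lateness and derive `maxOutOfOrderness` from a high percentile of it. The class below is a hypothetical plain-Java helper, not a Flink API; wiring it into a custom `BoundedOutOfOrdernessTimestampExtractor` is left out, and, as noted above, its samples would not be part of checkpoints.

```java
import java.util.Arrays;

/** Hypothetical helper: keeps recent lateness samples and suggests a bound from a percentile. */
public class LatenessEstimator {
    private final long[] samples; // ring buffer of the most recent samples
    private int count = 0;
    private int next = 0;

    public LatenessEstimator(int capacity) { this.samples = new long[capacity]; }

    /** Records how far (in ms) an event arrived behind the max timestamp seen so far. */
    public void record(long latenessMillis) {
        samples[next] = Math.max(0, latenessMillis);
        next = (next + 1) % samples.length;
        count = Math.min(count + 1, samples.length);
    }

    /** Nearest-rank percentile (0 < p <= 100) of the recorded lateness. */
    public long suggestedBound(double p) {
        if (count == 0) {
            return 0;
        }
        long[] sorted = Arrays.copyOf(samples, count);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * count / 100.0); // 1-based nearest rank
        return sorted[Math.max(0, rank - 1)];
    }

    public static void main(String[] args) {
        LatenessEstimator est = new LatenessEstimator(1_000);
        for (int i = 1; i <= 100; i++) {
            est.record(i); // pretend lateness samples of 1..100 ms
        }
        System.out.println(est.suggestedBound(99)); // 99
    }
}
```

A percentile rather than the maximum keeps a few extreme stragglers from inflating the bound; events later than the bound would still be treated as late.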

Re: Inconsistent checkpoint durations vs state size

2020-05-29 Thread Congxian Qiu
Hi, from the given picture: 1. Some checkpoints failed (but not because of a timeout); could you please check why they failed? 2. The checkpoint data size is the delta size of the current checkpoint[1], assuming you use incremental checkpoints. 3. In fig1 the checkpoint size is ~3

Re: Modified & rebuilt Flink source code but changes do not work

2020-05-26 Thread Congxian Qiu
Hi, if you committed the change in your local git repo, could you please check whether the commit id in the job log (such as `Rev:28bdd33`, where 28bdd33 is the commit id) matches the local commit id? Best, Congxian Qi K. wrote on Tuesday, May 26, 2020 at 4:47 PM: > Hi folks, > > Within our team, we made some simple

Re: Protection against huge values in RocksDB List State

2020-05-19 Thread Congxian Qiu
hould be too big for the > RocksDB JNI bridge. This seems to make our job behave better! Thanks for > your help guys, this was really helpful :) > > Robin > > On Sat, May 16, 2020 at 09:05, Congxian Qiu > wrote: > >> Hi >> >> As you described, I'

Re: Rocksdb implementation

2020-05-19 Thread Congxian Qiu
Hi, Flink stores state in a StateBackend. There are two StateBackends: HeapStateBackend, which stores state on the heap, and RocksDBStateBackend, which stores state in RocksDB. You can enable RocksDB in the following ways[1]: 1. add `env.setStateBackend(...);` in your code 2. add configu

Re: Process available data and stop with savepoint

2020-05-17 Thread Congxian Qiu
Hi Sergii, if I understand correctly, you want to process all the files in some directory and do not want to process them multiple times. I'm not sure whether using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs and keep the job running

Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-17 Thread Congxian Qiu
we open source blink. As you can see, we > use "org.apache.flink.runtime.state.KeyGroupsStateSnapshot" instead of " > org.apache.flink.runtime.state.KeyGroupsStateHandle", and thus the > savepoint generated by Blink cannot be easily consumed by Flink. > > Best >

Re: Protection against huge values in RocksDB List State

2020-05-16 Thread Congxian Qiu
Hi, as you described, I'm not sure whether MapState can help you in this case. MapState serializes each entry separately, so it would not run into the same problem as ListState. When using MapState, you may need to decide how to set the map key; if the whole state will be cleared after being processed,

Re: Flink Key based watermarks with event time

2020-05-15 Thread Congxian Qiu
Hi, maybe you can try KeyedProcessFunction[1] for this, but you need to handle the allowed-lateness logic[2] in your own business logic (event-time records may be out of order). [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#the-keyedprocessfu

Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Congxian Qiu
Hi, could you please share the stack trace or the log message? If I understand correctly, savepoint V3 is not included in 1.10. Best, Congxian Roc Marshal wrote on Friday, May 15, 2020 at 4:33 PM: > Hi, all. > > When using savepoint to upgrade a Flink job from blink-1.5 to flink-1.10, > the system prompts that blink

Re: Incremental state with purging

2020-05-15 Thread Congxian Qiu
Hi, from your description, you want to do two things: 1) update the state and remove state older than x; 2) output the state every y seconds. From my side, the first can be done with TTL state as Yun said, and the second with a KeyedProcessFunction[1]. If you want to have complex logic

Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Congxian Qiu
Thanks a lot for the release and your great work, Yu! Also thanks to everyone who made this release possible! Best, Congxian Yu Li wrote on Thursday, May 14, 2020 at 1:59 AM: > The Apache Flink community is very happy to announce the release of Apache > Flink 1.10.1, which is the first bugfix release for the Apach

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-08 Thread Congxian Qiu
find some solution for ways to clean up these directories. > > I appreciate your patience and help, thank you so much! > > Trystan > > On Thu, May 7, 2020 at 7:15 PM Congxian Qiu > wrote: > >> Hi >> >> Yes, there should only files used in checkpoint 8 an

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Congxian Qiu
't control anything beyond the checkpoint directory, and since > shared is in that directory I can't put entropy inside the shared directory > itself (which is what I would need). > > Thanks, > Trystan > > On Wed, May 6, 2020 at 7:31 PM Congxian Qiu > wrote: >

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-06 Thread Congxian Qiu
Hi, for the rate limit, could you please try entropy injection[1]? For checkpoints, Flink handles the file lifecycle (it deletes a file once it will never be used again). A file in the checkpoint stays as long as the corresponding checkpoint is still valid. [1] https://ci.apache.org
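Entropy injection works by rewriting paths: the configured marker (`s3.entropy.key`) in the checkpoint path is replaced by `s3.entropy.length` random characters for checkpoint data files and removed for metadata files, so data files spread across S3 key prefixes while the metadata stays at a predictable location. The helper below is a hypothetical sketch of that rewriting, assuming the marker is a whole path segment; it is not Flink's code.

```java
import java.util.Random;

/** Hypothetical sketch of the path rewriting behind entropy injection (not Flink code). */
public class EntropyInjection {
    private static final String ALPHANUM = "abcdefghijklmnopqrstuvwxyz0123456789";

    /** Data files: replace the marker with random characters to spread keys across prefixes. */
    public static String forDataFile(String path, String entropyKey, int length, Random rnd) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHANUM.charAt(rnd.nextInt(ALPHANUM.length())));
        }
        return path.replace(entropyKey, sb.toString());
    }

    /** Metadata files: drop the marker segment so the path stays predictable. */
    public static String forMetadataFile(String path, String entropyKey) {
        return path.replace("/" + entropyKey, "");
    }

    public static void main(String[] args) {
        String base = "s3://my-bucket/_entropy_/checkpoints/chk-7";
        System.out.println(forDataFile(base + "/data-file", "_entropy_", 4, new Random()));
        System.out.println(forMetadataFile(base + "/_metadata", "_entropy_")); // s3://my-bucket/checkpoints/chk-7/_metadata
    }
}
```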

Re: checkpointing opening too many files

2020-05-06 Thread Congxian Qiu
tate size. > > thanks > On 4/25/2020 13:48,Congxian Qiu > wrote: > > Hi > If there are indeed so many files need to upload to hdfs, then currently > we do not have any solutions to limit the open files, there exist an > issue[1] wants to fix this problem, and a pr for it, maybe

Re: Flink: For terabytes of keyed state.

2020-05-06 Thread Congxian Qiu
, May 6, 2020 at 8:34 AM Congxian Qiu > wrote: > >> Hi >> >> From my experience, you should care the state size for a single task(not >> the whole job state size), the download speed for single thread is almost >> 100 MB/s (this may vary in different env), and I do
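Using the rough single-thread figure quoted above (about 100 MB/s, which may vary by environment), the download part of a restore can be estimated per task rather than per job. A back-of-the-envelope sketch, assuming state is spread evenly across parallel instances and each task downloads with one thread; the class name and numbers in the example are hypothetical.

```java
/** Back-of-the-envelope restore estimate (hypothetical helper; assumes even state distribution). */
public class RestoreEstimate {

    /** Seconds to download one task's share of the state: (total / parallelism) / throughput. */
    public static double secondsPerTask(double totalStateMb, int parallelism, double mbPerSecond) {
        return (totalStateMb / parallelism) / mbPerSecond;
    }

    public static void main(String[] args) {
        // Hypothetical: 1 TB (1,000,000 MB) of state, parallelism 100, 100 MB/s per thread
        System.out.println(secondsPerTask(1_000_000, 100, 100)); // 100.0
    }
}
```

Doubling the parallelism halves each task's share, which is why the per-task state size, not the job's total, is the number to watch.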

Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
to redistribute > terabytes of state data on node addition / node failure. > > Thanks! > > On Sun, May 3, 2020 at 6:56 PM Congxian Qiu > wrote: > >> Hi >> >> 1. From my experience, Flink can support such big state, you can set >> appropriate parallelism fo

Re: Savepoint memory overhead

2020-05-03 Thread Congxian Qiu
Hi, from the given figure, the end-to-end duration of the two failed checkpoints seems small (they did not fail because of a timeout); could you please check why they failed? Maybe you can find something in the JM log such as "Decline checkpoint {} by task {} of job {} at {}.", then you can go to th

Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Congxian Qiu
Hi, 1. From my experience, Flink can support state this large; you can set an appropriate parallelism for the stateful operator. For RocksDB you may need to pay attention to disk performance. 2. Inside Flink, state is partitioned by key group, and each task (parallel instance) holds multiple key groups. Flink d
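To illustrate point 2: a key is first hashed into one of `maxParallelism` key groups, and each parallel instance owns a contiguous range of key groups, which is what lets state be redistributed on rescaling. The sketch assumes this two-step scheme; Flink hashes with its own murmur-based function, for which the MurmurHash3 finalizer below is only a stand-in, so the concrete key-group numbers will differ from Flink's.

```java
/** Two-step key-to-task assignment via key groups; the hash mixer is a stand-in for Flink's. */
public class KeyGroupSketch {

    /** MurmurHash3 32-bit finalizer, used here only as a stand-in mixing function. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Step 1: key -> key group in [0, maxParallelism); independent of the current parallelism. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Step 2: key group -> parallel instance; each instance owns a contiguous key-group range. */
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int kg = keyGroupFor("user-42", maxParallelism);
        // The key group stays fixed; only the owning instance changes when rescaling.
        System.out.println("key group " + kg
                + " -> instance " + operatorIndexFor(kg, maxParallelism, 3) + " of 3, "
                + operatorIndexFor(kg, maxParallelism, 4) + " of 4");
    }
}
```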

Re: Flink 1.9.2: why does the checkpoint always expire?

2020-04-27 Thread Congxian Qiu
Hi, the image is not very clear. For RocksDBStateBackend, did you enable incremental checkpoints? Currently, a checkpoint on the TM side consists of these steps: 1) barrier alignment, 2) synchronous snapshot, 3) asynchronous snapshot. For the expired checkpoint, could you please check the tasks in the first operator of the DAG to find o

Re: Debug Slowness in Async Checkpointing

2020-04-24 Thread Congxian Qiu
Hi, if the bottleneck is the upload part, have you tried uploading files with multiple threads[1]? [1] https://issues.apache.org/jira/browse/FLINK-11008 Best, Congxian Lu Niu wrote on Friday, April 24, 2020 at 12:38 PM: > Hi, Robert > > Thanks for relying. Yeah. After I added monitoring on the above path, it > sh
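The idea behind FLINK-11008 is to upload a checkpoint's files concurrently instead of one at a time. A generic sketch with a fixed thread pool; the `upload` function is a hypothetical stand-in for the real file-system copy, and this is not Flink's code. As far as I know, the feature that came out of that issue is configured via `state.backend.rocksdb.checkpoint.transfer.thread.num`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Generic parallel-upload sketch; `upload` stands in for a real file-system copy. */
public class ParallelUpload {

    /** Runs upload on every file using a fixed pool of threads; results keep the input order. */
    public static <R> List<R> uploadAll(List<String> files, Function<String, R> upload, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (String f : files) {
                futures.add(pool.submit(() -> upload.apply(f)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> fut : futures) {
                results.add(fut.get()); // waits for each upload, surfacing any failure
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("upload failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> files = List.of("000001.sst", "000002.sst", "000003.sst");
        // Hypothetical "upload" that just returns a remote path.
        System.out.println(uploadAll(files, f -> "hdfs:///shared/" + f, 2));
    }
}
```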

Re: checkpointing opening too many files

2020-04-24 Thread Congxian Qiu
Hi, if there really are that many files to upload to HDFS, then we currently have no way to limit the number of open files. There is an issue[1] that aims to fix this, with a PR attached; maybe you can try that PR to see whether it solves your problem. [1] https://issues.apache.org/ji

Re: Storing Operator state in RocksDb during runtime - plans

2020-04-10 Thread Congxian Qiu
Hi KristoffSC, I'm not aware of any concrete plans for such a feature. I've also CCed Yu, who may have more information about this. Best, Congxian Fabian Hueske wrote on Tuesday, April 7, 2020 at 4:27 AM: > Hi Kristoff, > > I'm not aware of any concrete plans for such a feature. > > Best, > Fabian > > On Sun, Apr 5,

Re: Making job fail on Checkpoint Expired?

2020-04-10 Thread Congxian Qiu
dering if there is a way for long checkpoints to create > backpressure on the rest of the stream? This would be a nice feature to > have, since it would avoid the state growing too much when checkpointing > takes time because of temporary network issues for example. > > Thanks for

Re: FlinkRuntimeException: Unexpected list element deserialization failure

2020-04-10 Thread Congxian Qiu
Hi, as Yun said, could you please share the whole stack trace (normally you can copy more lines below the given stack trace)? I cannot tell why this happened from the given stack trace. Best, Congxian Yun Tang wrote on Friday, April 10, 2020 at 1:48 AM: > Hi > > I think you have missed the "caused by" exception [1

Re: State size Vs keys number performance

2020-04-08 Thread Congxian Qiu
nd up with about the same state size on each subtask, then there is no difference at this point. Best, Congxian KristoffSC wrote on Wednesday, April 8, 2020 at 3:36 PM: > Thanks Congxian Qiu, > I'm aware about your second point. In Value state I will keep String or > very > simple POJO, without

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-08 Thread Congxian Qiu
ose references. However, as I > mentioned in original post, there is enough capacity in all disk. Also, > when I switch to presto file system, the problem goes away. Wondering > whether others encounter similar issue. > > Best > Lu > > On Tue, Apr 7, 2020 at 7:03 PM Congxian Q

Re: Making job fail on Checkpoint Expired?

2020-04-07 Thread Congxian Qiu
> > I'd love to get your opinion on this! > > Thanks, > Robin > > On Fri, Apr 3, 2020 at 11:17, Congxian Qiu > wrote: > >> Currently, only checkpoint declined will be counted into >> `continuousFailureCounter`. >> Could you please share why do you

Re: State size Vs keys number performance

2020-04-07 Thread Congxian Qiu
Hi, here is some information from my side: 1. RocksDB performance is mainly determined by (de)serialization and disk reads/writes. 2. MapState only needs to (de)serialize a single map key/value when reading or writing state, whereas ValueState needs to (de)serialize the whole state when reading or writing the st
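Point 2 can be made concrete with a rough cost model: storing a whole map as one value means every update (de)serializes all entries, whereas a per-entry layout touches one key/value pair. The byte format below (4-byte length prefix plus UTF-8 payload) is an assumption for illustration, not RocksDB's or Flink's actual encoding, and the class is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Toy cost model: bytes touched to update one entry, whole-map value vs per-entry layout. */
public class StateSerializationCost {

    /** Bytes for one length-prefixed string (4-byte length + UTF-8 payload), a simplified format. */
    static int stringBytes(String s) {
        return 4 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** ValueState-of-a-Map-like layout: an update reads and rewrites the whole map. */
    public static int wholeValueUpdateBytes(Map<String, String> map) {
        int total = 4; // entry count
        for (Map.Entry<String, String> e : map.entrySet()) {
            total += stringBytes(e.getKey()) + stringBytes(e.getValue());
        }
        return 2 * total; // deserialize the old map + serialize the new one
    }

    /** MapState-like layout: an update writes only the single entry. */
    public static int perEntryUpdateBytes(String key, String value) {
        return stringBytes(key) + stringBytes(value);
    }

    public static void main(String[] args) {
        Map<String, String> map = new TreeMap<>();
        for (int i = 0; i < 1_000; i++) {
            map.put("k" + i, "v" + i);
        }
        System.out.println("whole value: " + wholeValueUpdateBytes(map) + " bytes");
        System.out.println("per entry:   " + perEntryUpdateBytes("k1", "v1") + " bytes"); // 12 bytes
    }
}
```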

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Congxian Qiu
Hi, from the stack trace, the problem seems to be "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-". I googled the exception and found a related page[1]; could you please make sure there

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Congxian Qiu
Thanks a lot for the release and your great work, Gordon! Also thanks to everyone who made this release possible! Best, Congxian Oytun Tez wrote on Wednesday, April 8, 2020 at 2:55 AM: > I should also add, I couldn't agree more with this sentence in the release > article: "state access/updates and messaging need to b

Re: Questions regarding Key Managed state

2020-04-03 Thread Congxian Qiu
Hi, many keys can live in a single state (each state can span multiple key groups, and each key is assigned to its key group). If you write a custom process function that uses a state you created, then there is only one user state in that instance (not counting the underlying state of Flink if th

Re: Making job fail on Checkpoint Expired?

2020-04-03 Thread Congxian Qiu
Currently, only declined checkpoints are counted in `continuousFailureCounter`. Could you please share why you want the job to fail when a checkpoint expires? Best, Congxian Timo Walther wrote on Thursday, April 2, 2020 at 11:23 PM: > Hi Robin, > > this is a very good observation and maybe even unintended beh
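A simplified model of the counting rule described above, assuming a tolerable-failure threshold: consecutive declined checkpoints increment the counter, a completed checkpoint resets it, and expired checkpoints are ignored. Names are hypothetical; this is not Flink's actual implementation, only an illustration of why expirations alone never fail the job.

```java
/** Simplified model: only declined checkpoints count toward failing the job (hypothetical). */
public class CheckpointFailureCounter {
    public enum Outcome { COMPLETED, DECLINED, EXPIRED }

    private final int tolerableFailures;
    private int continuousFailures = 0;

    public CheckpointFailureCounter(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if the job should fail after this checkpoint outcome. */
    public boolean onCheckpoint(Outcome outcome) {
        switch (outcome) {
            case COMPLETED:
                continuousFailures = 0; // a success resets the streak
                return false;
            case DECLINED:
                continuousFailures++;
                return continuousFailures > tolerableFailures;
            default:
                return false; // EXPIRED: not counted in this model
        }
    }

    public static void main(String[] args) {
        CheckpointFailureCounter counter = new CheckpointFailureCounter(0);
        System.out.println(counter.onCheckpoint(Outcome.EXPIRED));  // false
        System.out.println(counter.onCheckpoint(Outcome.DECLINED)); // true
    }
}
```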

Re: How to debug checkpoints failing to complete

2020-03-27 Thread Congxian Qiu
Hi, from my experience, you can first check jobmanager.log to find out whether the checkpoint expired or was declined by some task. If it expired, you can follow seeksst's advice above (enabling debug logging may help here); if it was declined, you can go to taskmanager.log to f

Re: End to End Latency Tracking in flink

2020-03-27 Thread Congxian Qiu
Hi, as far as I know, the latency-tracking feature is meant for debugging; you can use it while debugging and disable it when running the job in production. From my side, using $current_processing - $event_time is OK, but keep in mind that the event time may not be the time ingested in Fl

Re: Very large _metadata file

2020-03-09 Thread Congxian Qiu
Hi, as Gordon said, the metadata contains the ByteStreamStateHandle, and when the ByteStreamStateHandle is written out, its handle name, which is a path (as you saw), is written out as well. A ByteStreamStateHandle is created when the state size is smaller than `state.backend.fs.memory-threshold` (default is
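The size check described above can be sketched as follows: state below the threshold is kept inline as a byte-stream handle inside `_metadata`, larger state goes to its own file, so many small pieces of state inflate the metadata file. This is a hypothetical model of the rule as stated in the reply (strictly smaller than the threshold); the threshold in the example is arbitrary, not Flink's default.

```java
/** Hypothetical model of the memory-threshold rule: small state is inlined into _metadata. */
public class StateHandleChoice {
    public enum Kind { INLINE_BYTE_STREAM, SEPARATE_FILE }

    /** Where a piece of state of the given size ends up, for a threshold in bytes. */
    public static Kind choose(long stateSizeBytes, long memoryThresholdBytes) {
        return stateSizeBytes < memoryThresholdBytes ? Kind.INLINE_BYTE_STREAM : Kind.SEPARATE_FILE;
    }

    /** Total bytes that would be inlined into the metadata file for the given state sizes. */
    public static long inlinedBytes(long[] stateSizes, long memoryThresholdBytes) {
        long total = 0;
        for (long size : stateSizes) {
            if (choose(size, memoryThresholdBytes) == Kind.INLINE_BYTE_STREAM) {
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long threshold = 1_024; // arbitrary example value
        System.out.println(inlinedBytes(new long[] {100, 500, 5_000}, threshold)); // 600
    }
}
```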

Re: How to print the aggregated state everytime it is updated?

2020-03-06 Thread Congxian Qiu
Hi, from the description, you use a window operator with event time, so you should call `DataStream.assignTimestampsAndWatermarks` to set timestamps and watermarks. A window fires when the watermark passes the window's end time. Best, Congxian kant kodali wrote on Wednesday, March 4, 2020 at 5:11 AM: > H
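To make "the watermark passes the window end" concrete, here is a sketch of tumbling event-time windows with zero offset: a timestamp belongs to the window starting at `timestamp - timestamp mod size`, and, as far as I know matching Flink's default event-time trigger, the window fires once the watermark reaches the window's last timestamp (end - 1). The class and method names are hypothetical.

```java
/** Sketch of tumbling event-time windows: window membership and firing (hypothetical names). */
public class TumblingWindowSketch {

    /** Start of the tumbling window (offset 0) that contains the timestamp. */
    public static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Window [start, start + size) fires once the watermark reaches start + size - 1. */
    public static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    public static void main(String[] args) {
        long size = 1_000;
        long start = windowStart(12_345, size);             // 12000
        System.out.println(start);
        System.out.println(fires(start, size, 12_998));     // false
        System.out.println(fires(start, size, 12_999));     // true
    }
}
```

If no timestamps and watermarks are assigned, the watermark never advances and `fires` never becomes true, which is one way a window can appear never to emit.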

Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-05 Thread Congxian Qiu
Hi, maybe the checkpoint contains some ByteStreamStateHandles. If you want to verify this, you can configure `state.backend.fs.memory-threshold`. Please be careful when setting this config, because it may produce many small files. Best, Congxian Arvid Heise, 2020

Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-24 Thread Congxian Qiu
+1 for dropping savepoint compatibility with Flink 1.2 Best, Congxian Dawid Wysakowicz wrote on Mon, Feb 24, 2020, 4:00 PM: > +1 for dropping > > Best, > > Dawid > On 24/02/2020 08:22, Yu Li wrote: > > +1 for dropping savepoint compatibility with Flink 1.2. > > Best Regards, > Yu > > > On Sat, 22 Feb 2020

Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-23 Thread Congxian Qiu
Congratulations Jingsong! Best, Congxian jincheng sun wrote on Mon, Feb 24, 2020, 1:38 PM: > Congratulations Jingsong! > > Best, > Jincheng > > > Zhu Zhu wrote on Mon, Feb 24, 2020, 11:55 AM: >> Congratulations Jingsong! >> >> Thanks, >> Zhu Zhu >> >> Fabian Hueske wrote on Sat, Feb 22, 2020, 1:30 AM: >>> Congrats Jingsong!

Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Congxian Qiu
Great work. Thanks to everyone involved, and thanks to Gary and Yu for being the release managers. Best, Congxian Jark Wu wrote on Wed, Feb 12, 2020, 9:46 PM: > Congratulations to everyone involved! > Great thanks to Yu & Gary for being the release manager! > > Best, > Jark > > On Wed, 12 Feb 2020 at 21:42, Zhu Zhu

Re: rocksdb max open file descriptor issue crashed application

2020-02-11 Thread Congxian Qiu
Hi, from the given description, you use RocksDBStateBackend and always have 20K files open on one machine, and the app suddenly opened 35K files and then crashed. Could you please share which files are open, and what the exception is (the full taskmanager.log may be helpful)? Best, Congxian Apo
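
To answer "which files are open", one option on Linux is to resolve the symlinks under `/proc/<pid>/fd` of the TaskManager process (tools such as `lsof -p <pid>` show the same information). The sketch below is Linux-only and hypothetical (`OpenFileSummary` is not part of Flink). It groups open descriptors by file extension; a large `.sst` count would point at RocksDB data files. Sockets, pipes and extension-less files are counted as `(other)`.

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Linux-only sketch: counts a process's open file descriptors by extension.
public class OpenFileSummary {
    public static Map<String, Integer> summarize(String pid) {
        Map<String, Integer> counts = new TreeMap<>();
        Path fdDir = Paths.get("/proc", pid, "fd");
        if (!Files.isDirectory(fdDir)) return counts;      // not Linux, or no such process
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                String target;
                try {
                    target = Files.readSymbolicLink(fd).toString();
                } catch (IOException | UnsupportedOperationException e) {
                    continue;                              // descriptor closed meanwhile, etc.
                }
                String name = target.substring(target.lastIndexOf('/') + 1);
                int dot = name.lastIndexOf('.');
                String kind = target.startsWith("/") && dot > 0 ? name.substring(dot) : "(other)";
                counts.merge(kind, 1, Integer::sum);
            }
        } catch (IOException | DirectoryIteratorException e) {
            // e.g. permission denied: return what was collected so far
        }
        return counts;
    }

    public static void main(String[] args) {
        String pid = args.length > 0 ? args[0] : "self";   // pass the TaskManager's pid
        summarize(pid).forEach((kind, n) -> System.out.println(kind + "\t" + n));
    }
}
```

Run it as the same user as the TaskManager; otherwise `/proc/<pid>/fd` is usually not readable.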

Re: savepoint failed for finished tasks

2020-01-17 Thread Congxian Qiu
Hi, currently checkpoints/savepoints only work if all operators/tasks are still running; there is an issue [1] tracking this. [1] https://issues.apache.org/jira/browse/FLINK-2491 Best, Congxian Fanbin Bu wrote on Fri, Jan 17, 2020, 6:49 AM: > Hi, > > I couldn't make a savepoint for the following graph: > [

Re: [DISCUSS] Change default for RocksDB timers: Java Heap => in RocksDB

2020-01-17 Thread Congxian Qiu
+1 to storing timers in RocksDB by default. Storing timers on the heap can cause OOM problems and make checkpoints much slower; storing timers in RocksDB gets rid of both. Best, Congxian Biao Liu wrote on Fri, Jan 17, 2020, 3:10 PM: > +1 > > I think that's how it should be. Timer should align with other

Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Congxian Qiu
Congratulations Dian Fu! Best, Congxian Jark Wu wrote on Thu, Jan 16, 2020, 7:44 PM: > Congratulations Dian and welcome on board! > > Best, > Jark > > On Thu, 16 Jan 2020 at 19:32, Jingsong Li wrote: > > > Congratulations Dian Fu. Well deserved! > > > > Best, > > Jingsong Lee > > > > On Thu, Jan 16, 2020 a

Re: Frequently checkpoint failure, could make the flink sql state not clear?

2020-01-16 Thread Congxian Qiu
Hi, AFAIK, whether a timer fires is independent of whether a checkpoint succeeds. Best, Congxian LakeShen wrote on Thu, Jan 16, 2020, 8:53 PM: > Hi community, now I am using Flink sql , and I set the retention time, As > I all know is that Flink will set the timer for per key to clear their > state, if Fli

Re: Please suggest helpful tools

2020-01-10 Thread Congxian Qiu
Hi, for an expired checkpoint, you can find something like "Checkpoint xxx of job xx expired before completing" in jobmanager.log; then you can go to the checkpoint UI to find which tasks did not ack, and go to those tasks to see what happened. If a checkpoint was declined, you can find something
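
A small helper can pull these lines out of a large jobmanager.log. This is a hypothetical sketch: the "expired" pattern is built from the message quoted in the reply, but the exact wording may differ between Flink versions. The "declined" pattern is only a guess, so check your own log for the real text and adjust both regular expressions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log scanner: tags lines that look like expired or declined checkpoints.
public class CheckpointLogScan {
    // Based on the message quoted in the reply; wording may vary by version.
    private static final Pattern EXPIRED =
            Pattern.compile("Checkpoint (\\d+) of job (\\S+) expired before completing");
    // Assumed wording for declines; verify against your own jobmanager.log.
    private static final Pattern DECLINED =
            Pattern.compile("[Dd]eclin\\w* checkpoint (\\d+)");

    /** Returns "EXPIRED <id>", "DECLINED <id>", or null if the line matches neither. */
    public static String classify(String line) {
        Matcher m = EXPIRED.matcher(line);
        if (m.find()) return "EXPIRED " + m.group(1);
        m = DECLINED.matcher(line);
        if (m.find()) return "DECLINED " + m.group(1);
        return null;
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader r = Files.newBufferedReader(Paths.get(args[0]))) {
            String line;
            while ((line = r.readLine()) != null) {
                String tag = classify(line);
                if (tag != null) System.out.println(tag + "\t" + line);
            }
        }
    }
}
```

The expired IDs can then be looked up in the checkpoint UI, and the declined ones traced in the corresponding taskmanager.log.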

Re: How can I find out which key group belongs to which subtask

2020-01-09 Thread Congxian Qiu
Hi, if you just want to make sure some keys go into the same subtask, does a custom key selector [1] help? For the key group and subtask information, you can refer to KeyGroupRangeAssignment [2], and for the max parallelism logic, you can refer to the docs [3]. [1] https://ci.apache.org/projects/flink/
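
The arithmetic behind "which key group belongs to which subtask" can be sketched as follows. The formulas are written from `KeyGroupRangeAssignment` as we understand it, so please verify them against the source for your Flink version. `keyGroupOf` is a simplified stand-in: Flink applies a murmur hash to `key.hashCode()` before taking the modulus, so real key-group numbers will differ. The class `KeyGroupMath` is hypothetical.

```java
// Sketch of key-group -> subtask arithmetic (verify against KeyGroupRangeAssignment).
public class KeyGroupMath {
    /** Simplified stand-in; Flink murmur-hashes key.hashCode() first. */
    public static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Subtask that owns a key group: keyGroup * parallelism / maxParallelism. */
    public static int subtaskOf(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive [first, last] range of key groups owned by one subtask. */
    public static int[] keyGroupRange(int subtask, int parallelism, int maxParallelism) {
        int first = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int last = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        for (int s = 0; s < parallelism; s++) {
            int[] r = keyGroupRange(s, parallelism, maxParallelism);
            System.out.println("subtask " + s + " -> key groups " + r[0] + ".." + r[1]);
        }
    }
}
```

With a max parallelism of 128 and a parallelism of 4, each subtask owns a contiguous block of 32 key groups (0..31, 32..63, 64..95, 96..127). This is why keys that share a key group always land on the same subtask.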

Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Congxian Qiu
If you want to figure out the performance problem, async-profiler [1] may be helpful. [1] https://github.com/jvm-profiling-tools/async-profiler Best, Congxian William C wrote on Wed, Jan 8, 2020, 11:37 AM: > Hallo > > on 2020/1/8 11:31, RKandoji wrote: > > I'm running my job on a EC2 instance with 32 cor

Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Congxian Qiu
Hi, RocksDB supports incremental and full snapshots, and both are async. Do you want to verify whether it's an incremental or a full snapshot? I don't know of an easy way to get this information currently. Best, Congxian Zhijiang wrote on Wed, Jan 8, 2020, 10:56 AM: > The log way is simple for tracing and you can also g

Re: Checkpoints issue and job failing

2020-01-03 Thread Congxian Qiu
Hi, have you checked whether this problem also exists on Flink 1.9? Best, Congxian vino yang wrote on Fri, Jan 3, 2020, 3:54 PM: > Hi Navneeth, > > Did you check if the path contains in the exception is really can not be > found? > > Best, > Vino > > Navneeth Krishnan wrote on Fri, Jan 3, 2020, 8:23 AM: >> Hi All, >>
