Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 Thread Congxian Qiu
Thanks ZhuZhu for managing this release and everyone else who contributed to this release! Best, Congxian Xingbo Huang wrote on Wed, Aug 26, 2020 at 1:53 PM: > Thanks Zhu for the great work and everyone who contributed to this release! > > Best, > Xingbo > > Guowei Ma wrote on Wed, Aug 26, 2020 at 12:43 PM: > >> Hi, >> >>

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Congxian Qiu
Congratulations Dian! Best, Congxian Xintong Song wrote on Thu, Aug 27, 2020 at 7:50 PM: > Congratulations Dian~! > > Thank you~ > > Xintong Song > > > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu wrote: > > > Congratulations Dian! > > > > Best, > > Jark > > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu wrote: >

Re: OOM error for heap state backend.

2020-08-27 Thread Congxian Qiu
Hi The stack trace shows that the job failed while restoring from a checkpoint/savepoint. If you hit this during failover, try to find the root cause that triggered the failover in the first place. As for the stack trace itself, it appears because restoring a `HeapPriorityQueue` has to ensure there is enough siz

Re: restoring from externalized incremental rocksdb checkpoint?

2020-09-14 Thread Congxian Qiu
Hi Jeff You can restore from a retained checkpoint with a command such as[1] `bin/flink run -s :checkpointMetaDataPath [:runArgs]`; you can find the metadata in the `chk-xxx` directory[2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#resuming-from-a-retained-check

Re: flink checkpoint timeout

2020-09-14 Thread Congxian Qiu
Hi You can try to find out whether there is some hot method, or whether the snapshot stack is waiting for some lock. and maybe Best, Congxian Deshpande, Omkar wrote on Tue, Sep 15, 2020 at 12:30 PM: > Few of the subtasks fail. I cannot upgrade to 1.11 yet. But I can still > get the thread dump. What should I be lookin

Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-26 Thread Congxian Qiu
Hi Eleanore Which `CheckpointRetentionPolicy`[1] did you set for your job? If `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, the checkpoint is kept when the job is canceled. PS: the image did not show up. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/stat

Re: restoring from externalized incremental rocksdb checkpoint?

2020-10-12 Thread Congxian Qiu
xpect a metadata file when using > incremental checkpoints? > > On Mon, Sep 14, 2020 at 10:46 PM Congxian Qiu > wrote: > >> Hi Jeff >>You can restore from retained checkpoint such as[1] `bin/flink run -s >> :checkpointMetaDataPath [:runArgs]` , you may find the m

Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi As others said, state is different from a checkpoint. A checkpoint is just a **snapshot** of the state, and you can restore from the previous checkpoint if the job crashes. State is for stateful computation, and checkpoints are for fault tolerance[1]. The state keeps the information you'l

Re: Feature request: Removing state from operators

2020-10-29 Thread Congxian Qiu
Hi Peter Can applyToAllKeys[1] in KeyedStateBackend help you here? Note that it is currently not exposed to users. [1] https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65 Best, Co

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Congxian Qiu
Hi Partha The exception says that some operator exists in the checkpoint/savepoint but not in the new program. Since you said both runs use the same user program binary, this seems a little strange to me. Did you set the uid for all the operators? Best, Congxian Parth

Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread Congxian Qiu
Hi No matter what interval you set, Flink takes care of the checkpoints (it removes checkpoints that are no longer needed when it can), but a very small checkpoint interval may put high pressure on the storage system (here, RPC pressure on the HDFS NameNode). Best, Congxian lec ssmi wrote on 2020-1

Re: checkpoint interval and hdfs file capacity

2020-11-11 Thread Congxian Qiu
elete old files? > > Congxian Qiu wrote on Tue, Nov 10, 2020 at 2:16 PM: > >> Hi >> No matter what interval you set, Flink will take care of the >> checkpoints(remove the useless checkpoint when it can), but when you set a >> very small checkpoint interval, there may be much

Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Congxian Qiu
Hi Bajaj In Flink 1.10 a savepoint contains both the metadata and the data; it does not need to reference any checkpoint data. Best, Congxian Bajaj, Abhinav wrote on Tue, Nov 17, 2020 at 8:58 AM: > Hi, > > > > I am trying to understand the Flink 1.10 savepoints related documentation >

Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-05 Thread Congxian Qiu
Hi Maybe the checkpoint contains some ByteStreamStateHandle instances. To verify this, you can change `state.backend.fs.memory-threshold`. Be careful when setting this option, because it may produce many small files. Best, Congxian Arvid Heise wrote on 2020

Re: How to print the aggregated state everytime it is updated?

2020-03-06 Thread Congxian Qiu
Hi From the description, you use a window operator with event time, so you should call `DataStream.assignTimestampsAndWatermarks` to set the timestamps and watermarks. A window is triggered when the watermark passes the window end time. Best, Congxian kant kodali wrote on Wed, Mar 4, 2020 at 5:11 AM: > H
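The firing rule mentioned in this reply can be sketched in plain Java. This is a simplified model rather than Flink's implementation: the class `WatermarkSketch`, its method names, and the 2-second delay are made up for illustration, and the watermark is assumed to be the highest timestamp seen minus a fixed allowed delay.

```java
// Simplified model of event-time window firing; not Flink's actual code.
final class WatermarkSketch {

    /** Assumed watermark rule: highest timestamp seen minus the allowed delay. */
    static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean windowFires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;      // window [0, 10000)
        long outOfOrderness = 2_000L;  // hypothetical allowed delay
        for (long ts : new long[] {9_000L, 11_000L, 12_000L}) {
            long wm = watermark(ts, outOfOrderness);
            System.out.println("event ts=" + ts + " watermark=" + wm
                    + " fires=" + windowFires(windowEnd, wm));
        }
    }
}
```

If no timestamps and watermarks are assigned, the watermark never advances in this model, so the window never fires, which matches the symptom described in the thread.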

Re: Very large _metadata file

2020-03-09 Thread Congxian Qiu
Hi As Gordon said, the metadata contains the ByteStreamStateHandle; when a ByteStreamStateHandle is written out, its handle name, which is a path (as you saw), is written too. A ByteStreamStateHandle is created when the state size is smaller than `state.backend.fs.memory-threshold`(default is
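The size rule in this reply can be written as a small decision function. This is only a sketch of the rule as stated in the message: `StateHandleSketch`, `Placement`, and the byte values are hypothetical, the threshold is passed in rather than taken from any Flink default, and the handling of a state exactly at the threshold is an assumption.

```java
// Sketch of the "small state is inlined into _metadata" rule; names are illustrative.
final class StateHandleSketch {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    /** State below the threshold is kept as bytes in the metadata; larger state goes to its own file. */
    static Placement place(long stateSizeBytes, long memoryThresholdBytes) {
        return stateSizeBytes < memoryThresholdBytes
                ? Placement.INLINE_IN_METADATA
                : Placement.SEPARATE_FILE;
    }

    public static void main(String[] args) {
        long threshold = 1_024L; // hypothetical value for state.backend.fs.memory-threshold
        System.out.println(place(200L, threshold));
        System.out.println(place(50_000L, threshold));
    }
}
```

Under this rule, many small states make `_metadata` larger, while lowering the threshold trades that for more small files.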

Re: End to End Latency Tracking in flink

2020-03-27 Thread Congxian Qiu
Hi As far as I know, the latency-tracking feature is meant for debugging; you can use it to debug and disable it when running the job in production. From my side, using $current_processing - $event_time is OK, but keep one thing in mind: the event time may not be the time ingested in Fl
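The `$current_processing - $event_time` measurement suggested here is a single subtraction; the sketch below only makes the units explicit. `LatencySketch` and the sample timestamps are hypothetical, and both values are assumed to be epoch milliseconds.

```java
// Minimal sketch of the processing-time minus event-time latency measure.
final class LatencySketch {

    /** Latency as seen at the measuring operator, in milliseconds. */
    static long latencyMs(long processingTimeMs, long eventTimeMs) {
        return processingTimeMs - eventTimeMs;
    }

    public static void main(String[] args) {
        long eventTime = 1_585_300_000_000L;    // timestamp carried by the record (hypothetical)
        long processingTime = eventTime + 750L; // wall clock when the operator sees the record
        System.out.println("latency ms = " + latencyMs(processingTime, eventTime));
    }
}
```

Because the event timestamp is set where the record was produced, the result also includes any delay before the record reached Flink.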

Re: How to debug checkpoints failing to complete

2020-03-27 Thread Congxian Qiu
Hi From my experience, you can first check jobmanager.log to find out whether the checkpoint expired or was declined by some task. If it expired, you can follow the advice seeksst gave above (enabling debug logging may help here); if it was declined, you can go to the taskmanager.log to f

Re: Making job fail on Checkpoint Expired?

2020-04-03 Thread Congxian Qiu
Currently, only declined checkpoints are counted in `continuousFailureCounter`. Could you please share why you want the job to fail when a checkpoint expires? Best, Congxian Timo Walther wrote on Thu, Apr 2, 2020 at 11:23 PM: > Hi Robin, > > this is a very good observation and maybe even unintended beh

Re: Questions regarding Key Managed state

2020-04-03 Thread Congxian Qiu
Hi Many keys can live in a single state (each state can have multiple key-groups, and each key is assigned to its key-group). If you write a custom process function that uses a state you created, then there is only one user state in that instance (not counting Flink's underlying state if th

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Congxian Qiu
Thanks a lot for the release and your great job, Gordon! Also thanks to everyone who made this release possible! Best, Congxian Oytun Tez wrote on Wed, Apr 8, 2020 at 2:55 AM: > I should also add, I couldn't agree more with this sentence in the release > article: "state access/updates and messaging need to b

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Congxian Qiu
Hi From the stack trace, the problem seems to be "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-". I searched for the exception and found a related page[1]; could you please make sure there

Re: State size Vs keys number perfromance

2020-04-07 Thread Congxian Qiu
Hi I'll give some information from my side: 1. RocksDB performance is mainly determined by (de)serialization and disk reads/writes. 2. MapState only needs to (de)serialize a single mapkey/mapvalue when reading/writing state, whereas ValueState needs to (de)serialize the whole state when reading/writing the st
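The second point can be made concrete with a rough cost model. This is a back-of-the-envelope sketch, not a measurement: `StateAccessCostSketch`, the entry count, and the entry size are hypothetical, and it assumes every map entry serializes to about the same number of bytes.

```java
// Rough cost model: bytes (de)serialized per single-entry update.
final class StateAccessCostSketch {

    /** ValueState holding a whole map: each access (de)serializes all entries. */
    static long valueStateBytes(int entries, int bytesPerEntry) {
        return (long) entries * bytesPerEntry;
    }

    /** MapState: each access (de)serializes only the addressed entry. */
    static long mapStateBytes(int bytesPerEntry) {
        return bytesPerEntry;
    }

    public static void main(String[] args) {
        int entries = 10_000;   // hypothetical number of map entries under one key
        int bytesPerEntry = 64; // hypothetical serialized size of one entry
        System.out.println("ValueState bytes per update: " + valueStateBytes(entries, bytesPerEntry));
        System.out.println("MapState bytes per update:   " + mapStateBytes(bytesPerEntry));
    }
}
```

In this model the gap grows linearly with the number of entries stored under one key, which is why the choice matters mostly for large per-key maps.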

Re: Making job fail on Checkpoint Expired?

2020-04-07 Thread Congxian Qiu
> > I'd love to get your opinion on this! > > Thanks, > Robin > > On Fri, Apr 3, 2020 at 11:17, Congxian Qiu > wrote: > >> Currently, only checkpoint declined will be counted into >> `continuousFailureCounter`. >> Could you please share why do you

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-08 Thread Congxian Qiu
ose references. However, as I > mentioned in original post, there is enough capacity in all disk. Also, > when I switch to presto file system, the problem goes away. Wondering > whether others encounter similar issue. > > Best > Lu > > On Tue, Apr 7, 2020 at 7:03 PM Congxian Q

Re: State size Vs keys number perfromance

2020-04-08 Thread Congxian Qiu
nd up with about the same state size on each subtask, then there is no difference at this point Best, Congxian KristoffSC wrote on Wed, Apr 8, 2020 at 3:36 PM: > Thanks Congxian Qiu, > I'm aware about your second point. In Value state I will keep String or > very > simple POJO, without

Re: FlinkRuntimeException: Unexpected list element deserialization failure

2020-04-10 Thread Congxian Qiu
Hi As Yun said, could you please share the whole stack trace (normally you can copy more lines below the given stack trace)? We cannot tell why this happened from the given stack trace. Best, Congxian Yun Tang wrote on Fri, Apr 10, 2020 at 1:48 AM: > Hi > > I think you have missed the "caused by" exception [1

Re: Making job fail on Checkpoint Expired?

2020-04-10 Thread Congxian Qiu
dering if there is a way for long checkpoints to create > backpressure on the rest of the stream? This would be a nice feature to > have, since it would avoid the state growing too much when checkpointing > takes time because of temporary network issues for example. > > Thanks for

Re: Storing Operator state in RocksDb during runtime - plans

2020-04-10 Thread Congxian Qiu
Hi KristoffSC I'm not aware of any concrete plans for such a feature. I also CCed Yu, who may give more information about this. Best, Congxian Fabian Hueske wrote on Tue, Apr 7, 2020 at 4:27 AM: > Hi Kristoff, > > I'm not aware of any concrete plans for such a feature. > > Best, > Fabian > > On Sun, Apr 5

Re: checkpointing opening too many file

2020-04-24 Thread Congxian Qiu
Hi If there really are that many files to upload to HDFS, we currently have no way to limit the number of open files. There is an issue[1] that aims to fix this problem, with a PR attached; maybe you can try the PR to see whether it solves your problem. [1] https://issues.apache.org/ji

Re: Debug Slowness in Async Checkpointing

2020-04-24 Thread Congxian Qiu
Hi If the bottleneck is the upload part, have you tried uploading files with multiple threads[1]? [1] https://issues.apache.org/jira/browse/FLINK-11008 Best, Congxian Lu Niu wrote on Fri, Apr 24, 2020 at 12:38 PM: > Hi, Robert > > Thanks for relying. Yeah. After I added monitoring on the above path, it > sh

Re: Flink 1.9.2 why always checkpoint expired

2020-04-27 Thread Congxian Qiu
Hi The image is not very clear. For RocksDBStateBackend, did you enable incremental checkpoints? Currently, a checkpoint on the TM side consists of these steps: 1. barrier alignment 2. sync snapshot 3. async snapshot. For the expired checkpoint, could you please check the tasks in the first operator of the DAG to find o

Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Congxian Qiu
Hi 1. From my experience, Flink can support state that large; set an appropriate parallelism for the stateful operator. For RocksDB you may need to pay attention to disk performance. 2. Inside Flink, the state is partitioned by key-group, and each task (parallel instance) holds multiple key-groups. Flink d
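The key-group partitioning from point 2 can be sketched as two small functions. This is a simplified model: `KeyGroupSketch` and the values 128, 4, and 8 are illustrative, and the key hash uses plain `hashCode()` where Flink applies an additional hash, so the concrete key-group numbers will differ from a real job.

```java
// Simplified sketch of key -> key-group -> subtask assignment.
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key-groups (simplified hash). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assigns contiguous ranges of key-groups to subtasks. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {4, 8}) {
            int[] groupsPerSubtask = new int[parallelism];
            for (int kg = 0; kg < maxParallelism; kg++) {
                groupsPerSubtask[subtaskFor(kg, maxParallelism, parallelism)]++;
            }
            System.out.println("parallelism " + parallelism
                    + ": key-groups on subtask 0 = " + groupsPerSubtask[0]);
        }
        System.out.println("key 'user-42' -> key-group " + keyGroupFor("user-42", maxParallelism));
    }
}
```

Since a key never changes its key-group, rescaling in this model only moves whole key-groups between subtasks, so the state size per task is what determines how much data one task has to handle.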

Re: Savepoint memory overhead

2020-05-03 Thread Congxian Qiu
Hi From the given figure, the end-to-end duration of the two failed checkpoints seems small (so they did not fail because of a timeout). Could you please check why they failed? Maybe you can find something in the JM log such as "Decline checkpoint {} by task {} of job {} at {}."; then you can go to th

Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
to redistribute > terabytes of state data on node addition / node failure. > > Thanks! > > On Sun, May 3, 2020 at 6:56 PM Congxian Qiu > wrote: > >> Hi >> >> 1. From my experience, Flink can support such big state, you can set >> appropriate parallelism fo

Re: Flink: For terabytes of keyed state.

2020-05-06 Thread Congxian Qiu
, May 6, 2020 at 8:34 AM Congxian Qiu > wrote: > >> Hi >> >> From my experience, you should care the state size for a single task(not >> the whole job state size), the download speed for single thread is almost >> 100 MB/s (this may vary in different env), and I do

Re: checkpointing opening too many file

2020-05-06 Thread Congxian Qiu
tate size. > > thanks > On 4/25/2020 13:48,Congxian Qiu > wrote: > > Hi > If there are indeed so many files need to upload to hdfs, then currently > we do not have any solutions to limit the open files, there exist an > issue[1] wants to fix this problem, and a pr for it, maybe

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-06 Thread Congxian Qiu
Hi For the rate limit, could you please try entropy injection[1]? For checkpoints, Flink handles the file lifecycle (it deletes a file once it will never be used again). A file in a checkpoint stays there as long as the corresponding checkpoint is still valid. [1] https://ci.apache.org

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Congxian Qiu
't control anything beyond the checkpoint directory, and since > shared is in that directory I can't put entropy inside the shared directory > itself (which is what I would need). > > Thanks, > Trystan > > On Wed, May 6, 2020 at 7:31 PM Congxian Qiu > wrote: >

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-08 Thread Congxian Qiu
find some solution for ways to clean up these directories. > > I appreciate your patience and help, thank you so much! > > Trystan > > On Thu, May 7, 2020 at 7:15 PM Congxian Qiu > wrote: > >> Hi >> >> Yes, there should only files used in checkpoint 8 an

Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Congxian Qiu
Thanks a lot for the release and your great job, Yu! Also thanks to everyone who made this release possible! Best, Congxian Yu Li wrote on Thu, May 14, 2020 at 1:59 AM: > The Apache Flink community is very happy to announce the release of Apache > Flink 1.10.1, which is the first bugfix release for the Apach

Re: Incremental state with purging

2020-05-15 Thread Congxian Qiu
Hi From your description, you want to do two things: 1. update state and remove the state older than x; 2. output the state every y seconds. From my side, the first can be done with TTL state as Yun said, and the second can be done with KeyedProcessFunction[1]. If you want to have complex logic

Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Congxian Qiu
Hi, Could you please share the stack trace or the log message? If I understand correctly, savepoint V3 is not contained in 1.10. Best, Congxian Roc Marshal wrote on Fri, May 15, 2020 at 4:33 PM: > Hi, all. > > When using savepoint to upgrade a Flink job from blink-1.5 to flink-1.10, > the system prompts that blink

Re: Flink Key based watermarks with event time

2020-05-15 Thread Congxian Qiu
Hi Maybe you can try KeyedProcessFunction[1] for this, but you need to handle the allowed-lateness logic[2] in your own business logic (event-time records may be out of order). [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#the-keyedprocessfu

Re: Protection against huge values in RocksDB List State

2020-05-16 Thread Congxian Qiu
Hi As you described, I'm not sure whether MapState can help you in such a case. MapState serializes each entry separately, so it would not run into the same problem as ListState. When using MapState, you need to decide how to set the mapKey; if the whole state will be cleared after being processed,

Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-17 Thread Congxian Qiu
we open source blink. As you can see, we > use "org.apache.flink.runtime.state.KeyGroupsStateSnapshot" instead of " > org.apache.flink.runtime.state.KeyGroupsStateHandle", and thus the > savepoint generated by Blink cannot be easily consumed by Flink. > > Best >

Re: Process available data and stop with savepoint

2020-05-17 Thread Congxian Qiu
Hi Sergii If I understand correctly, you want to process all the files in some directory, and do not want to process them multiple times. I'm not sure if using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs, and keep the job running

Re: Rocksdb implementation

2020-05-19 Thread Congxian Qiu
Hi Flink stores state in a StateBackend. There are two StateBackends: HeapStateBackend, which stores state on the heap, and RocksDBStateBackend, which stores state in RocksDB. You can select RocksDB in the following ways[1]: 1. add `env.setStateBackend(...);` in your code 2. add configu

Re: Protection against huge values in RocksDB List State

2020-05-19 Thread Congxian Qiu
hould be too big for the > RocksDB JNI bridge. This seems to make our job behave better! Thanks for > your help guys, this was really helpful :) > > Robin > > On Sat, May 16, 2020 at 09:05, Congxian Qiu > wrote: > >> Hi >> >> As you described, I'

Re: Modified & rebuilt Flink source code but changes do not work

2020-05-26 Thread Congxian Qiu
Hi If you committed the change in your local git repo, could you please check whether the commit id in the job log (such as `Rev:28bdd33`, where 28bdd33 is the commit id) is the same as the local commit id? Best, Congxian Qi K. wrote on Tue, May 26, 2020 at 4:47 PM: > Hi folks, > > Within our team, we made some simple

Re: Inconsistent checkpoint durations vs state size

2020-05-29 Thread Congxian Qiu
Hi From the given picture: 1. some checkpoints failed (but not because of a timeout); could you please check why these checkpoints failed? 2. The checkpoint data size is the delta size for the current checkpoint[1], assuming you are using incremental checkpoints. 3. In fig1 the checkpoint size is ~3

Re: Auto adjusting watermarks?

2020-05-29 Thread Congxian Qiu
Hi Would it work for your case to keep histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` and adjust `maxOutOfOrderness` according to that histogram? (Be careful: such histogram data is not included in the snapshot when checkpointing.) Best, Congxian Theo Diefenthal wrote on Sat, May 30, 2020
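One way to read the histogram idea is sketched below in plain Java, independent of the Flink API. `LatenessHistogramSketch`, the bucket width, and the quantile are all hypothetical choices; the sketch records how far each event lags behind the highest timestamp seen so far and suggests a bound that covers a chosen fraction of events.

```java
// Sketch: derive an out-of-orderness bound from a lateness histogram.
final class LatenessHistogramSketch {
    private final long bucketWidthMs;
    private final long[] counts;
    private long total;
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    LatenessHistogramSketch(long bucketWidthMs, int buckets) {
        this.bucketWidthMs = bucketWidthMs;
        this.counts = new long[buckets];
    }

    /** Records how far the event lags behind the highest timestamp seen so far. */
    void observe(long eventTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
        long lateness = maxSeenTimestampMs - eventTimestampMs; // always >= 0
        int bucket = (int) Math.min(lateness / bucketWidthMs, counts.length - 1);
        counts[bucket]++;
        total++;
    }

    /** Upper edge of the first bucket at which the given fraction of events is covered. */
    long suggestedMaxOutOfOrdernessMs(double quantile) {
        long needed = (long) Math.ceil(quantile * total);
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= needed) {
                return (i + 1) * bucketWidthMs;
            }
        }
        return counts.length * bucketWidthMs;
    }

    public static void main(String[] args) {
        LatenessHistogramSketch h = new LatenessHistogramSketch(100L, 10);
        for (long ts : new long[] {1_000L, 1_100L, 1_050L, 700L, 1_200L}) {
            h.observe(ts);
        }
        System.out.println("bound covering half the events: " + h.suggestedMaxOutOfOrdernessMs(0.5));
        System.out.println("bound covering all events:      " + h.suggestedMaxOutOfOrdernessMs(1.0));
    }
}
```

As the reply warns, such a histogram lives only in the extractor instance, so in this sketch it would start empty again after a restore.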

Re: Custom trigger to trigger for late events

2020-05-29 Thread Congxian Qiu
Hi Poornapragna I'll try to answer your questions: 1. You don't need to delete the timer manually (it is deleted after it fires), but you can delete timers manually if you want. 2. AFAIK, triggers are not included in snapshots, but timers are. 3. delete timer that was not regi

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Congxian Qiu
nally >> configured to that and the checkpoints were taking too long to complete >> before the next checkpoint interval began. >> >> >> >> Our tm’s are normally 3 slots (3_slots.png), we wanted to try running >> with 1 slot (1_slot.png) and noticed the checkpoi

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Congxian Qiu
@Gordon thanks a lot for the release and for being the release manager. Also thanks to everyone who made this release possible! Best, Congxian Oytun Tez wrote on Tue, Jun 9, 2020 at 7:08 PM: > Thank you, Gordon and everyone. > > On Tue, Jun 9, 2020 at 5:56 AM Tzu-Li (Gordon) Tai > wrote: > >> The Apache Fli

Re: MapState in flink

2020-06-14 Thread Congxian Qiu
Hi Could you please share why you need `MapState` instead of `MapState>` Best, Congxian Oytun Tez wrote on Sun, Jun 14, 2020 at 3:39 AM: > Correct me @everyone if I'm wrong, but you cannot keep State inside State > in that way. So change your signature to: MapState> > > The underlying mechanism wouldn't mak

Re: Incremental state

2020-06-14 Thread Congxian Qiu
Hi Can a process function[1] meet your needs here? You can implement the TTL logic using timers in process functions. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html Best, Congxian Timo Walther wrote on Wed, Jun 10, 2020 at 9:36 PM: > Hi Annemarie, > >

Re: Is State TTL possible with event-time characteristics ?

2020-06-17 Thread Congxian Qiu
Hi Currently, Flink does not support event-time TTL state; there is an issue[1] tracking this. [1] https://issues.apache.org/jira/browse/FLINK-12005 Best, Congxian Arti Pande wrote on Wed, Jun 17, 2020 at 7:37 PM: > With Flink 1.9 is state TTL supported for event-time characteristics? This > part >

Re: Improved performance when using incremental checkpoints

2020-06-17 Thread Congxian Qiu
Hi Nick The result is a bit weird. Did you compare the disk utilization/performance before and after enabling checkpoints? Best, Congxian Yun Tang wrote on Wed, Jun 17, 2020 at 8:56 PM: > Hi Nick > > I think this thread use the same program as thread "MapState bad > performance" talked. > Please provide a simple pr

Re: Shared state between two process functions

2020-06-17 Thread Congxian Qiu
Hi Maybe you can take a look at broadcast state[1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html Best, Congxian Robert Metzger wrote on Tue, Jun 16, 2020 at 2:18 AM: > Thanks for sharing some details on the use case: Are you able to move the > common

Re: Trouble with large state

2020-06-20 Thread Congxian Qiu
Hi Sorry to jump in late. After reading the previous emails, I have the following assumptions; please correct me if I'm wrong: - RocksDBStateBackend with incremental checkpoints - at-least-once mode - the parallelism of the stateful operator is 8 - checkpoints may take too long to complete - has fix rate in

Re: MapState bad performance

2020-06-20 Thread Congxian Qiu
Hi Nick Sorry for jumping in late. Just wondering why you call putAll on RocksDBMapState and then follow it with RocksDBMapState#clear(). It seems the state will always be empty after processing. Best, Congxian Yun Tang wrote on Tue, Jun 16, 2020 at 7:42 PM: > Hi Nick > > From my experience, it's not easy to tune

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-05 Thread Congxian Qiu
Hi First, could you please check whether the problem is still there with Flink 1.10 or 1.11? It seems strange: according to the error message, the error occurs when trying to convert a non-window state (VoidNamespace) to a window state (the serializer is the serializer of window state, but the state is non-window state

Re: Checkpoint is disable, will history data in rocksdb be leak when job restart?

2020-07-05 Thread Congxian Qiu
Hi SmileSmile As for the OOM problem, maybe you can try to get a memory dump before the OOM; from the dump you can tell what consumes more memory than expected. Best, Congxian Yun Tang wrote on Fri, Jul 3, 2020 at 3:04 PM: > Hi > > If you do not enable checkpoint and have you ever restored checkpoint

Re: Does savepoint reset the base for incremental checkpoint

2020-07-06 Thread Congxian Qiu
Hi The checkpoint base is only used for incremental checkpoints; the answer to the first question is checkpoint x. After restoring from a savepoint, there is no base for the first checkpoint. You can refer to the code[1][2] for more information. [1] https://github.com/apache/flink/blob/c14f9d2f9f6

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Congxian Qiu
Thanks Zhijiang and Piotr for the great work as release managers, and thanks to everyone who made the release possible! Best, Congxian Benchao Li wrote on Wed, Jul 8, 2020 at 12:39 PM: > Congratulations! Thanks Zhijiang & Piotr for the great work as release > managers. > > Rui Li wrote on Wed, Jul 8, 2020 at 11:38 AM: > >>

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-11 Thread Congxian Qiu
Hi David Since you say the savepoint uses local disk, I assume that you use RocksDBStateBackend. Which Flink version are you using now? What do you mean by "The task manager did not clean up the state"? Does that mean the local disk space was not cleaned up? Did the task encounter a failover in this p

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-11 Thread Congxian Qiu
; Sorry > > I can't reproduce it with reduce/aggregate/fold/apply and due to some > limitations in my working environment, I can't use flink 1.10 or 1.11. > > Congxian Qiu wrote on Sun, Jul 5, 2020 at 6:21 PM: > >> Hi >> >> First, Could you please try this problem still

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Congxian Qiu
Hi Ori AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done using the window operator, would KeyedProcessFunction[1] work for you? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process

Re: Timeout when using RockDB to handle large state in a stream app

2020-07-11 Thread Congxian Qiu
Hi Felipe I'm just wondering whether increasing heartbeat.timeout with RocksDBStateBackend works for you. If not, what does the GC log say? Thanks. Best, Congxian Felipe Gutierrez wrote on Tue, Jul 7, 2020 at 10:02 PM: > I figured out that for my stream job the best was just to use the > default MemoryStateB

Re: How to debug window states

2020-07-19 Thread Congxian Qiu
Hi There is an issue[1] that aims to provide an easy way to read/bootstrap window state using the State Processor API, but the PR is currently closed. [1] https://issues.apache.org/jira/browse/FLINK-13095 Best, Congxian Paul Lam wrote on Wed, Jul 15, 2020 at 4:21 PM: > It turns out to be a bug of StateTTL [1]

Re: How to get flink JobId in runtime

2020-07-20 Thread Congxian Qiu
Hi Sili I'm not sure if there are other ways to get this value properly. Maybe you can try `RuntimeContext.getMetricGroup().getAllVariables().get("")`. Best, Congxian Si-li Liu wrote on Mon, Jul 20, 2020 at 7:38 PM: > Hi > > I want to retrieve flink JobId in runtime, for example, during > RichFunction's

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
>>> use did it. >>> >>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A >>> retained checkpoint isn't supported in REST API and even if it was, I think >>> it doesn't fit my scenario (stop a job, and start the new one fro

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
, in this case, was writing one line into the logger (I was writing 8 > GB in total), and that makes more sense. So nothing wrong with the > Flink/Savepoint behaviour. > > Thanks, > David > > On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu > wrote: > >> Hi David

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Congxian Qiu
Thanks Dian for the great work and thanks to everyone who made this release possible! Best, Congxian Rui Li wrote on Thu, Jul 23, 2020 at 10:48 AM: > Thanks Dian for the great work! > > On Thu, Jul 23, 2020 at 10:22 AM Jingsong Li > wrote: > > > Thanks for being the release manager for the 1.11.1 release,

Re: Flink failed to resume from checkpoint stored on S3

2020-07-22 Thread Congxian Qiu
Hi Xiaolong From the log, it seems there is no `_metadata` file in the checkpoint directory s3:///flink/checkpoint_dir/65786c3307a10e79a52b4de478cfe996/chk-7853. Did you ever configure checkpoint retention[1]? If it is not configured, the checkpoint will be deleted if job

Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Congxian Qiu
Hi Sivaprasanna I think state schema evolution works with incremental checkpoints; I tried it with a simple POJO schema and it worked. Maybe you need to check the schema: judging from the exception stack, the schemas before and after are incompatible. Best, Congxian Sivaprasanna wrote on Fri, Jul 24, 2020, AM 12

Re: Flink 1.11 job stop with save point timeout error

2020-07-25 Thread Congxian Qiu
Hi Ivan From the JM log, the savepoint completed within 1 second, and the timeout exception says that the stop-with-savepoint could not be completed within 60s (this is calculated as 20 (RestOptions#RETRY_MAX_ATTEMPTS) * 3s (RestOptions#RETRY_DELAY); you can check the logic here[1]). I'm not sure what
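The 60s figure in this reply is just the retry count multiplied by the retry delay. The sketch below reproduces that arithmetic with the two values quoted in the message; `RestRetrySketch` is a made-up name, and the code does not read Flink's configuration.

```java
// Arithmetic behind the 60s client-side limit quoted in the message.
final class RestRetrySketch {

    /** Total time the client keeps retrying before giving up, in milliseconds. */
    static long totalWaitMs(int maxAttempts, long delayMs) {
        return maxAttempts * delayMs;
    }

    public static void main(String[] args) {
        int maxAttempts = 20;  // value quoted for RestOptions#RETRY_MAX_ATTEMPTS
        long delayMs = 3_000L; // value quoted for RestOptions#RETRY_DELAY
        System.out.println("client gives up after " + totalWaitMs(maxAttempts, delayMs) / 1000 + " s");
    }
}
```

Read this way, the timeout comes from the client's retry budget and not from the savepoint itself, which is consistent with the savepoint completing within a second.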

Re: unsubscribe

2020-07-29 Thread Congxian Qiu
Hi Please send an email to user-unsubscr...@flink.apache.org to unsubscribe. You can get more info here[1]. [1] https://flink.apache.org/community.html#mailing-lists Best, Congxian Maatary Okouya wrote on Thu, Jul 30, 2020 at 12:09 AM: > >

Re: Unable to recover from checkpoint

2020-07-30 Thread Congxian Qiu
Hi Sivaprasanna With RocksDBStateBackend incremental checkpoints, the latest checkpoint may reference files of a previous checkpoint (the files in the shared directory), so deleting files that belong to a previous checkpoint may lead to a FileNotFoundException. Currently, we can only parse the

Re: allowNonRestoredState: metadata file in checkpoint dir missing

2020-08-01 Thread Congxian Qiu
Hi Omkar `--allowNonRestoredState` does not affect checkpointing behavior; it only affects the restore logic. As for the missing _metadata file, could you please check 1) whether the job has checkpointing enabled; 2) whether any checkpoint completed successfully. Best, Congxian Des

Re: flink checkpoints adjustment strategy

2021-01-29 Thread Congxian Qiu
Hi Marco You need to figure out why the checkpoint timed out (the UI shows the time consumed by each phase of a checkpoint). If the checkpoint really needs that long to complete, you need to configure a longer timeout. If there are some checkpoint errors, we need firs

Re: Read the metadata files (got from savepoints)

2021-03-21 Thread Congxian Qiu
Hi Maybe you can refer to this test[1]. [1] https://github.com/apache/flink/blob/a33e6bd390a9935c3e25b6913bed0ff6b4a78818/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java#L55 Best, Congxian Abdullah bin Omar wrote on Mon, Mar 22, 2021 at 11:25 AM

Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-24 Thread Congxian Qiu
Hi From the description, the checkpoint takes longer to complete in 1.12. Could you share more detail about the time consumption when running the job on 1.9 and 1.12? Best, Congxian Haihang Jing wrote on Tue, Mar 23, 2021 at 7:22 PM: > 【Appearance】For jobs with the same configuration (checkpoint in

Re: Why is the size of each checkpoint increasing?

2019-07-31 Thread Congxian Qiu
Hi Andrew The Flink doc[1] says: "Flink guarantees removal only for time-based windows and not for other types, *e.g.* global windows (see Window Assigners)." It seems the state of the

Re: Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-08 Thread Congxian Qiu
Congratulations Hequn! Best, Congxian Yu Li wrote on Thu, Aug 8, 2019 at 2:02 PM: > Congratulations Hequn! Well deserved! > > Best Regards, > Yu > > > On Thu, 8 Aug 2019 at 03:53, Haibo Sun wrote: > >> Congratulations! >> >> Best, >> Haibo >> >> At 2019-08-08 02:08:21, "Yun Tang" wrote: >> >Congratulations

Re: Capping RocksDb memory usage

2019-08-08 Thread Congxian Qiu
Hi Maybe FLIP-49[1] "Unified Memory Configuration for TaskExecutors" can give some information here [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html Best, Congxian On Fri, Aug 9, 2019 at 4:59 AM Cam Mach wrote: > Hi

Re: Changing the way keys are defined breaks savepoints

2019-08-14 Thread Congxian Qiu
Hi, From what you described, does the key have the same type before and after the change? As for using lambda expressions, maybe the doc[1] can be helpful (especially the limitations) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/java_lambdas.html#examples-and-limitations Bes

Re: Update Checkpoint and/or Savepoint Timeout of Running Job without restart?

2019-08-18 Thread Congxian Qiu
Hi Currently, we can't change a running job's checkpoint timeout, but there is an issue[1] that asks for a separate timeout for savepoints. [1] https://issues.apache.org/jira/browse/FLINK-9465 Best, Congxian On Sat, Aug 17, 2019 at 12:37 AM Aaron Levin wrote: > Hello, > > Question: Is it possible to upd

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Congxian Qiu
Congratulations, and thanks to everyone who made this release possible. Best, Congxian On Fri, Aug 23, 2019 at 8:13 AM Kurt Young wrote: > Great to hear! Thanks Gordon for driving the release, and it's been a > great pleasure to work with you as release managers for the last couple of > weeks. And thanks ev

Re: Externalized checkpoints

2019-08-22 Thread Congxian Qiu
Hi, Vishwas As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to specify the maximum number of completed checkpoints to retain. You can also refer to the externalized checkpoint cleanup config[2] for how retained checkpoints are cleaned up [1] https://ci.apache.org/projects/fl
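As a rough illustration of what a cap on retained checkpoints means, the sketch below simulates keeping only the newest N completed checkpoint IDs. It is plain Python, not Flink code; the function name `retained_after` and the integer checkpoint IDs are hypothetical and only stand in for the behaviour the "state.checkpoints.num-retained" option describes.

```python
from collections import deque
from typing import Deque, List


def retained_after(completed_ids: List[int], num_retained: int) -> List[int]:
    """Simulate a retention cap: keep only the newest `num_retained` IDs.

    `completed_ids` is the order in which checkpoints completed. This is an
    illustrative model, not Flink's actual implementation.
    """
    if num_retained < 1:
        raise ValueError("num_retained must be at least 1")
    # A bounded deque silently drops the oldest entry once it is full.
    retained: Deque[int] = deque(maxlen=num_retained)
    for checkpoint_id in completed_ids:
        retained.append(checkpoint_id)
    return list(retained)
```

For example, calling `retained_after([1, 2, 3, 4, 5], 2)` models a job that completed five checkpoints with a cap of two.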

Re: Per Key Grained Watermark Support

2019-09-23 Thread Congxian Qiu
Hi There was a discussion about this issue[1]. As that discussion said, this is currently not supported out of the box by Flink. I think you can try a keyed process function, as Lasse said. [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Watermark-for-each-key-td274
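The keyed-process-function idea can be sketched outside Flink as per-key bookkeeping: track the largest event timestamp seen for each key and derive that key's own watermark from it. The class below is plain Python under stated assumptions, not the Flink API; `PerKeyWatermarks`, `max_out_of_orderness_ms`, and the lateness rule are all hypothetical choices made for illustration. In a real job the dictionary would presumably be replaced by keyed state.

```python
from typing import Dict, Optional


class PerKeyWatermarks:
    """Track an independent watermark for every key (illustrative only)."""

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self._delay = max_out_of_orderness_ms
        self._max_timestamp: Dict[str, int] = {}

    def watermark(self, key: str) -> Optional[int]:
        """Return the key's watermark, or None if the key was never seen."""
        max_ts = self._max_timestamp.get(key)
        return None if max_ts is None else max_ts - self._delay

    def on_event(self, key: str, timestamp_ms: int) -> bool:
        """Record an event; return True if it is late for its own key."""
        current = self.watermark(key)
        is_late = current is not None and timestamp_ms < current
        # The watermark only moves forward, and only for this key.
        previous = self._max_timestamp.get(key, timestamp_ms)
        self._max_timestamp[key] = max(previous, timestamp_ms)
        return is_late
```

The point of the design is that a slow key does not hold back a fast one, and a fast key does not make a slow key's events look late.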

Re: Debugging slow/failing checkpoints

2019-09-26 Thread Congxian Qiu
Hi Steve 1. Do you use exactly-once or at-least-once? 2. Do you use incremental checkpoints or not? 3. Do you have any timers, and where are they stored (heap or RocksDB)? You can refer to the config here[1] and try storing the timers in RocksDB. 4. Is the alignment time too long? 5. You can check if it is sync

Re: Broadcast state

2019-09-30 Thread Congxian Qiu
Hi, Could you use some cache system such as HBase or Redis to store this data, and query the cache when needed? Best, Congxian On Tue, Oct 1, 2019 at 10:15 AM Navneeth Krishnan wrote: > Thanks Oytun. The problem with doing that is the same data will be have to > be stored multiple times wasting memory

Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-09-30 Thread Congxian Qiu
Hi Oliwer, From the description, it seems the state was not cleared. Maybe you could check how many times {{windowState.clear()}} was triggered in {{WindowOperator#processElement}}, and try to figure out why the state was not cleared. Best, Congxian On Fri, Sep 27, 2019 at 4:14 PM Oliwer Kostera wrote: > Hi

Re: Group by multiple fields

2019-10-07 Thread Congxian Qiu
Hi Miguel Maybe the doc[1] about specifying keys can help. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html#specifying-keys Best, Congxian On Tue, Oct 8, 2019 at 12:09 AM Miguel Farrajota wrote: > Hi, > > I'm looking to do some result aggregations on a event

Re: Building Flink 1.6.4 fails with "object scala in compiler mirror not found"

2019-10-07 Thread Congxian Qiu
Hi If you just want to skip the tests, have you tried adding `-DskipTests` when running the Maven command? Best, Congxian On Mon, Oct 7, 2019 at 11:36 PM Aikio, Torste wrote: > Hi, > > I'm trying to build Flink 1.6.4 from source and some of the tests for > flink-scala module are failing for me. Are there some add

Re: Problem with savepoint deserialization

2019-10-08 Thread Congxian Qiu
Hi Steven From the exception, it seems the serializers used before and after the change are incompatible. I'm not very familiar with Scala case classes; maybe you can debug locally to see which serializer is used for the case class before and after the change. Best, Congxian On Wed, Oct 9, 2019, Steven Nelson

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Congxian Qiu
Hi Vishwas Currently, Flink can only restore from a retained checkpoint or a savepoint when given the `-s` parameter[1][2]; otherwise, it will start from scratch. ``` checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs] savepoint --> bin/flink run -s :savepointPath [:runArgs] [1] https://ci.a
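To make the `-s` usage above concrete, here is a small Python sketch that picks the newest `chk-<n>` directory containing a `_metadata` file and assembles the `bin/flink run -s` argument list. The helper names, the assumption that checkpoint directories sit directly under one job directory, and the rule "highest number wins" are illustrative assumptions, not part of Flink.

```python
import re
from pathlib import Path
from typing import List, Optional

# Retained checkpoints are assumed to live in directories named chk-<number>.
_CHK_DIR = re.compile(r"^chk-(\d+)$")


def latest_completed_checkpoint(job_checkpoint_dir: Path) -> Optional[Path]:
    """Return the highest-numbered chk-<n> directory that has a _metadata file."""
    best: Optional[Path] = None
    best_id = -1
    for child in job_checkpoint_dir.iterdir():
        match = _CHK_DIR.match(child.name)
        if match is None or not child.is_dir():
            continue
        # A directory without _metadata is treated as not restorable.
        if not (child / "_metadata").is_file():
            continue
        checkpoint_id = int(match.group(1))
        if checkpoint_id > best_id:
            best, best_id = child, checkpoint_id
    return best


def build_restore_command(checkpoint_path: Path, run_args: List[str]) -> List[str]:
    """Assemble `bin/flink run -s <path> [runArgs]` as an argument list."""
    return ["bin/flink", "run", "-s", str(checkpoint_path), *run_args]
```

Returning an argument list rather than a single string keeps paths with spaces intact if the command is later passed to `subprocess.run`.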

Re: Broadcast state

2019-10-09 Thread Congxian Qiu
kafka streams where I have read access to the state across the >> pipeline. That will indeed solve a lot of problems. Is there some way I can >> do the same with flink? >> >> Thanks! >> >> On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu >> wrote: >> &g
