Re: [ANNOUNCE] Apache Flink 2.1.0 released

2025-07-31 Thread Zakelly Lan
Congratulations! Thanks Ron and all the contributors! Best, Zakelly On Thu, Jul 31, 2025 at 8:03 PM Lincoln Lee wrote: > Congratulations! Thanks for driving this! > > Best, > Lincoln Lee > > > Leonard Xu wrote on Thu, Jul 31, 2025 at 15:36: > > > Nice! Thanks Ron and all involved for the great work. > > >

Re: Unit testing KeyedProcessFunction that uses async state

2025-06-11 Thread Zakelly Lan
Hi Nate, Glad to see you trying out the new API. Well, you're right that the `ProcessFunctionTestHarnesses` doesn't support the new state API yet. There are operators tailored for async state access, as well as the operator test harness. But unfortunately we can't tell in advance if a `KeyedProcessF

Re: Is there anyway to control the size of my ListState in my KeyedProcessFunction

2025-05-29 Thread Zakelly Lan
Hi Sachin, I assume you are using the RocksDB state backend. The TTL for ListState is applied to each list entry if you are using `ListState.add`. However, if you call `ListState.update`, the entire list is rewritten, so the TTL of every entry is refreshed. Could you share your use case and the TTL config? Another sugge
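A toy Java model of the TTL behavior described above. The class `TtlListModel` is hypothetical and is not Flink's implementation; it assumes each element carries its own write timestamp and is treated as expired once `now - writtenAt >= ttl`. `add` stamps only the appended element, while `update` rewrites the list and re-stamps every element.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of per-element TTL for a list state (not Flink code). */
public class TtlListModel {
    /** One stored element together with the time it was last written. */
    private record Entry(String value, long writtenAt) {}

    private final long ttlMillis;
    private List<Entry> entries = new ArrayList<>();

    public TtlListModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Like ListState.add: only the appended element gets a fresh timestamp. */
    public void add(String value, long now) {
        entries.add(new Entry(value, now));
    }

    /** Like ListState.update: the whole list is rewritten, so every element is re-stamped. */
    public void update(List<String> values, long now) {
        List<Entry> rewritten = new ArrayList<>();
        for (String v : values) {
            rewritten.add(new Entry(v, now));
        }
        entries = rewritten;
    }

    /** Returns only the elements that have not expired at time `now`. */
    public List<String> get(long now) {
        List<String> alive = new ArrayList<>();
        for (Entry e : entries) {
            if (now - e.writtenAt() < ttlMillis) {
                alive.add(e.value());
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        TtlListModel appendOnly = new TtlListModel(100);
        appendOnly.add("a", 0);
        appendOnly.add("b", 80);
        System.out.println(appendOnly.get(120)); // "a" has expired, "b" is still alive

        TtlListModel rewritten = new TtlListModel(100);
        rewritten.add("a", 0);
        rewritten.update(List.of("a", "b"), 80);
        System.out.println(rewritten.get(120)); // both re-stamped at 80, so both alive
    }
}
```

With append-only writes, old entries age out one by one, which is what keeps the list bounded; a full `update` resets the clock for everything.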

Re: Flink 2.0, large state and KeyedCoProcessFunction based stream joins - Async state store questions

2025-04-28 Thread Zakelly Lan
Hi Francis, Actually it is implemented[1,2] but has not been released yet. This allows you to write a customized `KeyedCoProcessFunction` using async state. This will be released in the next version (2.1). It would be great if you could try this out on the master branch of Flink to see if there is

Re: Flink : 1.20 : KeyedCoProcessFunction function waiting for 12 minutes in Initialisation State

2025-04-06 Thread Zakelly Lan
Hi Santosh, May I ask which state backend you are using? And how large is the checkpoint? You may check the disk usage of the configured checkpoint path. Best, Zakelly On Sat, Apr 5, 2025 at 11:58 PM santosh techie wrote: > Hello, > > Flink Version : 1.20. > > I am just beginning to develop a

Re: ForSt State backend seem to try to download all state locally

2025-04-04 Thread Zakelly Lan
e/#for-sql-jobs Best, Zakelly On Fri, Apr 4, 2025 at 6:41 PM Gyula Fóra wrote: > This is the flamegraph during the no-rescale restart. I couldn't attach it > for the mailing list > > On Fri, Apr 4, 2025 at 12:24 PM Zakelly Lan wrote: > >> Hi Gyula, >> >> I ass

Re: ForSt State backend seem to try to download all state locally

2025-04-04 Thread Zakelly Lan
this. Best, Zakelly On Fri, Apr 4, 2025 at 7:43 PM Zakelly Lan wrote: > Hi Gyula, > > That's just because the sync mode is basically inherited from RocksDB, while the > async mode is a completely new code path. You are right, the state backend > should have the remote storage even for sync

Re: ForSt State backend seem to try to download all state locally

2025-04-04 Thread Zakelly Lan
Hi Gyula, > I assumed it will only download at most 10GB and just start reading from > remote and the job should start up "immediately". It won't start up immediately; instead, it clips the state before running. This clipping process is primarily performed on the remote side. This may involve writi

Re: [ANNOUNCE] Apache Flink 2.0.0 released

2025-03-24 Thread Zakelly Lan
Congratulations! Thanks Xintong, Jark, Jiangjie and Martijn for driving the 2.0. Thanks everyone for the great work! Best, Zakelly On Mon, Mar 24, 2025 at 4:24 PM Xintong Song wrote: > The Apache Flink community is very happy to announce the release of Apache > Flink 2.0.0, which is the first

Re: Data skew after keyBy (even with a good number of key groups)

2025-03-12 Thread Zakelly Lan
Hi Vadim, Could you please check whether there are records with identical keys or identical key hash codes? The keyBy redistribution relies on an even distribution of hash codes. If many records share the same hash code, there will probably be data skew. Best, Zakelly On Tue, Mar 11, 2025 at 8:11 PM Vararu, Vadim vi
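A sketch of how a key is routed to a subtask, to show why identical keys or identical hash codes always land on the same subtask no matter how many key groups exist. The two steps (hash to key group, then key group to subtask via `keyGroup * parallelism / maxParallelism`) follow Flink's `KeyGroupRangeAssignment` as far as we know; the `spread` function is a stand-in mixer, not Flink's exact murmur hash, and `KeyGroupSketch` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of keyBy routing: hash -> key group -> subtask index.
 * The spread() mixer is a stand-in, NOT Flink's exact murmur hash.
 */
public class KeyGroupSketch {

    /** Stand-in bit mixer applied to key.hashCode(); result is non-negative. */
    public static int spread(int hash) {
        hash ^= hash >>> 16;
        hash *= 0x85ebca6b;
        hash ^= hash >>> 13;
        return hash & Integer.MAX_VALUE;
    }

    /** Key group in [0, maxParallelism). */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return spread(key.hashCode()) % maxParallelism;
    }

    /** Subtask index in [0, parallelism) that owns the key group. */
    public static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are different strings with the same String.hashCode() (2112),
        // so any hash-based routing sends them to the same key group and subtask.
        System.out.println("Aa".hashCode() == "BB".hashCode());
        System.out.println(subtaskFor("Aa", 128, 4) == subtaskFor("BB", 128, 4));

        // Count records per subtask for a skewed input: a hot key cannot be split.
        Map<Integer, Integer> load = new HashMap<>();
        String[] records = {"Aa", "BB", "Aa", "BB", "hot", "hot", "hot", "cold"};
        for (String r : records) {
            load.merge(subtaskFor(r, 128, 4), 1, Integer::sum);
        }
        System.out.println(load);
    }
}
```

Raising the number of key groups only helps when hash codes are already diverse; colliding hash codes or a single hot key still map to one subtask.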

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-02 Thread Zakelly Lan
y are not cleaned up. > > I do not remember anything about a state TTL, so it's probably not set. Is > that code ? a property? If so which one. > > Thanks > > JM > > On Thu, 2 Jan 2025, 06:38 Zakelly Lan, wrote: > >> FW to user ML. >> >>

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-01 Thread Zakelly Lan
Zakelly Lan wrote: > Hi Jean-Marc, > > Could you elaborate more about how you noticed an increasing number of > checkpoints are left behind? Is the number of subdirectories under > s3://flink-application/checkpoints increasing? And have you set the state > TTL? > > > Best

Re: cannot locate classes in flink-release-2.0-preview1 source

2024-12-11 Thread Zakelly Lan
Hi Enric, This may be because your IDE has not indexed the dependency. Please try reloading the Maven project by right-clicking the project root -> Maven -> Reload project. Best, Zakelly On Wed, Dec 11, 2024 at 5:30 PM Enric Ott <243816...@qq.com> wrote: > Hello,Community: > When I import f

Re: Flink state statistics

2024-12-02 Thread Zakelly Lan
Hi Christian, I assume you're using the RocksDB state backend. You can enable some native metrics from RocksDB; please refer to the doc[1]. Please note that `State#clear` only removes the value for the current key, rather than removing all keys. And the deletion will be reflected in the checkpoint

Re: How the Async Execution Model improved the throughput

2024-11-26 Thread Zakelly Lan
s before 2.0-preview just handle the state I/O with the > user thread? My gosh. > > > -- Original Message ------ > *From:* "Zakelly Lan" ; > *Sent:* Tuesday, Nov 26, 2024, 11:57 AM > *To:* "user"; > *Subject:* Re: How the Async Execution Model improve

Re: How the Async Execution Model improved the throughput

2024-11-25 Thread Zakelly Lan
Hi Enric, The asynchronous state processing prevents the task thread from blocking on state I/O and instead allows it to perform CPU work for another input record in the meantime. Additionally, state I/Os can run in parallel, reducing the total I/O time. Therefore, it is suitable for the
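A plain-Java sketch of the idea, using `CompletableFuture` rather than Flink's async state API: reads for several records are issued without blocking the caller, run in parallel on an I/O pool, and the CPU step is chained onto each completion. `slowStateGet` is a hypothetical stand-in for a state lookup, and unlike Flink's async execution model the sketch does not preserve per-key ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncStateSketch {

    /** Hypothetical slow state lookup standing in for a remote or disk read. */
    static int slowStateGet(int key) {
        try {
            Thread.sleep(50); // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key * 10;
    }

    /** Issues all reads up front, then collects the results once each read completes. */
    public static List<Integer> processAll(List<Integer> keys, ExecutorService ioPool) {
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int key : keys) {
            // The caller does not block here; the read runs on the I/O pool.
            pending.add(CompletableFuture.supplyAsync(() -> slowStateGet(key), ioPool)
                    .thenApply(value -> value + 1)); // CPU work chained after the I/O
        }
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : pending) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            List<Integer> out = processAll(List.of(1, 2, 3, 4), ioPool);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // The four 50 ms reads overlap, so this is typically well below 200 ms.
            System.out.println(out + " in ~" + elapsedMs + " ms");
        } finally {
            ioPool.shutdown();
        }
    }
}
```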

Re: [ANNOUNCE] Apache Flink 2.0 Preview released

2024-11-06 Thread Zakelly Lan
Hi Benoit, Please find the result here[1]. The Nexmark repo[2] does not officially support the Flink 2.0 preview version. However, we have opened a PR[3] for this, and once it is merged, we will offer a guide to run Nexmark Q20 with disaggregated state management. [1] https://github.com/ververica/

Re: Setting Timers in applyToKeyedState

2024-08-28 Thread Zakelly Lan
. This needs more consideration and discussion in the community. Best, Zakelly On Thu, Aug 29, 2024 at 11:55 AM Zakelly Lan wrote: > Hi Jose, > > You are right about using {{applyToKeyedState}} to register timers under > {{KeyedBroadcastProcessFunction}}. I think the author overlook

Re: Error class RestoreMode not found after upgrading from Flink 1.19 to 1.20

2024-08-23 Thread Zakelly Lan
Hi, > I suppose the solution would be to cancel the job based on Flink 1.19 and > then resubmit it with Flink 1.20. I think so. > I hope this reply will be in the thread as I smartly configured to only > receive digests... so sending this reply with the same title and > hopefully it will be added

Re: Error class RestoreMode not found after upgrading from Flink 1.19 to 1.20

2024-08-23 Thread Zakelly Lan
Hi Jean-Marc, I think this is related to https://issues.apache.org/jira/browse/FLINK-34455 . Could you provide more information about your setup and how you upgrade your job? Have you re-compiled your job under 1.20? Best, Zakelly On Fri, Aug 23, 2024 at 1:36 AM Jean-Marc Paulin wrote: > Hi,

Re: Tuning rocksdb configuration

2024-07-26 Thread Zakelly Lan
> it didn't compact the very old file yet. 1. Do I need to change any other > rocksdb property? Or 2. does it mean my source events are still coming to > the same key and keeping that state? > > The window fires every 2s, so I don't need the data for a long time. > > Th

Re: Tuning rocksdb configuration

2024-07-25 Thread Zakelly Lan
Hi Banu, I'm trying to answer your question in brief: 1. Yes, when the memtable reaches the value you configured, a flush will be triggered. And no, SST files have a different format from memtables; their size is smaller than 64MB IIUC. 2. Typically you don't need to change this value. If it is set

Re: How to enable RocksDB native metrics?

2024-04-07 Thread Zakelly Lan
Hi Lei, You can enable it by some configurations listed in: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics (RocksDB Native Metrics) Best, Zakelly On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote: > Hi Lei, > > You can ena

Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Zakelly Lan
Congratulations! Best, Zakelly On Thu, Mar 21, 2024 at 12:05 PM weijie guo wrote: > Congratulations! Well done. > > > Best regards, > > Weijie > > > Feng Jin wrote on Thu, Mar 21, 2024 at 11:40: > >> Congratulations! >> >> >> Best, >> Feng >> >> >> On Thu, Mar 21, 2024 at 11:37 AM Ron liu wrote: >> >> > C

Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Zakelly Lan
Congratulations! Thanks Lincoln, Yun, Martijn and Jing for driving this release. Thanks everyone involved. Best, Zakelly On Mon, Mar 18, 2024 at 5:05 PM weijie guo wrote: > Congratulations! > > Thanks release managers and all the contributors involved. > > Best regards, > > Weijie > > > Leona

Re: Unaligned checkpoint blocked by long Async operation

2024-03-17 Thread Zakelly Lan
> > Gyula > > On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan wrote: > >> Hi Gyula, >> >> Processing checkpoint halfway through `processElement` is problematic. >> The current element will not be included in the input in-flight data, and >> we cannot assum

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
? > > Gyula > > On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan wrote: > >> Hi Gyula, >> >> Well I tried your example in local mini-cluster, and it seems the source >> can take checkpoints but it will block in the following AsyncWaitOperator. >> IIUC, the unali

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula, Well I tried your example in local mini-cluster, and it seems the source can take checkpoints but it will block in the following AsyncWaitOperator. IIUC, the unaligned checkpoint barrier should wait until the current `processElement` finishes its execution. In your example, the element q

Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zakelly Lan
Hi Gabriele, Quick answer: You can use the built-in window operators which have been integrated with state backends including RocksDB. Thanks, Zakelly On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen wrote: > Hi Gabriele, > > I'd recommend extending the existing window function whenever possible

Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
's set specifically for > AbstractTopNFunction in StreamExecRank. > How can I do something similar without modifying the Flink runtime? > > Lorenzo > > > On Sun, 18 Feb 2024 at 03:42, Zakelly Lan wrote: > >> Hi Lorenzo, >> >> It is not recommended t

Re: Impact of RocksDB backend on the Java heap

2024-02-18 Thread Zakelly Lan
tate of each key is "large"? Again assuming the > number of distinct partition keys is large. > > Regards, > Alexis. > > On Sun, 18 Feb 2024, 05:02 Zakelly Lan, wrote: > >> Hi Alexis, >> >> Flink does need some heap memory to bridge requests t

Re: Impact of RocksDB backend on the Java heap

2024-02-17 Thread Zakelly Lan
Hi Alexis, Flink does need some heap memory to bridge requests to RocksDB and gather the results. In most cases, the memory is discarded immediately (eventually collected by GC). In the case of timers, Flink does cache a limited subset of key-values on the heap to improve performance. In general you don't

Re: Preparing keyed state before snapshot

2024-02-17 Thread Zakelly Lan
Hi Lorenzo, It is not recommended to do this with the keyed state. However there is an example in flink code (FastTop1Function#snapshotState) [1] of setting keys when snapshotState(). Hope this helps. [1] https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/f

Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
And I found some previous discussion, FYI: 1. https://issues.apache.org/jira/browse/FLINK-3035 2. https://www.mail-archive.com/dev@flink.apache.org/msg10666.html Hope this helps. Best, Zakelly On Tue, Jan 30, 2024 at 4:08 PM Zakelly Lan wrote: > Hi Chirag > > That's an interest

Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
Hi Chirag That's an interesting idea. IIUC, storing key-values can be simply implemented for Redis, but supporting checkpoint and recovery is relatively challenging. Flink's checkpoint should be consistent among all stateful operators at the same time. For an *embedded* and *file-based* key value

Re: Why calling ListBucket for each file in a checkpoint

2024-01-21 Thread Zakelly Lan
Are you accessing the S3 API with the Presto implementation? If so, you may read the code of `com.facebook.presto.hive.s3.PrestoS3FileSystem#create` and find that it checks the existence of the target path first, in which `getFileStatus` and `listPrefix` are called. There is no option to disable this. Best, Z

Re: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.

2024-01-18 Thread Zakelly Lan
by the company that I co-operate with. >> But, yes you are right that inside the code, I can see that the state >> initialization happens inside the AbstractProcessFunction#processElement >> method. >> >> Thank you very much, >> Kostas >> >> On Thu

Re: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.

2024-01-17 Thread Zakelly Lan
Hi, Could you please share the code of state initialization (getting state from a state descriptor)? It seems you are creating a state in #processElement? Best, Zakelly On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan wrote: > Hi, > > Could you please share the code of state initi

Re: keyby mapState use question

2023-12-10 Thread Zakelly Lan
Hi, This should not happen. I guess the `onTimer` and `processElement` you are testing are triggered under different keyby keys. Note that the keyed states are partitioned by the keyby key first, so if querying or setting the state, you are only manipulating the specific partition which does not a
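A toy model of why the two callbacks can see different values. `KeyedMapStateModel` is hypothetical, not Flink code: keyed state is scoped to the current keyBy key, which the runtime selects before each `processElement` or `onTimer` call, so an entry written under one key is invisible under another, and `clear()` only drops the current key's entries. Accessing it with no key selected fails, which is also why keyed state cannot be used in `open()`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of keyed MapState scoped by the current key (not Flink code). */
public class KeyedMapStateModel {
    // keyBy key -> that key's own MapState contents
    private final Map<String, Map<String, Integer>> perKey = new HashMap<>();
    private String currentKey;

    /** In Flink the runtime selects the key before processElement/onTimer; here the caller does. */
    public void setCurrentKey(String key) {
        this.currentKey = key;
    }

    private String requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No current key: keyed state needs a keyed context");
        }
        return currentKey;
    }

    public void put(String userKey, int value) {
        perKey.computeIfAbsent(requireKey(), k -> new HashMap<>()).put(userKey, value);
    }

    public Integer get(String userKey) {
        Map<String, Integer> entries = perKey.get(requireKey());
        return entries == null ? null : entries.get(userKey);
    }

    /** Like State#clear: removes only the current key's entries. */
    public void clear() {
        perKey.remove(requireKey());
    }

    public static void main(String[] args) {
        KeyedMapStateModel state = new KeyedMapStateModel();

        state.setCurrentKey("user-1"); // e.g. processElement for a user-1 record
        state.put("count", 5);

        state.setCurrentKey("user-2"); // e.g. onTimer registered under user-2
        System.out.println(state.get("count")); // null: user-1's entry is not visible

        state.setCurrentKey("user-1");
        System.out.println(state.get("count")); // 5
    }
}
```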

Re: Unsubscribe from user list.

2023-10-18 Thread Zakelly Lan
Hi Lijuan, Please send an email to user-unsubscr...@flink.apache.org if you want to unsubscribe from user@flink.apache.org. Best, Zakelly On Thu, Oct 19, 2023 at 6:23 AM Hou, Lijuan via user wrote: > > Hi team, > > > > Could you please remove this email from the subscription list? I have

Re: Problems with the state.backend.fs.memory-threshold parameter

2023-10-13 Thread Zakelly Lan
Hi rui, The 'state.backend.fs.memory-threshold' configures the threshold below which state is stored as part of the metadata, rather than in separate files. So as a result the JM will use its memory to merge small checkpoint files and write them into one file. Currently the FLIP-306[1][2] is propo
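A toy model of the placement rule described above. `MemoryThresholdModel` is hypothetical, and the 20 KiB threshold in the example is only an illustrative value: chunks smaller than the threshold are counted as inlined into the checkpoint metadata that the JobManager writes, and larger chunks as separate files. It shows how many small chunks (for example, from high parallelism) add up on the JobManager side.

```java
import java.util.List;

/** Hypothetical toy model of the inline-vs-file decision behind state.backend.fs.memory-threshold. */
public class MemoryThresholdModel {

    /** Summary of where one checkpoint's state chunks end up. */
    public record Placement(long bytesInlinedInMetadata, int separateFiles) {}

    public static Placement place(List<Long> chunkSizes, long thresholdBytes) {
        long inlined = 0;
        int files = 0;
        for (long size : chunkSizes) {
            if (size < thresholdBytes) {
                inlined += size; // carried inside the metadata handled by the JobManager
            } else {
                files++;         // written as its own file on the checkpoint storage
            }
        }
        return new Placement(inlined, files);
    }

    public static void main(String[] args) {
        List<Long> chunks = List.of(4_096L, 8_192L, 16_000L, 500_000L);
        System.out.println(place(chunks, 20 * 1024));
    }
}
```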

Re: updating keyed state in open method.

2023-09-07 Thread Zakelly Lan
Hi, You cannot access the keyed state within #open(). It can only be accessed under a keyed context (a key is selected while processing an element, e.g. #processElement). Best, Zakelly On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski wrote: > > Hi, > I'm having a problem with my toy flink

Re: Broadcast state and job restarts

2022-10-27 Thread Zakelly Lan
Hi Alexis, Broadcast state is one type of the Operator State, which is included in savepoints and checkpoints and won't be lost. Please refer to https://stackoverflow.com/questions/62509773/flink-broadcast-state-rocksdb-state-backend/62510423#62510423 Best, Zakelly On Fri, Oct 28, 2022 at 4:41 A

Re: Limiting backpressure during checkpoints

2022-10-24 Thread Zakelly Lan
Hi Robin, You said that during the checkpoint async phase the CPU is stable at 100%, which is pretty strange to me. Normally the cpu usage of the taskmanager process could exceed 100%, depending on what all the threads are doing. I'm wondering if there is any scheduling mechanism controlling the C

Re: Flink RocksDB Performance

2021-07-16 Thread Zakelly Lan
Hi Li Jim, The filesystem state backend performs several times better than RocksDB, but it is only suitable for small states. RocksDB will consume more CPU on background tasks, cache management, serialization/deserialization and compression/decompression. In most cases, the performance of RocksDB will mee