[jira] [Created] (FLINK-37395) [BUG] When scan.startup.mode = latest-offset, a NullPointerException will be triggered
zexin gong created FLINK-37395: -- Summary: [BUG] When scan.startup.mode = latest-offset, a NullPointerException will be triggered Key: FLINK-37395 URL: https://issues.apache.org/jira/browse/FLINK-37395 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: 3.0.0 Reporter: zexin gong Attachments: screenshot-20250227-130010.png When *scan.startup.mode = latest-offset,* the initialized binlog offset is offset={ts_sec=0, kind=LATEST, row=0, event=0}, which causes a NullPointerException -- This message was sent by Atlassian Jira (v8.20.10#820010)
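A minimal, hypothetical sketch of the failure mode described above (illustrative code, not the actual CDC source; the field name `pos` is an assumption made for the example): with latest-offset the offset map only carries {ts_sec=0, kind=LATEST, row=0, event=0}, so reading a field that is never populated, such as the binlog position, dereferences null.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: reproduces the shape of the reported offset map and shows how
// reading an entry that latest-offset never populates leads to a NullPointerException.
public class LatestOffsetNpeSketch {

    public static void main(String[] args) {
        Map<String, Long> offset = new HashMap<>();
        offset.put("ts_sec", 0L);
        offset.put("row", 0L);
        offset.put("event", 0L);
        // Note: no binlog position entry is present for kind=LATEST.

        // Guarded access: fall back to 0 when the field is absent.
        long guardedPos = offset.getOrDefault("pos", 0L);
        System.out.println("guarded position = " + guardedPos);

        // Unguarded access: get() returns null, and auto-unboxing to long throws an NPE.
        long unguardedPos = offset.get("pos");
        System.out.println(unguardedPos); // never reached
    }
}
```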
Re: [DISCUSS] Pluggable Batching for Async Sink in Flink
Thanks Arvid. As discussed offline, I have made the changes to the doc. 1. Gotten rid of the protected methods in the constructor. 2. Added a new constructor that takes in BatchCreator/BufferWrapper, in addition to the current one. 3. The new constructor defaults to the default BatchCreator and BufferWrapper (mimicking the current behaviour). So: - Since the old constructor remains unchanged, Flink 1.x will continue using it without any issues. - Flink 1.x was compiled with the old constructor signature, so it will only look for and call that constructor. - Even though the class contains references to BatchCreator and BufferWrapper, they won’t be loaded if the constructor that references them is never used. - If Flink 1.x explicitly tries to call the new constructor (which includes BatchCreator and BufferWrapper), it will fail since those classes do not exist in Flink 1.x. But this will only happen when a connector is actually using the BatchCreator or the new interfaces, so I assume it should be the connector's responsibility to differentiate if it uses BatchCreator. As for Cassandra, we can provide two implementations, one without batching and one with batching, so the batching one can check at runtime whether the newer classes can be used. So the connectors that want to use customized batching would depend on 2.1. If such a connector is used with a previous Flink runtime, a check can be added in the connector to make sure that BatchCreator is available (a simple check via reflection; see the sketch below). Please have a look and let me know your thoughts. Hey Danny. In case you missed the last response, I have added a BufferWrapper and removed the hardcoded Dequeue check. Please have a look and let me know your thoughts. On Tue, Feb 25, 2025 at 2:10 PM Arvid Heise wrote: > Hi Poorvank, > > I don't have strong feelings about the actual choice of technology. > SPI is used rather heavily in Flink Table/SQL and is also used in > similar projects more and more. But SPI also uses reflection under the > hood, so it doesn't make a huge difference except from a usability > perspective. SPI has some tool support. IDEA, for example, allows you > to jump back and forth between the service definition and the > implementations. > > In any case, protected methods that are called in the ctor cannot be > overridden because at the point where the base object is initialized > the VMT doesn't contain references to methods of the subclasses. That > is a deliberate choice of the VM engineers to avoid cases where an > overridden method accesses fields that have not been initialized yet > because the subclass ctor wasn't invoked yet. > > Also I'm not sure if class loading would succeed if you have a method > that returns an unknown class (let's say you want to run your > connector on Flink 1.X, which was built against Flink 2.1). I'm > assuming it will fail. > > So we need other choices. I'm curious if annotations could be used (in > conjunction with reflection again). I'm guessing classloading will > also fail if an unknown annotation is attached or loading of the > annotation itself will fail if the referenced class cannot be loaded. > If it works somehow, we could say: > @BatchedWith(MyBatchCreator.class) > class MyConnector extends Async... > > Unfortunately, I don't have time to try out these ideas. Maybe you have. > > And in any case, if it gets too complicated, we can always forgo > backwards compatibility and require specific builds for Flink 2.1+. 
> This may also be the easier option in case the connector doesn't > perform too well without the BatchCreator anyways. > > Best, > > Arvid > > On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia > wrote: > > > > Hey Arvid, > > > > Thank you for your feedback and for taking the time to review this > > proposal. > > To answer your concerns: > > > > *Naming Suggestion: BatchCreationResult → Batch * > > This is a valid point, and I agree that BatchCreationResult is > essentially > > representing a Batch. The current name was chosen to reflect that it > > encapsulates metadata such as batch size and count, in addition to the > > entries. I’ll update the proposal to reflect this suggestion and rename > > the class accordingly. > > > > *Compatibility Concerns*: You are on point regarding backward > compatibility > > and avoiding API breakages in connectors. > > Based on our discussion offline, I have removed the BatchCreator > interface > > from the constructor and it is not exposed in the public API of > > AsyncSinkWriter. > > Instead of requiring direct injection via the constructor, the > > implementation now uses protected factory methods (createBatchCreator()), > > allowing subclasses to override the behavior without modifying the base > > API. > > If no subclass overrides the methods, Flink will continue using the > default > > SimpleBatchCreator and DequeBufferWrapper, maintaining full backward > > compatibility. > > You suggested us
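The "simple check via reflection" mentioned above could look roughly like the sketch below. The fully qualified class name is an assumption made for illustration; the final package of BatchCreator is whatever the FLIP ends up with.

```java
// Hedged sketch of the runtime capability check: a connector that ships a single
// artifact for both Flink 1.x and 2.1+ probes whether the new batching interface
// exists before opting into the new AsyncSinkWriter constructor.
final class BatchCreatorSupport {

    // Assumed fully qualified name, used only for illustration.
    private static final String BATCH_CREATOR_FQCN =
            "org.apache.flink.connector.base.sink.writer.BatchCreator";

    private BatchCreatorSupport() {}

    static boolean isAvailable() {
        try {
            // Load without initializing; absence means we run on a pre-2.1 runtime.
            Class.forName(BATCH_CREATOR_FQCN, false, BatchCreatorSupport.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

A connector could then branch on BatchCreatorSupport.isAvailable() to decide whether to call the new constructor (for example via reflection) or fall back to the existing one, keeping a single artifact compatible with both runtimes.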
[jira] [Created] (FLINK-37394) Update CI Flink versions to 1.19 and 1.20 in Kudu connector
Ferenc Csaky created FLINK-37394: Summary: Update CI Flink versions to 1.19 and 1.20 in Kudu connector Key: FLINK-37394 URL: https://issues.apache.org/jira/browse/FLINK-37394 Project: Flink Issue Type: Technical Debt Components: Connectors / Kudu Reporter: Ferenc Csaky Assignee: Ferenc Csaky Fix For: kudu-2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[RESULT][VOTE] FLIP-491: BundledAggregateFunction for batched aggregation
The vote for FLIP-491: BundledAggregateFunction for batched aggregation [1] has concluded (discussion thread [2]). The vote [3] is now closed. There were 4 approving votes, all binding, and there were no disapprovals: - Timo Walther (binding) - Fabian Hueske (binding) - Stefan Richter (binding) - Martijn Visser (binding) Therefore, FLIP-491 was accepted. Thanks for the feedback and discussion! -Alan [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation [2] https://lists.apache.org/thread/gx9p8wh4yxfrdk73wwqzh6sdkxgqwwrf [3] https://lists.apache.org/thread/frgkp83wx8469v65c147wxhpqtoc5d89
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Thank you both for the efforts. The progress sounds great. JFYI, there are still a few other blocker issues, and based on the progress of resolving them my estimation would be creating RC1 around next Friday. So for this feature there's no need to rush. Take your time for code reviewing and testing. As for the migration tool, I agree it's a bigger scope issue, and should not block the release. Actually, what I have in mind is something independent from any Flink version, and may serve for general incompatible state migration. Best, Xintong On Thu, Feb 27, 2025 at 4:16 AM Kurt Ostfeld wrote: > A state migration tool that can migrate savepoints/checkpoints is a great > idea that would be very useful for a lot of scenarios. But that is a bigger > scope issue. > > (fyi, CI is passing and PR should be fully ready) >
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Sry for the late reply, also +1 to have this in 2.0 given that we don't guarantee backwards compatibility, and it is already not compatible in many ways. Looking forward to this. Best, Zakelly On Thu, Feb 27, 2025 at 9:20 AM Xintong Song wrote: > Thank you both for the efforts. The progress sounds great. > > JFYI, there are still a few other blocker issues, and based on the progress > of resolving them my estimation would be creating RC1 around next Friday. > So for this feature there's no need to rush. Take your time for code > reviewing and testing. > > As for the migration tool, I agree it's a bigger scope issue, and should > not block the release. Actually, what I have in mind is something > independent from any Flink version, and may serve for general incompatible > state migration. > > Best, > > Xintong > > > > On Thu, Feb 27, 2025 at 4:16 AM Kurt Ostfeld > > wrote: > > > A state migration tool that can migrate savepoints/checkpoints is a great > > idea that would be very useful for a lot of scenarios. But that is a > bigger > > scope issue. > > > > (fyi, CI is passing and PR should be fully ready) > > >
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
The only reason to keep the old version of Kryo, rather than either upgrading or removing Kryo, is if there is a backward compatibility advantage. If backward compatibility is breaking anyway, this seems an easy choice. Java record support was the big motivation for this upgrade. From some simple tests I've done, the Flink 2.0 branch code seems to work with Java records where Flink 1.x versions did not. The Flink 1.x code was automatically using Kryo, and the old Kryo fails with Java records; the Flink 2.0 code doesn't seem to use Kryo by default. However, you still don't want to pull a super old version of the Kryo library into the project. The Kryo code is still used widely throughout the Flink code base. The old Kryo 2.x is fundamentally incompatible with Java records and will fail if it gets called with a Java record or some other collection/class that has a nested reference to a Java record. Either none of that Kryo code is actively being used, in which case it should be removed, or it is being used and will fail when it encounters Java records.
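A hedged probe of the behaviour described above (illustrative, not from the Flink code base): Flink's type extraction either classifies a Java record as a structured, POJO-style type or falls back to a generic type, and the generic fallback is the path that goes through Kryo and breaks with the old Kryo 2.x.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Illustrative probe: print which TypeInformation Flink extracts for a Java record.
// A generic type here means the runtime would serialize the record via Kryo, which is
// exactly where the old Kryo 2.x version fails; a POJO-style type avoids Kryo entirely.
// Which outcome you see depends on the Flink version on the classpath.
public class RecordTypeInfoProbe {

    public record Order(String id, long amountCents) {}

    public static void main(String[] args) {
        TypeInformation<Order> typeInfo = TypeInformation.of(Order.class);
        System.out.println(typeInfo.getClass().getSimpleName() + ": " + typeInfo);
    }
}
```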
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
A state migration tool that can migrate savepoints/checkpoints is a great idea that would be very useful for a lot of scenarios. But that is a bigger scope issue. (fyi, CI is passing and PR should be fully ready)
[jira] [Created] (FLINK-37386) Avoid sending too many CreateTableEvents on job failover
Yanquan Lv created FLINK-37386: -- Summary: Avoid sending too many CreateTableEvents on job failover Key: FLINK-37386 URL: https://issues.apache.org/jira/browse/FLINK-37386 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: Yanquan Lv Fix For: cdc-3.4.0 At present, the MySQL source queries the table structure information of all tables at job startup. When starting from binlog or recovering from a job failure, it sends CreateTableEvents for all tables downstream. This forces downstream operators to process all of these events, which may even cause the binlog connection to be disconnected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
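A purely illustrative sketch of one way the improvement could be approached (class and method names are assumptions, not Flink CDC internals): track which tables already had a CreateTableEvent emitted and skip re-emitting them after a restart.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: remember which tables already had their CreateTableEvent emitted,
// so a binlog-only start or a failover does not flood downstream with duplicate events.
public class CreateTableEventDeduplicator {

    private final Set<String> alreadyEmitted = new HashSet<>();

    /** Returns true if the CreateTableEvent for this table still needs to be emitted. */
    public boolean shouldEmit(String tableId) {
        return alreadyEmitted.add(tableId);
    }
}
```

In a real source this set would itself have to be part of the checkpointed state; otherwise the deduplication is lost on recovery and the original problem reappears.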
[jira] [Created] (FLINK-37392) Kubernetes Operator UpgradeFailureException: HA metadata not available to restore from last state
junzhong qin created FLINK-37392: Summary: Kubernetes Operator UpgradeFailureException: HA metadata not available to restore from last state Key: FLINK-37392 URL: https://issues.apache.org/jira/browse/FLINK-37392 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: junzhong qin I run a bounded streaming SQL job on Kubernetes and set {code:java} kubernetes.operator.jm-deployment.shutdown-ttl: 5 m {code} When the job exits, the operator log always shows {code:java} o.a.f.k.o.c.FlinkDeploymentController [ERROR][{ns}/{cluster-id}] Error while upgrading Flink Deployment org.apache.flink.kubernetes.operator.exception.UpgradeFailureException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:946) at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:213) at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:183) at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:64) at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:387) at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.resubmitJob(AbstractJobReconciler.java:555) at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.reconcileOtherChanges(ApplicationReconciler.java:294) at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:173) at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:152) at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:61) at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153) at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111) at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:110) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:136) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:117) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64) at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) {code} Can the operator delete the HA metadata when `kubernetes.operator.jm-deployment.shutdown-ttl` is reached? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37390) Push filter into ChangelogNormalize
Dawid Wysakowicz created FLINK-37390: Summary: Push filter into ChangelogNormalize Key: FLINK-37390 URL: https://issues.apache.org/jira/browse/FLINK-37390 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 2.1.0 We can optimize pipelines with ChangelogNormalize present if we push filters into the operator itself. That way we can # reduce the number of records emitted downstream that will be filtered out right after the ChangelogNormalize # drop state for keys that currently do not match the filter Example: ``` // filter value < 10 // input records +U[key=A, value=1] +U[key=A, value=2] +U[key=A, value=12] +U[key=A, value=13] +U[key=A, value=15] -D[key=A, value=15] // records emitted without filter pushed into ChangelogNormalize +I[key=A, value=1] -U[key=A, value=1] +U[key=A, value=2] -U[key=A, value=2] +U[key=A, value=12] // filtered out in the next operator -U[key=A, value=12] // filtered out in the next operator +U[key=A, value=13] // filtered out in the next operator -U[key=A, value=13] // filtered out in the next operator +U[key=A, value=15] // filtered out in the next operator -D[key=A, value=15] // filtered out in the next operator // records emitted with filter pushed into ChangelogNormalize +I[key=A, value=1] -U[key=A, value=1] +U[key=A, value=2] -D[key=A, value=2] // we clear the state for key=A ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
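A hedged Table API sketch of the kind of pipeline this optimization targets (connector, topic, and schema are assumptions made for the example, and running it requires the upsert-kafka connector on the classpath): an upsert source requires ChangelogNormalize, and because `value` is not part of the primary key the `value < 10` predicate currently has to be evaluated in a separate operator on top of it.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative-only sketch of a query shape that produces a ChangelogNormalize node
// with a filter on top of it. With the proposed optimization, the `value < 10`
// predicate would be evaluated inside ChangelogNormalize itself.
public class ChangelogNormalizeFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  `key` STRING," +
                "  `value` INT," +
                "  PRIMARY KEY (`key`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'raw'," +
                "  'value.format' = 'json'" +
                ")");

        // The plan contains ChangelogNormalize for the upsert source, followed by the filter.
        System.out.println(
                tEnv.explainSql("SELECT * FROM orders WHERE `value` < 10"));
    }
}
```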
[jira] [Created] (FLINK-37389) Add "flink-sql-connector-kudu" module
Ferenc Csaky created FLINK-37389: Summary: Add "flink-sql-connector-kudu" module Key: FLINK-37389 URL: https://issues.apache.org/jira/browse/FLINK-37389 Project: Flink Issue Type: Sub-task Components: Connectors / Kudu Reporter: Ferenc Csaky Assignee: Ferenc Csaky Fix For: kudu-2.0.0 The original project had no separate fat JAR for the SQL connector, which should be added for easier usage. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37388) [BUG] Field names must be unique.
zexin gong created FLINK-37388: -- Summary: [BUG] Field names must be unique. Key: FLINK-37388 URL: https://issues.apache.org/jira/browse/FLINK-37388 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: 3.0.0 Reporter: zexin gong Attachments: screenshot-20250226-171451.png If schema.change.behavior: EVOLVE is configured, an exception may be triggered when reading the binlog split: Field names must be unique. Found duplicates: [xxx] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37387) License check behaves differently for different jdk versions
Sergey Nuyanzin created FLINK-37387: --- Summary: License check behaves differently for different jdk versions Key: FLINK-37387 URL: https://issues.apache.org/jira/browse/FLINK-37387 Project: Flink Issue Type: Bug Components: Build System Reporter: Sergey Nuyanzin example jdk 17 ok: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=66272&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=1 and same ci jdk11 (fails): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=66272&view=logs&j=946871de-358d-5815-3994-8175615bc253&t=e0240c62-4570-5d1c-51af-dd63d2093da1&l=41112 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[SUMMARY] Flink 2.0 Release Sync 02/26/2025
Hi devs, Here's the summary of today's release sync. - All documentation tickets have been closed. - 27 out of 30 cross-team verification tickets have been closed. We are targeting to complete them all by this weekend. - There's 1 testability blocker, FLINK-35810, which we need help with. - There's 1 non-testing blocker, which is being looked into. - I'm reaching out to contributors of the core features for preparing the release announcement. Should come up with a first draft next week. - We are targeting to create RC1 by the end of next week, if all blockers can be resolved. The next release sync will be held on Mar 5. Best, Xintong
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Thanks for bringing this to our attention! I would choose simplicity over backward compatibility given Flink 2.0 offers the opportunity for breaking changes. We will benefit from it from long-term's perspective. +1 for upgrading Kryo in Flink 2.0 in a non compatible way. Best regards, Jing On Wed, Feb 26, 2025 at 5:37 AM Nick Nezis wrote: > Thanks Martijn. > > That's really great context. In that case, then I'll change my previous > opinion. I agree that we should proceed with the simpler pull request and > get it into the Flink 2.0 release. > > On 2025/02/25 14:06:20 Martijn Visser wrote: > > Hi all, > > > > For the record, I don't think we have a guarantee around backwards > > compatibility for Flink 2.0 anyway, given that we upgraded Scala to the > > latest version (because of the bump to JDK 17) and that will potentially > > break savepoints when using Scala. So I think we should also put this in > > for Flink 2.0, and just have the right release notes/documentation for > this. > > > > Best regards, > > > > Martijn > > > > On Tue, Feb 25, 2025 at 3:31 AM Zhanghao Chen > > > wrote: > > > > > Hi Gyula, > > > > > > Thanks for bringing this up! Definitely +1 for upgrading Kryo in Flink > > > 2.0. As a side note, it might be useful to introduce customizable > generic > > > serializer support like Spark, where you can switch to your own > serializer > > > via the "spark.serializer" [1] option. Users starting new applications > can > > > introduce their own serialization stack in this case to resolve Java > > > compatibility issue is this case or for other performance issues. > > > > > > [1] https://spark.apache.org/docs/latest/configuration.html > > > > > > > > > Best, > > > Zhanghao Chen > > > > > > From: Gyula F?ra > > > Sent: Friday, February 21, 2025 14:04 > > > To: dev > > > Subject: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0 > > > > > > Hey all! > > > > > > I would like to rekindle this discussion as it seems that it has > stalled > > > several times in the past and we are nearing the point in time where > the > > > decision has to be made with regards to 2.0. (we are already a bit > late but > > > nevermind) > > > > > > There has been numerous requests and efforts to upgrade Kryo to better > > > support newer Java versions and Java native types. I think we can all > agree > > > that this change is inevitable one way or another. > > > > > > The latest JIRA for this seems to be: > > > https://issues.apache.org/jira/browse/FLINK-3154 > > > > > > There is even an open PR that accomplishes this (currently in a state > > > incompatible way) but based on the discussion it seems that with some > extra > > > complexity compatibility can even be preserved by having both the old > and > > > new Kryo versions active at the same time. > > > > > > The main question here is whether state compatibility is important for > 2.0 > > > with this regard or we want to bite the bullet and make this upgrade > once > > > and for all. > > > > > > Cheers, > > > Gyula > > > > > >
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Thank you all for your feedback. Let's leave this open for another day and unless there is any negative feedback we can go ahead with merging the PR to bump the version for 2.0 Cheers Gyula On Wed, Feb 26, 2025 at 10:56 AM Jing Ge wrote: > Thanks for bringing this to our attention! I would choose simplicity over > backward compatibility given Flink 2.0 offers the opportunity for breaking > changes. We will benefit from it from long-term's perspective. +1 for > upgrading Kryo in Flink 2.0 in a non compatible way. > > Best regards, > Jing > > On Wed, Feb 26, 2025 at 5:37 AM Nick Nezis wrote: > > > Thanks Martijn. > > > > That's really great context. In that case, then I'll change my previous > > opinion. I agree that we should proceed with the simpler pull request and > > get it into the Flink 2.0 release. > > > > On 2025/02/25 14:06:20 Martijn Visser wrote: > > > Hi all, > > > > > > For the record, I don't think we have a guarantee around backwards > > > compatibility for Flink 2.0 anyway, given that we upgraded Scala to the > > > latest version (because of the bump to JDK 17) and that will > potentially > > > break savepoints when using Scala. So I think we should also put this > in > > > for Flink 2.0, and just have the right release notes/documentation for > > this. > > > > > > Best regards, > > > > > > Martijn > > > > > > On Tue, Feb 25, 2025 at 3:31 AM Zhanghao Chen < > zhanghao.c...@outlook.com > > > > > > wrote: > > > > > > > Hi Gyula, > > > > > > > > Thanks for bringing this up! Definitely +1 for upgrading Kryo in > Flink > > > > 2.0. As a side note, it might be useful to introduce customizable > > generic > > > > serializer support like Spark, where you can switch to your own > > serializer > > > > via the "spark.serializer" [1] option. Users starting new > applications > > can > > > > introduce their own serialization stack in this case to resolve Java > > > > compatibility issue is this case or for other performance issues. > > > > > > > > [1] https://spark.apache.org/docs/latest/configuration.html > > > > > > > > > > > > Best, > > > > Zhanghao Chen > > > > > > > > From: Gyula F?ra > > > > Sent: Friday, February 21, 2025 14:04 > > > > To: dev > > > > Subject: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0 > > > > > > > > Hey all! > > > > > > > > I would like to rekindle this discussion as it seems that it has > > stalled > > > > several times in the past and we are nearing the point in time where > > the > > > > decision has to be made with regards to 2.0. (we are already a bit > > late but > > > > nevermind) > > > > > > > > There has been numerous requests and efforts to upgrade Kryo to > better > > > > support newer Java versions and Java native types. I think we can all > > agree > > > > that this change is inevitable one way or another. > > > > > > > > The latest JIRA for this seems to be: > > > > https://issues.apache.org/jira/browse/FLINK-3154 > > > > > > > > There is even an open PR that accomplishes this (currently in a state > > > > incompatible way) but based on the discussion it seems that with some > > extra > > > > complexity compatibility can even be preserved by having both the old > > and > > > > new Kryo versions active at the same time. > > > > > > > > The main question here is whether state compatibility is important > for > > 2.0 > > > > with this regard or we want to bite the bullet and make this upgrade > > once > > > > and for all. > > > > > > > > Cheers, > > > > Gyula > > > > > > > > > >
[jira] [Created] (FLINK-37391) Changing the watchNamespaces config in the operator is not respected while running
Fabian Paul created FLINK-37391: --- Summary: Changing the watchNamespaces config in the operator is not respected while running Key: FLINK-37391 URL: https://issues.apache.org/jira/browse/FLINK-37391 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: 1.19.0, 1.10.0 Reporter: Fabian Paul The Flink operator allows limiting the watched namespaces by setting the `watchNamespaces` config during helm installation and upgrades. Currently, the configuration is only loaded into the operator during the initial start [https://github.com/apache/flink-kubernetes-operator/blob/9eb3c385b90a5a2f08376720f3204d1784981a0c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java#L247]. As a result, updating the watched namespaces via a helm upgrade does not inform the Java process; it only creates the necessary k8s resources, e.g., the service account. One idea could be that changing watchNamespaces triggers a restart of the operator pod and hence reinitializes the watch behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37393) Abnormally Long Checkpoint Duration After Full Checkpoint Completion
Rafael Zimmermann created FLINK-37393: - Summary: Abnormally Long Checkpoint Duration After Full Checkpoint Completion Key: FLINK-37393 URL: https://issues.apache.org/jira/browse/FLINK-37393 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.18.0 Environment: Apache Flink 1.18.0 Pipeline processing ~5.7TB of checkpointed data Using GCS for checkpoint storage Reporter: Rafael Zimmermann Attachments: Screenshot 2025-02-03 at 1.52.09 PM.png, Screenshot 2025-02-03 at 1.52.23 PM.png, Screenshot 2025-02-03 at 1.52.30 PM.png, evidence.log We're observing an issue where checkpoints following a full checkpoint take an unusually long time to complete (1-2 hours) in our Flink pipeline, while normal checkpoints typically complete within seconds/minutes. ### Observed Behavior: - After a full checkpoint completes, the next incremental checkpoint shows extremely high start delay - Normal checkpoints take ~30 seconds to complete - Full checkpoints take ~7 minutes to complete - The problematic checkpoint after full checkpoint takes 1-2 hours - The start delay appears to be the main contributing factor to the long duration ### Logs and Evidence: Full checkpoint logs showing significant gaps: {code:java} {"instant":{"epochSecond":1738600670,"nanoOfSecond":60300},"thread":"flink-pekko.actor.default-dispatcher-18","level":"INFO","loggerName":"org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler","message":"Triggering a checkpoint for job 36762c0137f9ed6a9d5e9ce3dc933871."} ... {"instant":{"epochSecond":1738600713,"nanoOfSecond":26600},"thread":"Checkpoint Timer","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.CheckpointRequestDecider","message":"checkpoint request time in queue: 42663"}{code} ### Impact: - Affects pipeline reliability - No data loss observed, but creates operational concerns ### Potential Causes: - Possible locking mechanism bug in Flink internals - Issues with queued checkpoint requests - Interaction between full and incremental checkpoint scheduling ### Attempted Workarounds: - Manually triggering checkpoints - Normal checkpoint operations resume after several hours ### Questions: Is this expected behavior when full and incremental checkpoints interact? Are there known issues with checkpoint request queuing? Are there configuration parameters that could help mitigate this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Hi Xintong! The PR is basically ready and CI almost passes (e2es work but there were some minor checkstyle and related errors). I will go through it and also test it manually a little today / tomorrow. Unless something very unexpected comes up during the review and testing I would expect to be able to merge this by Friday to the master. Regarding the compatibility/migration tooling, I think this could be possible based on what I have read so far but I am not completely sure how the 2 different Kryo versions will interplay in the tooling. Maybe Nick or Kurt can chime in here but I imagine we could move the backward compatibility logic into this tool that was originally built into the Kryo upgrade PR directly. Cheers Gyula On Wed, Feb 26, 2025 at 1:07 PM Xintong Song wrote: > Hi Gyula, > > Sorry for chime in late, just noticed the thread. And thanks for bringing > this up. > > As one of the release managers, +1 for including this in Flink 2.0. > >- Flink 2.0 does not guarantee state compatibility. We've mentioned that >in the 2.0-preview release announcement. In addition to the Scala > upgrading >as Martijn mentioned, we also introduced built-in serializer for >collections types as default, which is also compatibility breaking. >- Although the feature freeze date has already passed, I think there's a >good reason to make this effort an exception. We've been preparing this >release for ~2 years. Why not wait for another 1-2 weeks, if that really >helps? > > IIRC, this has come up somewhere when we plan the release, and was not > included only because no one was driving the effort. So it's really nice to > see it being picked up. > > Just for managing the expectation, do we have an estimation on how long > this may take? > > A related topic is, can we provide a state migration tool to help users > migrate across incompatible checkpoints / savepoints? I.e., deserialize the > states into in-memory objects with old serialization setups, and serialize > them with new setups. Such a tool can be helpful not only in a > state-incompatible major version bump, but also in scenarios such as: > >- The user wants to change the serialization setups >- State incompatibility due to user logic change, which the user knows >how to deal with but lacks methods to do it. E.g., adding a new field to >the state data type, where a certain default value can be applied to the >previous states. > > I'm not entirely sure whether this can work or not. Just lack the capacity > to look into it. > > Best, > > Xintong > > > > On Wed, Feb 26, 2025 at 6:02 PM Gyula Fóra wrote: > > > Thank you all for your feedback. > > > > Let's leave this open for another day and unless there is any negative > > feedback we can go ahead with merging the PR to bump the version for 2.0 > > > > Cheers > > Gyula > > > > On Wed, Feb 26, 2025 at 10:56 AM Jing Ge > > wrote: > > > > > Thanks for bringing this to our attention! I would choose simplicity > over > > > backward compatibility given Flink 2.0 offers the opportunity for > > breaking > > > changes. We will benefit from it from long-term's perspective. +1 for > > > upgrading Kryo in Flink 2.0 in a non compatible way. > > > > > > Best regards, > > > Jing > > > > > > On Wed, Feb 26, 2025 at 5:37 AM Nick Nezis > wrote: > > > > > > > Thanks Martijn. > > > > > > > > That's really great context. In that case, then I'll change my > previous > > > > opinion. I agree that we should proceed with the simpler pull request > > and > > > > get it into the Flink 2.0 release. 
> > > > > > > > On 2025/02/25 14:06:20 Martijn Visser wrote: > > > > > Hi all, > > > > > > > > > > For the record, I don't think we have a guarantee around backwards > > > > > compatibility for Flink 2.0 anyway, given that we upgraded Scala to > > the > > > > > latest version (because of the bump to JDK 17) and that will > > > potentially > > > > > break savepoints when using Scala. So I think we should also put > this > > > in > > > > > for Flink 2.0, and just have the right release notes/documentation > > for > > > > this. > > > > > > > > > > Best regards, > > > > > > > > > > Martijn > > > > > > > > > > On Tue, Feb 25, 2025 at 3:31 AM Zhanghao Chen < > > > zhanghao.c...@outlook.com > > > > > > > > > > wrote: > > > > > > > > > > > Hi Gyula, > > > > > > > > > > > > Thanks for bringing this up! Definitely +1 for upgrading Kryo in > > > Flink > > > > > > 2.0. As a side note, it might be useful to introduce customizable > > > > generic > > > > > > serializer support like Spark, where you can switch to your own > > > > serializer > > > > > > via the "spark.serializer" [1] option. Users starting new > > > applications > > > > can > > > > > > introduce their own serialization stack in this case to resolve > > Java > > > > > > compatibility issue is this case or for other performance issues. > > > > > > > > > > > > [1] https://spark.apache.org/docs/latest/configuration.html > > > > > > > > > > > > >
Re: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0
Hi Gyula, Sorry for chime in late, just noticed the thread. And thanks for bringing this up. As one of the release managers, +1 for including this in Flink 2.0. - Flink 2.0 does not guarantee state compatibility. We've mentioned that in the 2.0-preview release announcement. In addition to the Scala upgrading as Martijn mentioned, we also introduced built-in serializer for collections types as default, which is also compatibility breaking. - Although the feature freeze date has already passed, I think there's a good reason to make this effort an exception. We've been preparing this release for ~2 years. Why not wait for another 1-2 weeks, if that really helps? IIRC, this has come up somewhere when we plan the release, and was not included only because no one was driving the effort. So it's really nice to see it being picked up. Just for managing the expectation, do we have an estimation on how long this may take? A related topic is, can we provide a state migration tool to help users migrate across incompatible checkpoints / savepoints? I.e., deserialize the states into in-memory objects with old serialization setups, and serialize them with new setups. Such a tool can be helpful not only in a state-incompatible major version bump, but also in scenarios such as: - The user wants to change the serialization setups - State incompatibility due to user logic change, which the user knows how to deal with but lacks methods to do it. E.g., adding a new field to the state data type, where a certain default value can be applied to the previous states. I'm not entirely sure whether this can work or not. Just lack the capacity to look into it. Best, Xintong On Wed, Feb 26, 2025 at 6:02 PM Gyula Fóra wrote: > Thank you all for your feedback. > > Let's leave this open for another day and unless there is any negative > feedback we can go ahead with merging the PR to bump the version for 2.0 > > Cheers > Gyula > > On Wed, Feb 26, 2025 at 10:56 AM Jing Ge > wrote: > > > Thanks for bringing this to our attention! I would choose simplicity over > > backward compatibility given Flink 2.0 offers the opportunity for > breaking > > changes. We will benefit from it from long-term's perspective. +1 for > > upgrading Kryo in Flink 2.0 in a non compatible way. > > > > Best regards, > > Jing > > > > On Wed, Feb 26, 2025 at 5:37 AM Nick Nezis wrote: > > > > > Thanks Martijn. > > > > > > That's really great context. In that case, then I'll change my previous > > > opinion. I agree that we should proceed with the simpler pull request > and > > > get it into the Flink 2.0 release. > > > > > > On 2025/02/25 14:06:20 Martijn Visser wrote: > > > > Hi all, > > > > > > > > For the record, I don't think we have a guarantee around backwards > > > > compatibility for Flink 2.0 anyway, given that we upgraded Scala to > the > > > > latest version (because of the bump to JDK 17) and that will > > potentially > > > > break savepoints when using Scala. So I think we should also put this > > in > > > > for Flink 2.0, and just have the right release notes/documentation > for > > > this. > > > > > > > > Best regards, > > > > > > > > Martijn > > > > > > > > On Tue, Feb 25, 2025 at 3:31 AM Zhanghao Chen < > > zhanghao.c...@outlook.com > > > > > > > > wrote: > > > > > > > > > Hi Gyula, > > > > > > > > > > Thanks for bringing this up! Definitely +1 for upgrading Kryo in > > Flink > > > > > 2.0. 
As a side note, it might be useful to introduce customizable > > > generic > > > > > serializer support like Spark, where you can switch to your own > > > serializer > > > > > via the "spark.serializer" [1] option. Users starting new > > applications > > > can > > > > > introduce their own serialization stack in this case to resolve > Java > > > > > compatibility issue is this case or for other performance issues. > > > > > > > > > > [1] https://spark.apache.org/docs/latest/configuration.html > > > > > > > > > > > > > > > Best, > > > > > Zhanghao Chen > > > > > > > > > > From: Gyula F?ra > > > > > Sent: Friday, February 21, 2025 14:04 > > > > > To: dev > > > > > Subject: [DISCUSSION] Upgrade to Kryo 5 for Flink 2.0 > > > > > > > > > > Hey all! > > > > > > > > > > I would like to rekindle this discussion as it seems that it has > > > stalled > > > > > several times in the past and we are nearing the point in time > where > > > the > > > > > decision has to be made with regards to 2.0. (we are already a bit > > > late but > > > > > nevermind) > > > > > > > > > > There has been numerous requests and efforts to upgrade Kryo to > > better > > > > > support newer Java versions and Java native types. I think we can > all > > > agree > > > > > that this change is inevitable one way or another. > > > > > > > > > > The latest JIRA for this seems to be: > > > > > https://issues.apache.org/jira/browse/FLINK-3154 > > > > > > > > > > There is even an open PR that accomplishes this (currently
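The re-serialization step of the migration tool Xintong sketches above boils down to reading each state entry with the old serialization setup and writing it back with the new one. A deliberately generic illustration follows: the functional interfaces are placeholders rather than Flink classes, since a real tool would have to plug into the savepoint format (e.g. via the State Processor API) and carry both the old and the new Kryo on the classpath.

```java
import java.util.function.Function;

/**
 * Conceptual sketch only: re-encode one serialized state value by deserializing it
 * with the old setup and serializing it again with the new setup.
 */
public final class StateMigrationSketch {

    /** Re-encodes one serialized state value: old bytes -> object -> new bytes. */
    public static <T> byte[] migrateValue(
            byte[] oldBytes,
            Function<byte[], T> oldDeserializer,   // e.g. backed by Kryo 2.x
            Function<T, byte[]> newSerializer) {   // e.g. backed by Kryo 5.x
        T value = oldDeserializer.apply(oldBytes);
        return newSerializer.apply(value);
    }

    private StateMigrationSketch() {}
}
```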
Re: [jira] [Created] (FLINK-28897) Fail to use udf in added jar when enabling checkpoint
Hi Team, Here is a summary of the outcomes from the PR review: Externally added jars are resolved using the FlinkUserCodeClassLoader (child classloader) in Flink. The proposed fix involved updating the class loader at the graph execution level to the user code class loader, which is what the naming convention of the variable here [1] in the source code suggests is expected. However, the issue is already fixed in a safe, tested way in Flink 2.0 as part of this PR [2]. If this issue exists for several Flink versions (from 1.16), then we should not introduce large and risky changes in a patch version to fix it. Since the issue has existed since version 1.16 (over 2 years) and is only relevant for the Table API, the risk is not worth it [3]. So we can conclude that the ADD JAR capability with the Table API will be a known limitation until version 2.0, and the same can be used with Flink 2.0. Based on this I believe we are good to close the PR [4] and tag the fix version for the related issue [5] as 2.0. However I’d love to hear your thoughts on this. Please let me know if there are any suggestions or concerns. [1] https://github.com/apache/flink/blob/a4563caa7a4914dfd9fa5d488f5b2e541ecc582a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L2472 [2] https://github.com/apache/flink/pull/25472 [3] https://github.com/apache/flink/pull/25656#issuecomment-2582320914 [4] https://github.com/apache/flink/pull/25656 [5] https://issues.apache.org/jira/browse/FLINK-28897 Regards, Ammu > On 29 Nov 2024, at 11:49 AM, Ammu P wrote: > > Hi Team, > > I have raised a PR (https://github.com/apache/flink/pull/25656) to 1.20 > version with probable fix for this issue. Can I get a review done for this > please. Thanks in advance. > > Regards, > Ammu Parvathy > On 2022/08/10 03:39:00 "Liu (Jira)" wrote: > > Liu created FLINK-28897: > > --- > > > > Summary: Fail to use udf in added jar when enabling checkpoint > > Key: FLINK-28897 > > URL: https://issues.apache.org/jira/browse/FLINK-28897 > > Project: Flink > > Issue Type: Bug > > Affects Versions: 1.16.0 > > Reporter: Liu > > > > > > > > > > > > > > -- > > This message was sent by Atlassian Jira > > (v8.20.10#820010) > >
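For context on the class-loading point above, the underlying pattern is to run the affected work with the user code classloader installed as the thread context classloader, so that classes from jars registered via ADD JAR are resolvable during planning and graph translation. A generic, hedged illustration of that pattern (not the actual Flink 2.0 patch):

```java
import java.util.concurrent.Callable;

// Generic illustration only: run a piece of work with the user-code classloader as the
// thread context classloader, then restore the previous one afterwards.
public final class WithUserClassLoader {

    public static <T> T run(ClassLoader userCodeClassLoader, Callable<T> work) throws Exception {
        Thread currentThread = Thread.currentThread();
        ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(userCodeClassLoader);
        try {
            return work.call();
        } finally {
            // Always restore the original classloader to avoid leaking it to later work.
            currentThread.setContextClassLoader(previous);
        }
    }

    private WithUserClassLoader() {}
}
```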