[ https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35088:
-----------------------------------
    Labels: pull-request-available  (was: )

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-35088
>                 URL: https://issues.apache.org/jira/browse/FLINK-35088
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core, Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Assignee: elon_X
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1. I found that setting maxAllowedWatermarkDrift to a negative number
> initially led me to believe it could support delaying the consumption of the
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()
>         + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
> {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's
> not, I think an additional parameter validation should be implemented to
> throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the
> task will throw an exception when starting the job manager. The JDK class
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>     at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_351]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_351]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_351]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]
> {code}
> Therefore, I believe it's necessary to validate these two parameters to
> ensure that exceptions are thrown on the client side to alert the user.
> !image-2024-04-11-20-12-29-951.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
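
The two problems described in the issue can be reproduced outside Flink with plain JDK classes. The sketch below is only an illustration under stated assumptions: the class `WatermarkAlignmentChecks` and its methods are hypothetical names, not Flink API, and the `validate` method shows one possible shape of the proposed client-side check rather than the change made in the linked pull request. It assumes both parameters are available as `java.time.Duration` values and that a drift of zero is acceptable.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Flink) illustrating the two checks discussed in the issue. */
final class WatermarkAlignmentChecks {

    private WatermarkAlignmentChecks() {}

    /** Assumed client-side validation: reject a negative drift and a non-positive update interval. */
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must not be negative, got " + maxAllowedWatermarkDrift);
        }
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "updateInterval must be positive, got " + updateInterval);
        }
    }

    /** Same arithmetic as the root-cause snippet: global combined watermark plus drift. */
    static long maxAllowedWatermark(long globalCombinedWatermarkMillis, long driftMillis) {
        return globalCombinedWatermarkMillis + driftMillis;
    }

    /** Returns true if the JDK scheduler rejects the given period, as in the stack trace. */
    static boolean schedulerRejects(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // scheduleAtFixedRate throws IllegalArgumentException when period <= 0.
            executor.scheduleAtFixedRate(() -> {}, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // With a negative drift the bound falls below a watermark the reader already emitted.
        long lastEmittedWatermark = 1_000L;
        long bound = maxAllowedWatermark(1_000L, -500L);
        System.out.println("bound below last emitted watermark: " + (bound < lastEmittedWatermark));

        // A period of 0 is rejected by the JDK, a positive period is accepted.
        System.out.println("period 0 rejected: " + schedulerRejects(0L));
        System.out.println("period 1000 rejected: " + schedulerRejects(1_000L));

        try {
            validate(Duration.ofMillis(-1), Duration.ofSeconds(1));
        } catch (IllegalArgumentException e) {
            System.out.println("validation failed early: " + e.getMessage());
        }
    }
}
```

Running a check like `validate` when the job is built would surface both misconfigurations on the client, instead of a hanging source or an exception with a `null` message during JobMaster start-up.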