[jira] [Closed] (FLINK-23006) issue in configuring History server
[ https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-23006.
---------------------------------
    Resolution: Not A Problem

> issue in configuring History server
> ------------------------------------
>
>                 Key: FLINK-23006
>                 URL: https://issues.apache.org/jira/browse/FLINK-23006
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.13.1
>            Reporter: Bhagi
>            Priority: Minor
>         Attachments: flink-history-server-7fb6f9b66-bkhfm.log, history-server-deployment.yaml
>
> Dear Team,
> I have a Flink session cluster with HA on standalone Kubernetes and want to configure a History Server for this cluster.
> To do so, I created one more deployment in the Flink cluster's namespace, passing args: ["history-server"], but the pod is in CrashLoopBackOff.
> Could you look at the attached logs and help me configure the History Server?
> kubectl get po -n flink-load-test
> NAME                                   READY   STATUS             RESTARTS   AGE
> flink-history-server-7fb6f9b66-bkhfm   0/1     CrashLoopBackOff   10         29m
> flink-jobmanager-7df744b455-f575q      1/1     Running            0          18h
> flink-jobmanager-7df744b455-qm6wf      1/1     Running            0          18h
> flink-jobmanager-7df744b455-t7zm5      1/1     Running            1          18h
> flink-taskmanager-7945f6b856-89mr5     1/1     Running            7          18h
> flink-taskmanager-7945f6b856-mz2hz     1/1     Running            7          18h

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Updated] (FLINK-23006) issue in configuring History server
[ https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-23006:
----------------------------------
    Priority: Minor  (was: Blocker)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-23006) issue in configuring History server
[ https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364730#comment-17364730 ]

Till Rohrmann commented on FLINK-23006:
---------------------------------------

Are you sure that {{historyserver.web.address: http://flinkhistory:8082}} can be resolved from within the pod? You should rather bind the history server to {{0.0.0.0}}.

A general note: please do not open blocker issues for user questions. Almost all of your tickets are questions about using Flink rather than real problems, and most of them seem to stem from not fully understanding Kubernetes. Please ask these questions on the user mailing list instead.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
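For reference, a minimal sketch of the flink-conf.yaml entries involved here, assuming the HistoryServer should listen on all interfaces and read archives from shared storage; the archive path below is illustrative and must match what the JobManagers actually write to:

{code}
# Illustrative flink-conf.yaml snippet, not taken from the attached deployment.

# Bind the HistoryServer web UI to all interfaces instead of a hostname
# that may not resolve inside the pod.
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082

# Shared archive location (example path): the JobManagers write completed
# jobs here and the HistoryServer polls the same directory.
jobmanager.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
{code}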
[jira] [Assigned] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description
[ https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-22970:
-------------------------------
    Assignee: Wei-Che Wei

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> ----------------------------------------------------------------------
>
>                 Key: FLINK-22970
>                 URL: https://issues.apache.org/jira/browse/FLINK-22970
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation, Table SQL / API
>            Reporter: Wei-Che Wei
>            Assignee: Wei-Che Wei
>            Priority: Minor
>              Labels: pull-request-available
>
> According to this ML discussion
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> the description of the `TO_TIMESTAMP` UDF is not correct: it uses the UTC+0 time zone instead of the session time zone. We should fix this documentation.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364733#comment-17364733 ]

Till Rohrmann commented on FLINK-22483:
---------------------------------------

Where else is {{CompletedCheckpointStore::recover}} being called from? I think it is only called by the {{CheckpointCoordinator}} atm.

> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
>                 Key: FLINK-22483
>                 URL: https://issues.apache.org/jira/browse/FLINK-22483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Robert Metzger
>            Priority: Critical
>             Fix For: 1.14.0
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially long-lasting/blocking operation, for example if the file system implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from the main thread of the JobManager, making it unresponsive to any RPC call while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused (Connection refused)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) ~[?:?]
>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) ~[?:?]
>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) ~[?:?]
>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) ~[?:1.8.0_282]
>     at XXX.recover(KubernetesHaCheckpointStore.java:69) ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.ja
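One possible shape of such a fix, sketched with stand-in types: the blocking recovery runs on an I/O executor and the continuation is handed back to the main thread executor. This is an illustration of the idea, not the actual Flink code:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Illustrative sketch only, with a stand-in for the real store interface.
class CheckpointRecoverySketch {

    interface CompletedCheckpointStore {
        void recover() throws Exception;
    }

    CompletableFuture<Void> recoverAsync(
            CompletedCheckpointStore store,
            Executor ioExecutor,
            Executor mainThreadExecutor) {
        return CompletableFuture
                .runAsync(() -> {
                    try {
                        store.recover(); // blocking I/O now happens off the main thread
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, ioExecutor)
                // hand control back to the JobMaster's main thread once done
                .thenRunAsync(() -> { /* restore state, resume scheduling */ }, mainThreadExecutor);
    }
}
{code}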
[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364734#comment-17364734 ]

Robert Metzger commented on FLINK-22257:
----------------------------------------

I'm proposing to extend the {{ConfigOption}} class with a ConfigOptionType (cluster, job), and show this tag in the table we generate for the documentation.

> Clarify Flink ConfigOptions Usage
> ---------------------------------
>
>                 Key: FLINK-22257
>                 URL: https://issues.apache.org/jira/browse/FLINK-22257
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Configuration
>            Reporter: Fabian Paul
>            Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the different stages of a Flink application, beginning with the translation of the user program and ending with the execution on the cluster. In particular, it is unclear which options can be configured through the different channels:
> * Cluster configuration (i.e. flink-conf.yaml)
> * Application configuration, code-based

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
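A rough sketch of what the proposal could look like; note that {{ConfigOptionType}} and the {{withType(...)}} builder method below are hypothetical and do not exist in Flink today:

{code:java}
// Hypothetical illustration of the proposal; ConfigOptionType and
// withType(...) do not exist in Flink today.
public enum ConfigOptionType {
    CLUSTER, // only takes effect via the cluster configuration (flink-conf.yaml)
    JOB      // can additionally be configured per job, e.g. code-based
}

// How an option declaration might look once tagged; the documentation
// generator could then render the tag as an extra column in the tables.
public static final ConfigOption<Integer> SAMPLE_OPTION =
        ConfigOptions.key("restart-strategy.fixed-delay.attempts")
                .intType()
                .defaultValue(1)
                .withType(ConfigOptionType.JOB) // <-- the proposed new tag
                .withDescription("Number of restart attempts.");
{code}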
[jira] [Commented] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364736#comment-17364736 ]

Yangze Guo commented on FLINK-22891:
------------------------------------

I'll take another look into it.

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-22891
>                 URL: https://issues.apache.org/jira/browse/FLINK-22891
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.13.1
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>              Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0&l=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.24 s <<< FAILURE! - in org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)  Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00 	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.<init>(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Jun 05 21:16:00 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00 	at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00 	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jun 05 21:16:00 	at org.junit.runners.ParentRunn
[jira] [Updated] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description
[ https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wei-Che Wei updated FLINK-22970:
--------------------------------
    Affects Version/s: 1.14.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-23003) Resource leak in RocksIncrementalSnapshotStrategy
[ https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364740#comment-17364740 ]

Roman Khachatryan commented on FLINK-23003:
-------------------------------------------

Merged into master as 8be1058a60565587b465a2237136d4c168f3.

It should also be ported to 1.13 according to the [Update Policy|https://flink.apache.org/downloads.html#update-policy-for-old-releases]. [~Yanfei Lei] would you like to open a PR to backport it to 1.13?

> Resource leak in RocksIncrementalSnapshotStrategy
> --------------------------------------------------
>
>                 Key: FLINK-23003
>                 URL: https://issues.apache.org/jira/browse/FLINK-23003
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.14.0, 1.13.1, 1.12.4
>        Environment: Flink: 1.14-SNAPSHOT
>            Reporter: Yanfei Lei
>            Assignee: Yanfei Lei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.2
>
> We found that `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is not closed correctly after being used, which leads to a resource leak. `RocksDBStateUploader` inherits `RocksDBStateDataTransfer`, and `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` uses the `ExecutorService` to upload files to DFS asynchronously.
> When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the backend should be closed, but `RocksIncrementalSnapshotStrategy` currently lacks a close() function.
> We encountered a concrete instance of this problem: when we benchmarked the performance of incremental rescaling, we observed that the forked VM of JMH can't exit normally.
> {code:java}
> [INFO]
> [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark ---
> # JMH version: 1.19
> # VM version: JDK 1.8.0_281, VM 25.281-b09
> # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Warmup: 10 iterations, 1 s each
> # Measurement: 10 iterations, 1 s each
> # Timeout: 10 min per iteration
> # Threads: 1 thread, will synchronize iterations
> # Benchmark mode: Average time, time/op
> # Benchmark: org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp
> # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)
>
> # Run progress: 0.00% complete, ETA 00:02:00
> # Fork: 1 of 3
> # Warmup Iteration   1: 244.717 ms/op
> # Warmup Iteration   2: 104.749 ms/op
> # Warmup Iteration   3: 104.182 ms/op
> ...
> Iteration   1: 96.600 ms/op
> Iteration   2: 108.463 ms/op
> Iteration   3: 93.657 ms/op
> ...
> <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 24 seconds more...>
> Non-finished threads:
> ...
> Thread[pool-15-thread-4,5,main]
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> The root cause of this example is that the `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is not closed normally when `RocksDBKeyedStateBackend` is disposed.
>
> The solution to this problem is quite straightforward: the `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` can be closed when cleaning up `RocksDBKeyedStateBackend`.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
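As a minimal sketch of the fix described above (stand-in types; the actual merged patch may differ in detail):

{code:java}
// Minimal sketch of the fix, with stand-in types; the merged patch may
// differ in detail.
interface RocksDBStateUploader extends AutoCloseable {
    @Override
    void close(); // shuts down the internal upload ExecutorService
}

class RocksIncrementalSnapshotStrategy implements AutoCloseable {

    private final RocksDBStateUploader stateUploader;

    RocksIncrementalSnapshotStrategy(RocksDBStateUploader stateUploader) {
        this.stateUploader = stateUploader;
    }

    @Override
    public void close() {
        // Invoked from the backend's cleanup path (RocksDBKeyedStateBackend
        // disposal) so the uploader's thread pool no longer leaks.
        stateUploader.close();
    }
}
{code}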
[jira] [Assigned] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song reassigned FLINK-22891:
------------------------------------
    Assignee: Yangze Guo

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364745#comment-17364745 ]

Timo Walther commented on FLINK-22257:
--------------------------------------

We should include [~AHeise] here as well. This is clearly also an SDK topic that we started in 2019 but never completed. The prefixes of option names (e.g. `pipeline.`) mostly represent the proposed design, but allowing additional areas where options apply to be defined programmatically is a good idea.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22995) Failed to acquire lease 'ConfigMapLock: .... retrying...
[ https://issues.apache.org/jira/browse/FLINK-22995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364746#comment-17364746 ]

Bhagi commented on FLINK-22995:
-------------------------------

Hi Till, I have 2 Flink clusters in different namespaces, but they have unique kubernetes.cluster-id values.

> Failed to acquire lease 'ConfigMapLock: .... retrying...
> ---------------------------------------------------------
>
>                 Key: FLINK-22995
>                 URL: https://issues.apache.org/jira/browse/FLINK-22995
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.13.1
>            Reporter: Bhagi
>            Priority: Major
>         Attachments: jobamanger.log
>
> Hi Team,
> I have deployed a Flink session cluster on standalone Kubernetes with JobManager HA (the Kubernetes HA service).
> When I submit jobs, all of them fail due to a JobManager leader election issue.
> Attaching the logs.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364747#comment-17364747 ]

Eduardo Winpenny Tejedor commented on FLINK-22483:
--------------------------------------------------

[~trohrmann] that's right, but the {{CheckpointCoordinator}} gets called from multiple places, and those also get called from multiple other places. They all end up invoking {{CompletedCheckpointStore::recover}} somewhere in the call stack. Should those stay unchanged?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Reopened] (FLINK-22968) Improve exception message when using toAppendStream[String]
[ https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther reopened FLINK-22968:
----------------------------------

> Improve exception message when using toAppendStream[String]
> ------------------------------------------------------------
>
>                 Key: FLINK-22968
>                 URL: https://issues.apache.org/jira/browse/FLINK-22968
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.3, 1.13.1
>        Environment: Flink-1.13.1 and Flink-1.12.1
>            Reporter: DaChun
>            Assignee: Nicholas Jiang
>            Priority: Major
>         Attachments: test.scala, this_error.txt
>
> {code:scala}
> package com.bytedance.one
>
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
>
> object test {
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>       .createLocalEnvironmentWithWebUI()
>     val stream: DataStream[String] = env.readTextFile("data/wc.txt")
>     val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(env)
>     val table: Table = tableEnvironment.fromDataStream(stream)
>     tableEnvironment.createTemporaryView("wc", table)
>     val res: Table = tableEnvironment.sqlQuery("select * from wc")
>     tableEnvironment.toAppendStream[String](res).print()
>     env.execute("test")
>   }
> }
> {code}
> When I run the program, the error is as follows (the full error is in this_error.txt), and the error even prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> It should be very easy to read a stream and convert it into a table when the generic type is String, yet this error is reported. The code is in test.scala and the error is in this_error.txt.
> If I create an entity class myself and use it as the generic type, or use the Row type, everything works normally. Do you really have to create your own entity class instead of using the String type?
> h3. Summary
> We should improve the exception message a bit: we can state that the given type (String) is not allowed in {{toAppendStream}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
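As a side note, the conversion works without a custom entity class when targeting {{Row}}; in the Java bridge API the equivalent would be roughly (sketch):

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Sketch: converting to Row works without defining an entity class.
// (Java bridge API shown here; the original report uses the Scala API.)
class ToRowSketch {
    static void printAsRows(StreamTableEnvironment tableEnv, Table res) {
        tableEnv.toAppendStream(res, Row.class).print();
    }
}
{code}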
[jira] [Closed] (FLINK-22968) Improve exception message when using toAppendStream[String]
[ https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-22968.
--------------------------------
    Resolution: Won't Fix

Let's not put more effort into deprecated APIs.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-22968) Improve exception message when using toAppendStream[String]
[ https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-22968:
---------------------------------
    Comment: was deleted

(was: Let's not put more effort into deprecated APIs.)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22326) Job contains Iterate Operator always fails on Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364750#comment-17364750 ]

Dawid Wysakowicz commented on FLINK-22326:
------------------------------------------

I think updating the documentation is a good idea. If you would like to help with that, I can review the change.

{{tryYield}} is not a good solution either, as it basically leads to busy looping. We'd have to change the queue to one that gives us a Future which tells us when a record is available. That way we could suspend the default action of the mailbox processor.

> Job contains Iterate Operator always fails on Checkpoint
> ---------------------------------------------------------
>
>                 Key: FLINK-22326
>                 URL: https://issues.apache.org/jira/browse/FLINK-22326
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.1
>            Reporter: Lu Niu
>            Assignee: Lu Niu
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 2021-04-16 at 12.43.38 PM.png
>
> A job that contains an Iterate Operator will always fail on checkpoint.
> How to reproduce:
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java], with a few lines of difference:
> 1. Make maxWaitTime large enough when creating the IterativeStream.
> 2. No output back to the Iterative Source.
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
> but always fails on checkpoint in 1.11
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Closed] (FLINK-22968) Improve exception message when using toAppendStream[String]
[ https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-22968.
--------------------------------
    Resolution: Won't Fix

Let's not put more effort into deprecated APIs. The 1.13 documentation clearly states that these API parts are legacy. I will deprecate the affected methods in 1.14. If this pops up again, we can think about fixing it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22326) Job contains Iterate Operator always fails on Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364750#comment-17364750 ]

Dawid Wysakowicz edited comment on FLINK-22326 at 6/17/21, 7:39 AM:
--------------------------------------------------------------------

I think updating the documentation is a good idea. If you would like to help with that, I can review the change.

{{tryYield}} is not a good solution either, as it basically leads to busy looping. We'd have to change the queue to one that gives us a Future which tells us when a record is available. That way we could suspend the default action of the mailbox processor.

{{Yield}} in the {{AsyncWaitOperator}} is correct imo, as the only way to put new records into the queue is from processing records via the {{mailboxProcessor}}. In other words, there must be a new mail in the mailbox to unblock the queue.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
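A rough sketch of the availability-future idea; the class below is illustrative and not the actual mailbox or queue API:

{code:java}
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the idea above: a queue exposing a future that
// completes when a record becomes available, so the consumer can suspend
// the mailbox default action instead of busy-looping with tryYield().
class AvailabilityQueue<T> {

    private final ArrayDeque<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void add(T element) {
        elements.add(element);
        available.complete(null); // wakes up a suspended consumer
    }

    synchronized T poll() {
        T element = elements.poll();
        if (elements.isEmpty()) {
            available = new CompletableFuture<>(); // next consumer waits again
        }
        return element;
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }
}
{code}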
[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364753#comment-17364753 ]

Robert Metzger commented on FLINK-22257:
----------------------------------------

Besides the {{pipeline.}} options, the restart configuration and some of the RocksDB config options are probably also ConfigOptionType.JOB.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22662) YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail
[ https://issues.apache.org/jira/browse/FLINK-22662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364755#comment-17364755 ]

Xintong Song commented on FLINK-22662:
--------------------------------------

[~fly_in_gis] and I have looked into this instability. Here are our findings.

The expected behaviors of {{testKillYarnSessionClusterEntrypoint}} are as follows:
1) AM_1 running
2) Kill AM_1
3) Yarn brings up AM_2
4) Signal the job to finish (via temp file)

The problem is that 3) & 4) can happen before 2) has finished. This is because a Yarn container runs as a process tree, where the Flink Java process is brought up by a wrapping {{launch_container.sh}} process. Yarn can detect the termination of AM_1 and start bringing up AM_2 as soon as the wrapping process is terminated, while the Flink process might still be running. Consequently, the signal from 4) is received by AM_1 and the job finishes before AM_1 is completely shut down. When AM_2 is started, there is no job to be recovered, hence the "could not find job" exception.

To fix this, we need to make sure AM_1 is completely terminated before proceeding with 4). This can be achieved by looking for PID changes.

Besides, a ZK outage is occasionally observed right after the AM failover. In the absence of ZK logs, we have not found the cause of this outage. However, given that the outage is only observed together with the problem described above, we tend to see them as related.

> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22662
>                 URL: https://issues.apache.org/jira/browse/FLINK-22662
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.13.0
>            Reporter: Guowei Ma
>            Assignee: Xintong Song
>            Priority: Major
>              Labels: test-stability
>
> {code:java}
> 2021-05-14T00:24:57.8487649Z May 14 00:24:57 [ERROR] testKillYarnSessionClusterEntrypoint(org.apache.flink.yarn.YARNHighAvailabilityITCase)  Time elapsed: 34.667 s  <<< ERROR!
> 2021-05-14T00:24:57.8488567Z May 14 00:24:57 java.util.concurrent.ExecutionException:
> 2021-05-14T00:24:57.8489301Z May 14 00:24:57 org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (610ed4b159ece04c8ee2ec40e7d0c143)
> 2021-05-14T00:24:57.8493142Z May 14 00:24:57 	at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> 2021-05-14T00:24:57.8495823Z May 14 00:24:57 	at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> 2021-05-14T00:24:57.8496733Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2021-05-14T00:24:57.8497640Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2021-05-14T00:24:57.8498491Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8499222Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.853Z May 14 00:24:57 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
> 2021-05-14T00:24:57.8500872Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-05-14T00:24:57.8501702Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-05-14T00:24:57.8502662Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8503472Z May 14 00:24:57 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.8504269Z May 14 00:24:57 	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
> 2021-05-14T00:24:57.8504892Z May 14 00:24:57 	at akka.dispatch.OnComplete.internal(Future.scala:263)
> 2021-05-14T00:24:57.8505565Z May 14 00:24:57 	at akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-05-14T00:24:57.8506062Z May 14 00:24:57 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-05-14T00:24:57.8506819Z May 14 00:24:57 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-05-14T00:24:57.8507418Z May 14 00:24:57 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-14T00:24:57.8508373Z May 14 00:24:5
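A sketch of the waiting logic under the assumption that the test can resolve the current ClusterEntrypoint PID; {{resolveClusterEntrypointPid()}} is a made-up helper:

{code:java}
import java.time.Duration;

// Illustrative sketch: after killing the first ClusterEntrypoint, block
// until a *different* PID shows up before signalling the job to finish.
// resolveClusterEntrypointPid() is a made-up helper standing in for however
// the test discovers the JVM process on the NodeManager host.
final class AmFailoverWaitSketch {

    static void waitForNewAm(long oldPid, Duration timeout) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        long currentPid = resolveClusterEntrypointPid();
        while (currentPid <= 0 || currentPid == oldPid) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("New ClusterEntrypoint did not come up in time");
            }
            Thread.sleep(100L);
            currentPid = resolveClusterEntrypointPid();
        }
    }

    private static long resolveClusterEntrypointPid() {
        return -1L; // placeholder; a real test would query the host's process table
    }
}
{code}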
[jira] [Commented] (FLINK-22420) UnalignedCheckpointITCase failed
[ https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364758#comment-17364758 ]

Yuan Mei commented on FLINK-22420:
----------------------------------

The failure of this test is caused by "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5)", where the additional failure is triggered by "An OperatorEvent from an OperatorCoordinator to a task was lost", as reported.

The unaligned checkpoint IT case expects five failures in total:
* After {@code m=1/4*n}, map fails.
* After {@code m=1/2*n}, snapshotState fails.
* After {@code m=3/4*n}, map fails and the corresponding recovery fails once.
* At the end, close fails once.

It is a bit tricky that OperatorEvent loss can introduce potentially more failures.

An easy short-term fix for this is to remove the "maxNumberRestartAttempts=5" constraint, with a bit of concern that it may hide other problems.

> UnalignedCheckpointITCase failed
> --------------------------------
>
>                 Key: FLINK-22420
>                 URL: https://issues.apache.org/jira/browse/FLINK-22420
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Guowei Ma
>            Priority: Minor
>              Labels: auto-deprioritized-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442
> {code:java}
> Apr 22 14:28:21 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> Apr 22 14:28:21 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> Apr 22 14:28:21 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21 	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> Apr 22 14:28:21 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> Apr 22 14:28:21 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> Apr 22 14:28:21 	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> Apr 22 14:28:21 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> Apr 22 14:28:21 	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> Apr 22 14:28:21 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> Apr 22 14:28:21 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> Apr 22 14:28:21 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> Apr 22 14:28:21 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> Apr 22 14:28:21 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: Source: source (1/1) - execution #5
> Apr 22 14:28:21 	... 26 more
> Apr 22 14:28:21
> {code}
> As described in the comment
> https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449
> we might need to adjust the tests to allow failover.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
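The short-term fix would roughly amount to relaxing the restart strategy in the test setup, e.g. (sketch; the actual test code may wire this differently):

{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch of the short-term fix: allow (practically) unlimited restarts so
// that spurious failovers, e.g. a lost OperatorEvent, cannot exhaust the
// restart budget. The actual test setup may configure this differently.
class RestartBudgetSketch {
    static void configureRestarts(StreamExecutionEnvironment env) {
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                Integer.MAX_VALUE, // instead of maxNumberRestartAttempts = 5
                100L));            // delay between attempts, in milliseconds
    }
}
{code}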
[jira] [Commented] (FLINK-23003) Resource leak in RocksIncrementalSnapshotStrategy
[ https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364760#comment-17364760 ] Yanfei Lei commented on FLINK-23003: Sure, opened. [https://github.com/apache/flink/pull/16176] [~roman_khachatryan] would you please take another look? > Resource leak in RocksIncrementalSnapshotStrategy > - > > Key: FLINK-23003 > URL: https://issues.apache.org/jira/browse/FLINK-23003 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.14.0, 1.13.1, 1.12.4 > Environment: Flink: 1.14-SNAPSHOT >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0, 1.13.2 > > > We found that `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is > not closed correctly after being used, which leads to a resource leak. > `RocksDBStateUploader` extends `RocksDBStateDataTransfer`, and > `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` > uses the `ExecutorService` to upload files to DFS asynchronously. > When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the > backend should be closed, but `RocksIncrementalSnapshotStrategy` currently lacks a > close() function. > We encountered an example caused by this problem: when we benchmarked the > performance of incremental rescaling, we observed that the forked VM of JMH > couldn't exit normally. > {code:java} > [INFO] > [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark --- > # JMH version: 1.19 > # VM version: JDK 1.8.0_281, VM 25.281-b09 > # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java > # VM options: -Djava.rmi.server.hostname=127.0.0.1 > -Dcom.sun.management.jmxremote.authenticate=false > -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl > # Warmup: 10 iterations, 1 s each > # Measurement: 10 iterations, 1 s each > # Timeout: 10 min per iteration > # Threads: 1 thread, will synchronize iterations > # Benchmark mode: Average time, time/op > # Benchmark: > org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp > # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)# Run > progress: 0.00% complete, ETA 00:02:00 > # Fork: 1 of 3 > # Warmup Iteration 1: 244.717 ms/op > # Warmup Iteration 2: 104.749 ms/op > # Warmup Iteration 3: 104.182 ms/op > ... > Iteration 1: 96.600 ms/op > Iteration 2: 108.463 ms/op > Iteration 3: 93.657 ms/op > ... threads? Waiting 24 seconds more...>Non-finished threads: > ... > Thread[pool-15-thread-4,5,main] > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > > The root cause of this example is that the {{RocksDBStateUploader}} in > {{RocksIncrementalSnapshotStrategy}} is not closed when > {{RocksDBKeyedStateBackend}} is disposed. 
> > The solution is quite straightforward: the {{RocksDBStateUploader}} in > {{RocksIncrementalSnapshotStrategy}} can be closed when cleaning up > {{RocksDBKeyedStateBackend}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
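A hedged sketch of the fix described above: give the snapshot strategy a close() that shuts down the upload executor, and invoke it from the backend's cleanup path. The class and field names below are illustrative stand-ins, not the actual Flink classes.

{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for RocksIncrementalSnapshotStrategy holding an uploader
// whose thread pool must be released when the backend is disposed.
class SnapshotStrategySketch implements Closeable {

    // Stand-in for the ExecutorService held by RocksDBStateDataTransfer
    // and used by RocksDBStateUploader for async uploads to DFS.
    private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(4);

    @Override
    public void close() throws IOException {
        // Without this shutdown, the pool's non-daemon threads linger,
        // which is why the forked JMH VM in the report could not exit.
        uploadExecutor.shutdownNow();
    }
}
{code}

In the actual fix, the backend's dispose/cleanup routine would be the caller of this close().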
[jira] [Comment Edited] (FLINK-22420) UnalignedCheckpointITCase failed
[ https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364758#comment-17364758 ] Yuan Mei edited comment on FLINK-22420 at 6/17/21, 7:52 AM: The failure of this task is caused by "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5", where the additional failure is triggered by "An OperatorEvent from an OperatorCoordinator to a task was lost", as reported. The unaligned checkpoint IT case expects five failures in total: * After {@code m=1/4*n}, map fails. * After {@code m=1/2*n}, snapshotState fails. * After {@code m=3/4*n}, map fails and the corresponding recovery fails once. * At the end, close fails once. It is a bit tricky that OperatorEvent loss can introduce additional failures on top of these, because the success of this test relies on how many failures occur while it is running. The easiest short-term fix I can think of is to remove the "maxNumberRestartAttempts=5" constraint, with the concern that it may hide other problems. was (Author: ym): The failure of this task is caused by "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5" where the additional failure is triggered by "An OperatorEvent from an OperatorCoordinator to a task was lost", as reported. The unaligned checkpoint IT case expected five failures in total: * * After {@code m=1/4*n}, map fails. * After {@code m=1/2*n}, snapshotState fails. * After {@code m=3/4*n}, map fails and the corresponding recovery fails once. * At the end, close fails once. * and it is a bit tricky to introduce potential more failures due to OperatorEvent loss; An easy short-term fix for this is to removes "maxNumberRestartAttempts=5" constraints, with a bit of concern that it may hide other problems. 
> UnalignedCheckpointITCase failed > > > Key: FLINK-22420 > URL: https://issues.apache.org/jira/browse/FLINK-22420 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.0 >Reporter: Guowei Ma >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442 > {code:java} > Apr 22 14:28:21 at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > Apr 22 14:28:21 at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > Apr 22 14:28:21 at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > Apr 22 14:28:21 at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > Apr 22 14:28:21 at akka.actor.ActorCell.invoke(ActorCell.scala:561) > Apr 22 14:28:21 at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > Apr 22 14:28:21 at akka.dispatch.Mailbox.run(Mailbox.scala:225) > Apr 22 14:28:21 at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An > OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task > failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: > Source: source (1/1) - execution #5 > Apr 22 14:28:21 ... 26 more > Apr 22 14:28:21 > {code} > As described in the comment > https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449 > we might need to adjust the tests to allow failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23013) Introduce faker source connector
[ https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364761#comment-17364761 ] Konstantin Knauf commented on FLINK-23013: -- I'd personally prefer to keep it separate, so that the connector can have an independent lifecycle. What would be the benefit of having it in the project? > Introduce faker source connector > > > Key: FLINK-23013 > URL: https://issues.apache.org/jira/browse/FLINK-23013 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > > We already have datagen connector. > But sometimes, we need a more real datagen connector which can produce more > natural random records. > We can integrate [https://github.com/DiUS/java-faker] and introduce a > built-in faker connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
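To make the discussion concrete, here is a brief sketch contrasting the existing datagen connector with what a faker-backed table could look like. The 'faker' connector options below follow the community flink-faker package and are illustrative assumptions, not a Flink built-in API.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FakerVsDatagenSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Built-in datagen: purely random values, exists in Flink today.
        tEnv.executeSql(
                "CREATE TABLE orders_datagen (id INT, price DOUBLE) "
                        + "WITH ('connector' = 'datagen')");

        // Faker-style source producing more natural-looking records.
        // The option names are assumptions borrowed from the external
        // flink-faker package and would require that factory on the classpath.
        tEnv.executeSql(
                "CREATE TABLE customers_faker ("
                        + "  name STRING,"
                        + "  city STRING"
                        + ") WITH ("
                        + "  'connector' = 'faker',"
                        + "  'fields.name.expression' = '#{Name.fullName}',"
                        + "  'fields.city.expression' = '#{Address.city}'"
                        + ")");
    }
}
{code}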
[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364762#comment-17364762 ] Timo Walther commented on FLINK-22257: -- Maybe, but not always. We should think about the layers or teams that usually handle the different stages of the Flink application lifecycle. Maybe something like CLUSTER, JOB, PIPELINE. But this is just an initial idea; it requires deeper investigation. > Clarify Flink ConfigOptions Usage > - > > Key: FLINK-22257 > URL: https://issues.apache.org/jira/browse/FLINK-22257 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Fabian Paul >Priority: Minor > > For users, it is hard to determine which ConfigOptions are relevant for the > different stages of a Flink application. > Beginning from the translation of the user program to the execution on the > cluster. In particular which options can be configured through the different > channels. > * Cluster configuration (i.e. flink-conf.yaml) > * Application configuration, code-based > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364762#comment-17364762 ] Timo Walther edited comment on FLINK-22257 at 6/17/21, 7:55 AM: Maybe, but not always. We should think about the layers or teams that usually handle the different stages of the Flink application lifecycle. Maybe something like CLUSTER, JOB, PIPELINE. But this is just an initial idea; it requires deeper investigation. was (Author: twalthr): Maybe but not always. We should think about the layers or teams that usually handle different stages of Flink application lifecycle. Maybe something like CLUSTER, JOB, PIPELINE. But this is just initial idea, it requires deeper investigation. > Clarify Flink ConfigOptions Usage > - > > Key: FLINK-22257 > URL: https://issues.apache.org/jira/browse/FLINK-22257 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Fabian Paul >Priority: Minor > > For users, it is hard to determine which ConfigOptions are relevant for the > different stages of a Flink application. > Beginning from the translation of the user program to the execution on the > cluster. In particular which options can be configured through the different > channels. > * Cluster configuration (i.e. flink-conf.yaml) > * Application configuration, code-based > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22420) UnalignedCheckpointITCase failed
[ https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364765#comment-17364765 ] Yuan Mei commented on FLINK-22420: -- I did see in FLINK-21996 that a second option/solution is mentioned as well, which can prevent failover due to RPC loss. Given that this happens rarely, I'd suggest just keeping it as it is until we have a conclusion on FLINK-21996. > UnalignedCheckpointITCase failed > > > Key: FLINK-22420 > URL: https://issues.apache.org/jira/browse/FLINK-22420 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.0 >Reporter: Guowei Ma >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442 > {code:java} > Apr 22 14:28:21 at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > Apr 22 14:28:21 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > Apr 22 14:28:21 at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > Apr 22 14:28:21 at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > Apr 22 14:28:21 at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > Apr 22 14:28:21 at akka.actor.ActorCell.invoke(ActorCell.scala:561) > Apr 22 14:28:21 at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > Apr 22 14:28:21 at akka.dispatch.Mailbox.run(Mailbox.scala:225) > Apr 22 14:28:21 at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > Apr 22 14:28:21 at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An > OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task > failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: > Source: source (1/1) - execution #5 > Apr 22 14:28:21 ... 26 more > Apr 22 14:28:21 > {code} > As described in the comment > https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449 > we might need to adjust the tests to allow failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage
[ https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364768#comment-17364768 ] Timo Walther commented on FLINK-22257: -- It might also be useful to look into FLIP-81 and FLIP-73. > Clarify Flink ConfigOptions Usage > - > > Key: FLINK-22257 > URL: https://issues.apache.org/jira/browse/FLINK-22257 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Fabian Paul >Priority: Minor > > For users, it is hard to determine which ConfigOptions are relevant for the > different stages of a Flink application. > Beginning from the translation of the user program to the execution on the > cluster. In particular which options can be configured through the different > channels. > * Cluster configuration (i.e. flink-conf.yaml) > * Application configuration, code-based > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
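As a concrete illustration of the two channels the ticket names, the sketch below shows the same option set once cluster-wide and once per application. The key used is illustrative; the API calls are the standard public ones.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigChannelsSketch {
    public static void main(String[] args) {
        // Channel 1: cluster configuration. The same key would normally sit
        // in flink-conf.yaml and apply to every job on the cluster, e.g.:
        //   restart-strategy: fixed-delay

        // Channel 2: application configuration, code-based. It overrides
        // the cluster default for this program only.
        Configuration conf = new Configuration();
        conf.setString("restart-strategy", "fixed-delay");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
    }
}
{code}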
[jira] [Commented] (FLINK-23013) Introduce faker source connector
[ https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364771#comment-17364771 ] Jingsong Lee commented on FLINK-23013: -- Putting it under Flink would allow more people to see and use it. But OK, let's keep it separate until more people need it. > Introduce faker source connector > > > Key: FLINK-23013 > URL: https://issues.apache.org/jira/browse/FLINK-23013 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > > We already have datagen connector. > But sometimes, we need a more real datagen connector which can produce more > natural random records. > We can integrate [https://github.com/DiUS/java-faker] and introduce a > built-in faker connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
Daniel Lenz created FLINK-23017: --- Summary: HELP in sql-client still shows the removed SOURCE functionality Key: FLINK-23017 URL: https://issues.apache.org/jira/browse/FLINK-23017 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.13.1 Environment: This affects 1.13.0 and 1.13.1 Reporter: Daniel Lenz The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
[ https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Lenz updated FLINK-23017: Description: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. ``` {{[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context}} {{```}} was:The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. > HELP in sql-client still shows the removed SOURCE functionality > --- > > Key: FLINK-23017 > URL: https://issues.apache.org/jira/browse/FLINK-23017 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.1 > Environment: This affects 1.13.0 and 1.13.1 >Reporter: Daniel Lenz >Priority: Minor > > The sql-client still shows the SOURCE command in HELP, even though the > command itself doesn't exist anymore and using it causes an error. > > ``` > {{[ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.runtime.CalciteException: Non-query expression encountered > in illegal context}} > {{```}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
[ https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Lenz updated FLINK-23017: Description: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. {code:java} Flink SQL> source test.sql; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context {code} was: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. ``` {{[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context}} {{```}} > HELP in sql-client still shows the removed SOURCE functionality > --- > > Key: FLINK-23017 > URL: https://issues.apache.org/jira/browse/FLINK-23017 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.1 > Environment: This affects 1.13.0 and 1.13.1 >Reporter: Daniel Lenz >Priority: Minor > > The sql-client still shows the SOURCE command in HELP, even though the > command itself doesn't exist anymore and using it causes an error. > > {code:java} > Flink SQL> source test.sql; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.runtime.CalciteException: Non-query expression > encountered in illegal context > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
[ https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Lenz updated FLINK-23017: Description: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. {code:java} /opt/flink# echo "select 'hello';" > test.sql /opt/flink# sql-client.sh Flink SQL> select 'hello'; -- works as intended [INFO] Result retrieval cancelled. Flink SQL> source test.sql; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context {code} was: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. {code:java} Flink SQL> source test.sql; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context {code} > HELP in sql-client still shows the removed SOURCE functionality > --- > > Key: FLINK-23017 > URL: https://issues.apache.org/jira/browse/FLINK-23017 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.1 > Environment: This affects 1.13.0 and 1.13.1 >Reporter: Daniel Lenz >Priority: Minor > > The sql-client still shows the SOURCE command in HELP, even though the > command itself doesn't exist anymore and using it causes an error. > > {code:java} > /opt/flink# echo "select 'hello';" > test.sql > /opt/flink# sql-client.sh > Flink SQL> select 'hello'; > -- works as intended > [INFO] Result retrieval cancelled. > Flink SQL> source test.sql; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.runtime.CalciteException: Non-query expression encountered > in illegal context > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
[ https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Lenz updated FLINK-23017: Description: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. {code:java} /opt/flink# echo "select 'hello';" > test.sql /opt/flink# sql-client.sh Flink SQL> select 'hello'; -- works as intended [INFO] Result retrieval cancelled. Flink SQL> source test.sql; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context {code} I'd be happy to create a PR and remove the relevant lines from the HELP. was: The sql-client still shows the SOURCE command in HELP, even though the command itself doesn't exist anymore and using it causes an error. {code:java} /opt/flink# echo "select 'hello';" > test.sql /opt/flink# sql-client.sh Flink SQL> select 'hello'; -- works as intended [INFO] Result retrieval cancelled. Flink SQL> source test.sql; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context {code} > HELP in sql-client still shows the removed SOURCE functionality > --- > > Key: FLINK-23017 > URL: https://issues.apache.org/jira/browse/FLINK-23017 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.1 > Environment: This affects 1.13.0 and 1.13.1 >Reporter: Daniel Lenz >Priority: Minor > > The sql-client still shows the SOURCE command in HELP, even though the > command itself doesn't exist anymore and using it causes an error. > > {code:java} > /opt/flink# echo "select 'hello';" > test.sql > /opt/flink# sql-client.sh > Flink SQL> select 'hello'; > -- works as intended > [INFO] Result retrieval cancelled. > Flink SQL> source test.sql; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.runtime.CalciteException: Non-query expression encountered > in illegal context > {code} > > I'd be happy to create a PR and remove the relevant lines from the HELP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23018) TTL state factory should handle extended state descriptors
Yun Tang created FLINK-23018: Summary: TTL state factory should handle extended state descriptors Key: FLINK-23018 URL: https://issues.apache.org/jira/browse/FLINK-23018 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.14.0 Currently, {{TtlStateFactory}} can only handle a fixed set of state descriptor types. Since {{ValueStateDescriptor}} is not a final class, users can still extend it; however, {{TtlStateFactory}} cannot recognize the extended class. {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind of state it is. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23018) TTL state factory should handle extended state descriptors
[ https://issues.apache.org/jira/browse/FLINK-23018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-23018: - Description: Currently, {{TtlStateFactory}} can only handle a fixed set of state descriptor types. Since {{ValueStateDescriptor}} is not a final class, users can still extend it; however, {{TtlStateFactory}} cannot recognize the extended class. {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind of state it is. was: Currently, {{TtlStateFactory}} can only handle fixed type of state descriptors. As {{ValueStateDescriptor}} is not a final class and user could still extend it, however, {{TtlStateFactory}} cannot recognize the extending class. {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kinds of state is. > TTL state factory should handle extended state descriptors > -- > > Key: FLINK-23018 > URL: https://issues.apache.org/jira/browse/FLINK-23018 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.14.0 > > > Currently, {{TtlStateFactory}} can only handle a fixed set of state > descriptor types. Since {{ValueStateDescriptor}} is not a final class, users can > still extend it; however, {{TtlStateFactory}} cannot recognize the extended > class. > {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind > of state it is. -- This message was sent by Atlassian Jira (v8.3.4#803005)
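A hedged sketch of the proposed check: dispatch on the descriptor's declared {{Type}} rather than its concrete class, so user subclasses of {{ValueStateDescriptor}} are still recognized. {{getType()}} and the {{Type}} enum are part of the public {{StateDescriptor}} API; the surrounding factory logic is illustrative.

{code:java}
import org.apache.flink.api.common.state.StateDescriptor;

class TtlStateFactorySketch {

    static void wrapWithTtl(StateDescriptor<?, ?> descriptor) {
        // A lookup keyed on descriptor.getClass() misses subclasses;
        // the declared Type enum does not.
        switch (descriptor.getType()) {
            case VALUE:
                // ... create the TTL-aware value state
                break;
            case LIST:
                // ... create the TTL-aware list state
                break;
            case MAP:
                // ... create the TTL-aware map state
                break;
            default:
                throw new IllegalArgumentException(
                        "Unsupported state type: " + descriptor.getType());
        }
    }
}
{code}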
[jira] [Resolved] (FLINK-23004) Fix misleading log
[ https://issues.apache.org/jira/browse/FLINK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li resolved FLINK-23004. Fix Version/s: 1.14.0 Assignee: Junfan Zhang Resolution: Fixed Fixed in master: 9785ea4ee896ae6b11303abc6b62078c37e589cd > Fix misleading log > -- > > Key: FLINK-23004 > URL: https://issues.apache.org/jira/browse/FLINK-23004 > Project: Flink > Issue Type: Improvement >Reporter: Junfan Zhang >Assignee: Junfan Zhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.14.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23019) Avoid errors when identifiers use reserved keywords
Timo Walther created FLINK-23019: Summary: Avoid errors when identifiers use reserved keywords Key: FLINK-23019 URL: https://issues.apache.org/jira/browse/FLINK-23019 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Timo Walther We add more and more keywords and built-in functions with special meaning to SQL. However, this can be quite annoying for users who have columns named like a keyword, e.g. {{timestamp}} or {{current_timestamp}}. We should investigate whether we can do better and avoid forcing escaping with backticks. IIRC Calcite also offers functionality for that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
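A minimal sketch of the pain point: today, columns whose names collide with reserved keywords must be escaped with backticks in Flink SQL. The table name and schema are made up for illustration.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReservedKeywordSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // `timestamp` is a reserved keyword, so the column must be escaped.
        tEnv.executeSql(
                "CREATE TABLE events (`timestamp` TIMESTAMP(3), id INT) "
                        + "WITH ('connector' = 'datagen')");

        // Works with the backticks; without them the parser rejects the
        // query, which is the friction this ticket wants to reduce.
        tEnv.sqlQuery("SELECT `timestamp`, id FROM events");
    }
}
{code}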
[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364798#comment-17364798 ] Wenhao Ji commented on FLINK-22452: --- Hi, [~mdemierre]. I've just created the FLIP for this. Let us discuss this feature in this thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-172-Support-custom-transactional-id-prefix-in-FlinkKafkaProducer-td51355.html] > Support specifying custom transactional.id prefix in FlinkKafkaProducer > --- > > Key: FLINK-22452 > URL: https://issues.apache.org/jira/browse/FLINK-22452 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.12.2 >Reporter: Wenhao Ji >Priority: Minor > Labels: auto-deprioritized-major > > Currently, the "transactional.id"s of the Kafka producers in > FlinkKafkaProducer are generated based on the task name. This mechanism has > some limitations: > * It will exceed Kafka's limitation if the task name is too long. (resolved > in FLINK-17691) > * They will very likely clash each other if the job topologies are similar. > (discussed in FLINK-11654) > * Only certain "transactional.id" may be authorized by [Prefixed > ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] > on the target Kafka cluster. > Besides, the spring community has introduced the > [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] > method to their Kafka client. > Therefore, I think it will be necessary to have this feature in the Flink > Kafka connector. > > As discussed in FLINK-11654, the possible solution will be, > * either introduce an additional method called > setTransactionalIdPrefix(String) in the FlinkKafkaProducer, > * or use the existing "transactional.id" properties as the prefix. > And the behavior of the "transactional.id" generation will be > * keep the behavior as it was if absent, > * use the one if present as the prefix for the TransactionalIdsGenerator. -- This message was sent by Atlassian Jira (v8.3.4#803005)
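A hedged sketch of the improvement under discussion. The constructor below is the existing public one; the {{setTransactionalIdPrefix}} call is the proposed API from this ticket (mirroring Spring's {{setTransactionIdPrefix}}) and does not exist in {{FlinkKafkaProducer}} yet.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class TransactionalIdPrefixSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");

        // Existing API: the transactional.id is derived from the task name.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic", new SimpleStringSchema(), props);

        // Proposed (not yet available): a stable, ACL-friendly prefix that
        // avoids clashes between jobs with similar topologies.
        // producer.setTransactionalIdPrefix("billing-job-");
    }
}
{code}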
[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality
[ https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Lenz updated FLINK-23017: Environment: (was: This affects 1.13.0 and 1.13.1) > HELP in sql-client still shows the removed SOURCE functionality > --- > > Key: FLINK-23017 > URL: https://issues.apache.org/jira/browse/FLINK-23017 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.1 >Reporter: Daniel Lenz >Priority: Minor > > The sql-client still shows the SOURCE command in HELP, even though the > command itself doesn't exist anymore and using it causes an error. > > {code:java} > /opt/flink# echo "select 'hello';" > test.sql > /opt/flink# sql-client.sh > Flink SQL> select 'hello'; > -- works as intended > [INFO] Result retrieval cancelled. > Flink SQL> source test.sql; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.runtime.CalciteException: Non-query expression encountered > in illegal context > {code} > > I'd be happy to create a PR and remove the relevant lines from the HELP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23013) Introduce faker source connector
[ https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364806#comment-17364806 ] Konstantin Knauf commented on FLINK-23013: -- It would be great if we could find a way for more people to discover connectors without the need to have them in the Apache Flink project itself. [~AHeise] is looking into ways to do this as far as I know. > Introduce faker source connector > > > Key: FLINK-23013 > URL: https://issues.apache.org/jira/browse/FLINK-23013 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > > We already have datagen connector. > But sometimes, we need a more real datagen connector which can produce more > natural random records. > We can integrate [https://github.com/DiUS/java-faker] and introduce a > built-in faker connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22955) lookup join filter push down result to mismatch function signature
[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364807#comment-17364807 ] JING ZHANG commented on FLINK-22955: Hi [~gsavl], as we discussed on the mailing list, the root cause of the exception is that LookupJoin wants to find a method that matches two lookup keys, 'age, id', after the optimizer pushes age=10 down into the LookupJoin. There is a possible improvement to avoid throwing an exception: if no method matching two lookup keys is found, try to match a method with one lookup key. The dimension table would then return all records matching 'id=a', and the output would be filtered by 'age=10' in a later Calc. But in this way, we need to know whether the only argument of the eval method represents 'age' or 'id', and this hint is missing in the current API. As discussed with [~jark], we think this improvement would involve refactoring `LookupTableSource`. We prefer to let the user handle this case by adding an eval method that matches the lookup keys, for example {code:java} // first solution with one eval method with variable arguments length @SerialVersionUID(1L) class AsyncTableFunction1 extends AsyncTableFunction[RowData] { @varargs def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer*): Unit = { } } // second solution with multiple eval method, each method with fixed arguments length @SerialVersionUID(1L) class AsyncTableFunction1 extends AsyncTableFunction[RowData] { def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = { } def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit = { } } {code} What do you think, [~gsavl]? > lookup join filter push down result to mismatch function signature > -- > > Key: FLINK-22955 > URL: https://issues.apache.org/jira/browse/FLINK-22955 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.3, 1.13.1, 1.12.4 > Environment: Flink 1.13.1 > how to reproduce: patch file attached >Reporter: Cooper Luan >Priority: Critical > Fix For: 1.11.4, 1.12.5, 1.13.2 > > Attachments: > 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch > > > a SQL like this may result in a lookup function signature mismatch exception when > explaining the SQL > {code:sql} > CREATE TEMPORARY VIEW v_vvv AS > SELECT * FROM MyTable AS T > JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D > ON T.a = D.id; > SELECT a,b,id,name > FROM v_vvv > WHERE age = 10;{code} > the lookup function is > {code:scala} > class AsyncTableFunction1 extends AsyncTableFunction[RowData] { > def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: > Integer): Unit = { > } > }{code} > exec plan is > {code:java} > LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], > fields=[a, b, id, name]) > +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], > joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = > 10)], select=[a, b, id, name]) >+- Calc(select=[a, b]) > +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], > fields=[a, b, c, proctime, rowtime]) > {code} > the "lookup=[age=10, id=a]" results in a signature mismatch > > but if I add one more insert, it works well > {code:sql} > SELECT a,b,id,name > FROM v_vvv > WHERE age = 30 > {code} > exec plan is > {code:java} > == Optimized Execution Plan == > LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], > joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, > rowtime, id, name, age, 
ts])(reuse_id=[1]) > +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], > fields=[a, b, c, proctime, > rowtime])LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], > fields=[a, b, id, name]) > +- Calc(select=[a, b, id, name], where=[(age = 10)]) >+- > Reused(reference_id=[1])LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], > fields=[a, b, id, name]) > +- Calc(select=[a, b, id, name], where=[(age = 30)]) >+- Reused(reference_id=[1]) > {code} > the LookupJoin node use "lookup=[id=a]"(right) not "lookup=[age=10, id=a]" > (wrong) > > so, in "multi insert" case, planner works great > in "single insert" case, planner throw exception -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23020) NullPointerException when running collect from Python API
Maciej Bryński created FLINK-23020: -- Summary: NullPointerException when running collect from Python API Key: FLINK-23020 URL: https://issues.apache.org/jira/browse/FLINK-23020 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.1 Reporter: Maciej Bryński Hi, I'm trying to use PyFlink from Jupyter Notebook and I'm getting NPE in following scenario. 1. I'm creating datagen table. {code:java} from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment, DataTypes from pyflink.table.udf import udf from pyflink.common import Configuration, Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.java_gateway import get_gateway conf = Configuration() env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() table_env = StreamTableEnvironment.create(environment_settings=env_settings) table_env.get_config().get_configuration().set_integer("parallelism.default", 1) table_env.execute_sql("DROP TABLE IF EXISTS datagen") table_env.execute_sql(""" CREATE TABLE datagen ( id INT ) WITH ( 'connector' = 'datagen' ) """) {code} 2. Then I'm running collect {code:java} try: result = table_env.sql_query("select * from datagen limit 1").execute() for r in result.collect(): print(r) except KeyboardInterrupt: result.get_job_client().cancel() {code} 3. I'm using "interrupt the kernel" button. This is handled by above try/except and will cancel the query. 4. I'm running collect from point 2 one more time. Result: {code:java} --- Py4JJavaError Traceback (most recent call last) in 1 try: > 2 result = table_env.sql_query("select * from datagen limit 1").execute() 3 for r in result.collect(): 4 print(r) 5 except KeyboardInterrupt: /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self) 1070 """ 1071 self._t_env._before_execute() -> 1072 return TableResult(self._j_table.execute()) 1073 1074 def explain(self, *extra_details: ExplainDetail) -> str: /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args) 1283 1284 answer = self.gateway_client.send_command(command) -> 1285 return_value = get_return_value( 1286 answer, self.gateway_client, self.target_id, self.name) 1287 /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw) 144 def deco(*a, **kw): 145 try: --> 146 return f(*a, **kw) 147 except Py4JJavaError as e: 148 from pyflink.java_gateway import get_gateway /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o69.execute. 
: java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965) at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879) at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858) at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271) at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API
[ https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maciej Bryński updated FLINK-23020: --- Summary: NullPointerException when running collect twice from Python API (was: NullPointerException when running collect from Python API) > NullPointerException when running collect twice from Python API > --- > > Key: FLINK-23020 > URL: https://issues.apache.org/jira/browse/FLINK-23020 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.13.1 >Reporter: Maciej Bryński >Priority: Major > > Hi, > I'm trying to use PyFlink from Jupyter Notebook and I'm getting NPE in > following scenario. > 1. I'm creating datagen table. > {code:java} > from pyflink.table import EnvironmentSettings, TableEnvironment, > StreamTableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.common import Configuration, Row > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.java_gateway import get_gateway > conf = Configuration() > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > table_env.get_config().get_configuration().set_integer("parallelism.default", > 1) > table_env.execute_sql("DROP TABLE IF EXISTS datagen") > table_env.execute_sql(""" > CREATE TABLE datagen ( > id INT > ) WITH ( > 'connector' = 'datagen' > ) > """) > {code} > 2. Then I'm running collect > {code:java} > try: > result = table_env.sql_query("select * from datagen limit 1").execute() > for r in result.collect(): > print(r) > except KeyboardInterrupt: > result.get_job_client().cancel() > {code} > 3. I'm using "interrupt the kernel" button. This is handled by above > try/except and will cancel the query. > 4. I'm running collect from point 2 one more time. Result: > {code:java} > --- > Py4JJavaError Traceback (most recent call last) > in > 1 try: > > 2 result = table_env.sql_query("select * from datagen limit > 1").execute() > 3 for r in result.collect(): > 4 print(r) > 5 except KeyboardInterrupt: > /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self) >1070 """ >1071 self._t_env._before_execute() > -> 1072 return TableResult(self._j_table.execute()) >1073 >1074 def explain(self, *extra_details: ExplainDetail) -> str: > /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, > *args) >1283 >1284 answer = self.gateway_client.send_command(command) > -> 1285 return_value = get_return_value( >1286 answer, self.gateway_client, self.target_id, self.name) >1287 > /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, > **kw) > 144 def deco(*a, **kw): > 145 try: > --> 146 return f(*a, **kw) > 147 except Py4JJavaError as e: > 148 from pyflink.java_gateway import get_gateway > /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in > get_return_value(answer, gateway_client, target_id, name) > 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 325 if answer[1] == REFERENCE_TYPE: > --> 326 raise Py4JJavaError( > 327 "An error occurred while calling {0}{1}{2}.\n". > 328 format(target_id, ".", name), value) > Py4JJavaError: An error occurred while calling o69.execute. 
> : java.lang.NullPointerException > at java.base/java.util.Objects.requireNonNull(Objects.java:221) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) > at > org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) > at > org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965) > at > org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879) > at > org.apache.c
[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API
[ https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maciej Bryński updated FLINK-23020: --- Description: Hi, I'm trying to use PyFlink from Jupyter Notebook and I'm getting NPE in following scenario. 1. I'm creating datagen table. {code:java} from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment, DataTypes from pyflink.table.udf import udf from pyflink.common import Configuration, Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.java_gateway import get_gateway conf = Configuration() env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() table_env = StreamTableEnvironment.create(environment_settings=env_settings) table_env.get_config().get_configuration().set_integer("parallelism.default", 1) table_env.execute_sql("DROP TABLE IF EXISTS datagen") table_env.execute_sql(""" CREATE TABLE datagen ( id INT ) WITH ( 'connector' = 'datagen' ) """) {code} 2. Then I'm running collect {code:java} try: result = table_env.sql_query("select * from datagen limit 1").execute() for r in result.collect(): print(r) except KeyboardInterrupt: result.get_job_client().cancel() {code} 3. I'm using "interrupt the kernel" button. This is handled by above try/except and will cancel the query. 4. I'm running collect from point 2 one more time. Result: {code:java} --- Py4JJavaError Traceback (most recent call last) in 1 try: > 2 result = table_env.sql_query("select * from datagen limit 1").execute() 3 for r in result.collect(): 4 print(r) 5 except KeyboardInterrupt: /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self) 1070 """ 1071 self._t_env._before_execute() -> 1072 return TableResult(self._j_table.execute()) 1073 1074 def explain(self, *extra_details: ExplainDetail) -> str: /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args) 1283 1284 answer = self.gateway_client.send_command(command) -> 1285 return_value = get_return_value( 1286 answer, self.gateway_client, self.target_id, self.name) 1287 /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw) 144 def deco(*a, **kw): 145 try: --> 146 return f(*a, **kw) 147 except Py4JJavaError as e: 148 from pyflink.java_gateway import get_gateway /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o69.execute. 
: java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965) at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879) at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858) at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271) at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.tabl
[jira] [Commented] (FLINK-23013) Introduce faker source connector
[ https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364811#comment-17364811 ] Jingsong Lee commented on FLINK-23013: -- (y) Yes, we need a website or something else. > Introduce faker source connector > > > Key: FLINK-23013 > URL: https://issues.apache.org/jira/browse/FLINK-23013 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > > We already have datagen connector. > But sometimes, we need a more real datagen connector which can produce more > natural random records. > We can integrate [https://github.com/DiUS/java-faker] and introduce a > built-in faker connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22974) No execution checkpointing config desc in flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-22974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364156#comment-17364156 ] tinawenqiao edited comment on FLINK-22974 at 6/17/21, 9:51 AM: --- [~jark], the PR has been submitted here: [https://github.com/apache/flink/pull/16183] was (Author: wenqiao): [~jark],PR has submitted here [https://github.com/apache/flink/pull/16147|http://example.com] > No execution checkpointing config desc in flink-conf.yaml > -- > > Key: FLINK-22974 > URL: https://issues.apache.org/jira/browse/FLINK-22974 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.13.1 >Reporter: tinawenqiao >Assignee: tinawenqiao >Priority: Minor > > We found that there is no parameter description for execution checkpointing > in flink-conf.yaml. It may cause a misunderstanding. I think we can add some > important parameters, such as > execution.checkpointing.interval,execution.checkpointing.externalized-checkpoint-retention,execution.checkpointing.tolerable-failed-checkpoints > etc. > > #== > # Fault tolerance and checkpointing > > #== > # The backend that will be used to store operator state checkpoints if > # checkpointing is enabled. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23021) Check if the restarted job topology is consistent on restart
Yun Gao created FLINK-23021: --- Summary: Check if the restarted job topology is consistent on restart Key: FLINK-23021 URL: https://issues.apache.org/jira/browse/FLINK-23021 Project: Flink Issue Type: Sub-task Reporter: Yun Gao Users might modify the job topology before restarting from an externalized checkpoint or savepoint. To handle this, we would need to check whether a fully finished operator has been added after a non-fully-finished operator. If so, we would either throw an exception to disallow this situation or re-mark the fully finished operator as alive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
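A hypothetical sketch of the described check, assuming the restored vertices can be visited in topological order. The Vertex type, its accessors, and the traversal are illustrative stand-ins, not Flink internals:

{code:java}
import java.util.List;

final class TopologyCheck {

    static void checkRestoredTopology(List<Vertex> topologicallySorted) {
        boolean seenNonFullyFinished = false;
        for (Vertex v : topologicallySorted) {
            if (!v.isFullyFinished()) {
                seenNonFullyFinished = true;
            } else if (seenNonFullyFinished) {
                // Option 1: reject the modified topology outright.
                // (Option 2, per the ticket, would instead re-mark v as alive.)
                throw new IllegalStateException(
                        "Fully finished operator " + v.name()
                                + " follows a non-fully-finished operator");
            }
        }
    }

    interface Vertex {
        boolean isFullyFinished();
        String name();
    }
}
{code}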
[jira] [Updated] (FLINK-22998) Flink SQL does not support block comment before SET command
[ https://issues.apache.org/jira/browse/FLINK-22998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-22998: Component/s: (was: Table SQL / Planner) Table SQL / API > Flink SQL does not support block comment before SET command > --- > > Key: FLINK-22998 > URL: https://issues.apache.org/jira/browse/FLINK-22998 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.0 >Reporter: Zhiwen Sun >Priority: Major > > Flink SQL does not support a block comment before the SET command. > A tiny SQL script that produces the bug: > > {code:java} > /** > comment > **/ > SET sql-client.execution.result-mode=TABLEAU; > SELECT 'hello';{code} > > while the following SQL works fine: > > {code:java} > SET sql-client.execution.result-mode=TABLEAU; > /** > comment > **/ > SELECT 'hello';{code} > > > After debugging the Flink source code, I found that EXTENDED_PARSER does not > support block comments. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
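As a sketch of one possible fix, the extended parser could strip leading block comments before matching commands such as SET. The helper below is illustrative only and is not Flink's actual implementation; the regex and method names are assumptions:

{code:java}
import java.util.regex.Pattern;

// Illustrative: strip leading block comments (and whitespace) before handing
// a statement to a SET/RESET-style command matcher.
final class LeadingComments {

    private static final Pattern LEADING_BLOCK_COMMENT =
            Pattern.compile("^\\s*/\\*.*?\\*/", Pattern.DOTALL);

    static String strip(String statement) {
        String s = statement;
        while (true) {
            String stripped = LEADING_BLOCK_COMMENT.matcher(s).replaceFirst("");
            if (stripped.equals(s)) {
                return s.trim();
            }
            s = stripped;
        }
    }

    public static void main(String[] args) {
        System.out.println(strip("/**\ncomment\n**/\nSET sql-client.execution.result-mode=TABLEAU;"));
        // prints: SET sql-client.execution.result-mode=TABLEAU;
    }
}
{code}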
[jira] [Commented] (FLINK-23013) Introduce faker source connector
[ https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364815#comment-17364815 ] Jark Wu commented on FLINK-23013: - Isn't it https://flink-packages.org/ ? > Introduce faker source connector > > > Key: FLINK-23013 > URL: https://issues.apache.org/jira/browse/FLINK-23013 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > > We already have a datagen connector. > But sometimes we need a more realistic datagen connector that can produce more > natural random records. > We can integrate [https://github.com/DiUS/java-faker] and introduce a > built-in faker connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-22703) SavepointITCase.testTriggerSavepointAndResumeWithFileBasedCheckpoints failed
[ https://issues.apache.org/jira/browse/FLINK-22703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Kalashnikov closed FLINK-22703. - Resolution: Duplicate This test uses the same codebase that will be fixed by https://issues.apache.org/jira/browse/FLINK-22593 > SavepointITCase.testTriggerSavepointAndResumeWithFileBasedCheckpoints failed > > > Key: FLINK-22703 > URL: https://issues.apache.org/jira/browse/FLINK-22703 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.14.0 >Reporter: Guowei Ma >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18106&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=c2734c79-73b6-521c-e85a-67c7ecae9107&l=9765 > {code:java} > May 19 00:51:42 Caused by: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Map (3/4) of job 1785ead58e598bc669c92d5a2cd14618 has not > being executed at the moment. Aborting checkpoint. Failure reason: Not all > required tasks are currently running. > May 19 00:51:42 at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) > May 19 00:51:42 at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) > May 19 00:51:42 at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > May 19 00:51:42 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > May 19 00:51:42 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > May 19 00:51:42 at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > May 19 00:51:42 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > May 19 00:51:42 at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > May 19 00:51:42 at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > May 19 00:51:42 at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > May 19 00:51:42 at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > May 19 00:51:42 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > May 19 00:51:42 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > May 19 00:51:42 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > May 19 00:51:42 at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > May 19 00:51:42 at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > May 19 00:51:42 at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > May 19 00:51:42 at akka.actor.ActorCell.invoke(ActorCell.scala:561) > May 19 00:51:42 at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > May 19 00:51:42 at akka.dispatch.Mailbox.run(Mailbox.scala:225) > May 19 00:51:42 at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > May 19 00:51:42 at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > May 19 00:51:42 at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > May 19 00:51:42 at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > May 19 00:51:42 at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by 
Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22625) FileSinkMigrationITCase unstable
[ https://issues.apache.org/jira/browse/FLINK-22625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364826#comment-17364826 ] Anton Kalashnikov commented on FLINK-22625: --- [~gaoyunhaii], I think I found the problem and I will fix it along with the same problem in another test. Please check this PR [https://github.com/apache/flink/pull/16151/files#diff-5df9eea57ea9542415f7e58a7c7051a1c6156f42423ba69f4323e697bb2c9cb4] and, if you agree that it will resolve the problem, can you close this ticket as a duplicate of https://issues.apache.org/jira/browse/FLINK-22593 > FileSinkMigrationITCase unstable > > > Key: FLINK-22625 > URL: https://issues.apache.org/jira/browse/FLINK-22625 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.14.0 >Reporter: Dawid Wysakowicz >Assignee: Yun Gao >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17817&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=420bd9ec-164e-562e-8947-0dacde3cec91&l=22179 > {code} > May 11 00:43:40 Caused by: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Sink: Unnamed (1/3) of job 733a4777cca170f86724832642e2a8b1 > has not being executed at the moment. Aborting checkpoint. Failure reason: > Not all required tasks are currently running. > May 11 00:43:40 at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) > May 11 00:43:40 at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) > May 11 00:43:40 at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > May 11 00:43:40 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > May 11 00:43:40 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > May 11 00:43:40 at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > May 11 00:43:40 at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > May 11 00:43:40 at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > May 11 00:43:40 at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > May 11 00:43:40 at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > May 11 00:43:40 at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > May 11 00:43:40 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > May 11 00:43:40 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > May 11 00:43:40 at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > May 11 00:43:40 at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > May 11 00:43:40 at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > May 11 00:43:40 at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > May 11 00:43:40 at akka.actor.ActorCell.invoke(ActorCell.scala:561) > May 11 00:43:40 at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > May 11 00:43:40 at akka.dispatch.Mailbox.run(Mailbox.scala:225) > May 11 00:43:40 at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > May 11 00:43:40 at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > May 11 00:43:40 at > 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > May 11 00:43:40 at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > May 11 00:43:40 at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11982) BatchTableSourceFactory support Json Format File
[ https://issues.apache.org/jira/browse/FLINK-11982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11982: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > BatchTableSourceFactory support Json Format File > > > Key: FLINK-11982 > URL: https://issues.apache.org/jira/browse/FLINK-11982 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: pingle wang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Java code: > {code:java} > val connector = FileSystem().path("data/in/test.json") > val desc = tEnv.connect(connector) > .withFormat( > new Json().schema( > Types.ROW(Array[String]("id", "name", "age"), > Array[TypeInformation[_]](Types.STRING, > Types.STRING, Types.INT)) > ) > .failOnMissingField(true) > ) > .registerTableSource("person") > val sql = "select * from person" > val result = tEnv.sqlQuery(sql) > {code} > Exception info: > {code:java} > Exception in thread "main" > org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a > suitable table factory for > 'org.apache.flink.table.factories.BatchTableSourceFactory' in > the classpath. > Reason: No context matches. > The following properties are requested: > connector.path=file:///Users/batch/test.json > connector.property-version=1 > connector.type=filesystem > format.derive-schema=true > format.fail-on-missing-field=true > format.property-version=1 > format.type=json > The following factories have been considered: > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > org.apache.flink.table.sinks.CsvBatchTableSinkFactory > org.apache.flink.table.sinks.CsvAppendTableSinkFactory > org.apache.flink.formats.avro.AvroRowFormatFactory > org.apache.flink.formats.json.JsonRowFormatFactory > org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory > org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory > at > org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214) > at > org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130) > at > org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81) > at > org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:44) > at > org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46) > at com.meitu.mlink.sql.batch.JsonExample.main(JsonExample.java:36){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11968) Fix runtime SingleElementIterator.iterator and remove table.SingleElementIterator
[ https://issues.apache.org/jira/browse/FLINK-11968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11968: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Fix runtime SingleElementIterator.iterator and remove > table.SingleElementIterator > - > > Key: FLINK-11968 > URL: https://issues.apache.org/jira/browse/FLINK-11968 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Reporter: Jingsong Lee >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > {code:java} > @Override > public Iterator iterator() { >return this; > } > {code} > In iterator() we need to set available to true, otherwise we can only iterate once. -- This message was sent by Atlassian Jira (v8.3.4#803005)
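A minimal sketch of the proposed fix, simplified from the class described above (the field names are assumed for this sketch):

{code:java}
import java.util.Iterator;

// Sketch of the fix: reset `available` in iterator() so the single element
// can be traversed more than once. Simplified, not Flink's actual class.
public class SingleElementIterator<E> implements Iterator<E>, Iterable<E> {

    private E current;
    private boolean available = false;

    public void set(E element) {
        this.current = element;
        this.available = true;
    }

    @Override
    public boolean hasNext() {
        return available;
    }

    @Override
    public E next() {
        available = false;
        return current;
    }

    @Override
    public Iterator<E> iterator() {
        available = true; // the missing reset: without it, only the first iteration sees the element
        return this;
    }
}
{code}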
[jira] [Updated] (FLINK-12003) Revert the config option about mapreduce.output.basename in HadoopOutputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-12003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-12003: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Revert the config option about mapreduce.output.basename in > HadoopOutputFormatBase > -- > > Key: FLINK-12003 > URL: https://issues.apache.org/jira/browse/FLINK-12003 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hadoop Compatibility >Reporter: vinoyang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > In the {{HadoopOutputFormatBase}} open method, the config option > {{mapreduce.output.basename}} was changed to "tmp", and there is no > documentation stating this change. > By default, HDFS will use the format "part-x-y" to name its files, where x > and y mean: > * {{x}} is either 'm' or 'r', depending on whether the job was a map-only > job or a reduce job > * {{y}} is the mapper or reducer task number (zero based) > > The keyword "part" has been used in many places in users' business logic to match > HDFS file names. So I suggest reverting this config option or documenting > it. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION
[ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11848: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION > > > Key: FLINK-11848 > URL: https://issues.apache.org/jira/browse/FLINK-11848 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.6.4 >Reporter: Shengnan YU >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > Recently we have been running some streaming jobs with Apache Flink. There are > multiple Kafka topics with a name format like xx_yy-mm-dd. We used a topic regex > pattern to let a consumer consume those topics. However, if we delete some > older topics, it seems that the metadata in the consumer does not update properly, > so it still remembers those outdated topics in its topic list, which leads to > *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. It > seems to occur in the producer as well. Any idea how to solve this problem? Thank you > very much! > > Example to reproduce the problem: > There are multiple Kafka topics, for instance > "test20190310", "test20190311" and "test20190312". I run the job and > everything is OK. Then if I delete topic "test20190310", the consumer does > not notice that the topic was deleted and still tries to fetch metadata for that > topic. Unknown-topic errors show up in the taskmanager's log. > {code:java} > public static void main(String []args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "test10"); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put("auto.offset.reset", "earliest"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > > props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, >"120"); > Pattern topics = Pattern.compile("^test.*$"); > FlinkKafkaConsumer011 consumer = new > FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props); > DataStream stream = env.addSource(consumer); > stream.writeToSocket("localhost", 4, new SimpleStringSchema()); > env.execute("test"); > } > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11970) TableEnvironment#registerFunction should overwrite if the same name function existed.
[ https://issues.apache.org/jira/browse/FLINK-11970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11970: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > TableEnvironment#registerFunction should overwrite if the same name function > existed. > -- > > Key: FLINK-11970 > URL: https://issues.apache.org/jira/browse/FLINK-11970 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Jeff Zhang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > Currently, an exception is thrown if I try to register a user function > multiple times. Registering a UDF multiple times is very common in notebook > scenarios, and I don't think it would cause issues for users. > E.g. if a user happened to register the same UDF name multiple times (while > actually intending to register two different UDFs), they would get an exception at > runtime when using the UDF that is missing its registration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11934) Remove the deprecated methods in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-11934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11934: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Remove the deprecated methods in TableEnvironment > > > Key: FLINK-11934 > URL: https://issues.apache.org/jira/browse/FLINK-11934 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Hequn Cheng >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > There are several {{getTableEnvironment()}} methods in {{TableEnvironment}} > which were deprecated during 1.8. As release-1.8 has been cut, we > can remove these deprecated methods now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions
[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11909: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Provide default failure/timeout/backoff handling strategy for AsyncIO > functions > --- > > Key: FLINK-11909 > URL: https://issues.apache.org/jira/browse/FLINK-11909 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Rong Rong >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > Currently Flink AsyncIO by default fails the entire job when an async function > invocation fails [1]. It would be nice to have some default Async IO > failure/timeout handling strategy, or to open up some APIs for the AsyncFunction > timeout method to interact with the AsyncWaitOperator. For example (quote > [~suez1224] in [2]): > * FAIL_OPERATOR (default & current behavior) > * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times) > * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times) > Discussion also extended to introduce configuration such as: > * MAX_RETRY_COUNT > * RETRY_FAILURE_POLICY > REF: > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling > [2] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
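To make the FIX_INTERVAL_RETRY idea concrete, here is a hedged sketch of how such a policy could be layered on top of a user async function today. queryBackend(), the retry constants, and the executor handling are assumptions of this sketch, not a Flink API:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Illustrative sketch of a FIX_INTERVAL_RETRY policy layered on a user
// AsyncFunction; queryBackend() and the retry parameters are hypothetical.
public class RetryingAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_RETRY_COUNT = 3;
    private static final long RETRY_INTERVAL_MS = 500;

    private transient ScheduledExecutorService retryExecutor;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) {
        retryExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        attempt(input, resultFuture, 0);
    }

    private void attempt(String input, ResultFuture<String> resultFuture, int tryNo) {
        queryBackend(input).whenComplete((result, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(result));
            } else if (tryNo < MAX_RETRY_COUNT) {
                // Fixed-interval retry instead of failing the operator.
                retryExecutor.schedule(
                        () -> attempt(input, resultFuture, tryNo + 1),
                        RETRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
            } else {
                resultFuture.completeExceptionally(error); // FAIL_OPERATOR fallback
            }
        });
    }

    // Hypothetical async backend call standing in for the user's I/O.
    private CompletableFuture<String> queryBackend(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key);
    }

    @Override
    public void close() {
        retryExecutor.shutdownNow();
    }
}
{code}

Note that any retries must complete before the async operator's own timeout fires, which is one reason a built-in strategy would be cleaner than this user-level workaround.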
[jira] [Updated] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.
[ https://issues.apache.org/jira/browse/FLINK-11936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11936: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter > issue. > - > > Key: FLINK-11936 > URL: https://issues.apache.org/jira/browse/FLINK-11936 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Rong Rong >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > AuxiliaryConverter was pulled in as part of FLINK-6409. Since CALCITE-1761 has been > fixed, we should sync back with the Calcite version. > After a quick glance, I think it is not so simple to just delete the class, so > I opened a follow-up Jira for this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11893) Update ecosystem page and add it to the navigation bar
[ https://issues.apache.org/jira/browse/FLINK-11893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11893: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Update ecosystem page and add it to the navigation bar > -- > > Key: FLINK-11893 > URL: https://issues.apache.org/jira/browse/FLINK-11893 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Jiangjie Qin >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Based on the discussion in the mailing list, we would like to encourage the > connector authors to share their connectors with the community, even if the > connectors are not in the Flink repo. > This ticket updates the ecosystem page on the website to reflect the change. > It also adds the ecosystem page back to the navigation bar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11747) Elasticsearch 6 connector - Expose RestHighLevelClient to allow for custom sniffing
[ https://issues.apache.org/jira/browse/FLINK-11747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11747: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Elasticsearch 6 connector - Expose RestHighLevelClient to allow for custom > sniffing > --- > > Key: FLINK-11747 > URL: https://issues.apache.org/jira/browse/FLINK-11747 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Reporter: Samir Desai >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > In the Elasticsearch6 connector, the > [RestClientFactory|https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java#L31] > allows users to customize the {{RestClientBuilder}}. However, certain > customizations like adding > [Sniffing|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html] > > require access to the low-level rest client, which can be obtained through > the high level rest client. The {{RestHighLevelClient}} is > [instantiated|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L75] > in the api call bridge, and is never exposed to the user. > To my knowledge, the current Elasticsearch6 connector does not utilize > sniffing or provide a way to add it in. The Elasticsearch6 connector should > expose some type of access to the RestHighLevelClient to allow for custom > sniffing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11737) Support org.apache.hadoop.mapreduce.lib.output.MultipleOutputs output
[ https://issues.apache.org/jira/browse/FLINK-11737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11737: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Support org.apache.hadoop.mapreduce.lib.output.MultipleOutputs output > - > > Key: FLINK-11737 > URL: https://issues.apache.org/jira/browse/FLINK-11737 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hadoop Compatibility >Reporter: vinoyang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > This issue is to improve Flink's compatibility with Hadoop. Currently, for > the old version of the Hadoop API, there is > {{org.apache.hadoop.mapred.lib.MultipleOutputFormat}}, which can be used > directly. However, the new version of the Hadoop API, > {{org.apache.hadoop.mapreduce.lib.output.MultipleOutputs}}, is currently not > supported by Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11818) Provide pipe transformation function for DataSet API
[ https://issues.apache.org/jira/browse/FLINK-11818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11818: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Provide pipe transformation function for DataSet API > > > Key: FLINK-11818 > URL: https://issues.apache.org/jira/browse/FLINK-11818 > Project: Flink > Issue Type: Improvement > Components: API / DataSet >Reporter: vinoyang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We have some business requirements that require the data handled by Flink to > interact with some external programs (such as Python/Perl/shell scripts). > There is no such function in the existing DataSet API; it can be implemented > with a map function, but that is not concise. It would be helpful > if we could provide a pipe[1] function like Spark's. > [1]: > https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings
[ https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-4004: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Do not pass custom flink kafka connector properties to Kafka to avoid warnings > -- > > Key: FLINK-4004 > URL: https://issues.apache.org/jira/browse/FLINK-4004 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Robert Metzger >Priority: Major > Labels: auto-unassigned, stale-major > > The FlinkKafkaConsumer has some custom properties, which we pass to the > KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to > log warnings about unused properties. > We should not pass Flink-internal properties to Kafka, to avoid those > warnings. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-4088) Add interface to save and load TableSources
[ https://issues.apache.org/jira/browse/FLINK-4088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-4088: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add interface to save and load TableSources > --- > > Key: FLINK-4088 > URL: https://issues.apache.org/jira/browse/FLINK-4088 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Priority: Major > Labels: auto-unassigned, stale-major > > Add an interface to save and load table sources, similar to Java's > {{Serializable}} interface. > TableSources should implement the interface to become saveable and loadable. > This could be used as follows: > {code} > val cts = new CsvTableSource( > "/path/to/csvfile", > Array("name", "age", "address"), > Array(BasicTypeInfo.STRING_TYPEINFO, ...), > ... > ) > cts.saveToFile("/path/to/tablesource/file") > // - > val tEnv: TableEnvironment = ??? > tEnv.loadTableSource("persons", "/path/to/tablesource/file") > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-4043) Generalize RabbitMQ connector into AMQP connector
[ https://issues.apache.org/jira/browse/FLINK-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-4043: -- Labels: stale-major (was: ) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Generalize RabbitMQ connector into AMQP connector > - > > Key: FLINK-4043 > URL: https://issues.apache.org/jira/browse/FLINK-4043 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Robert Metzger >Priority: Major > Labels: stale-major > > Our current RabbitMQ connector is actually using an AMQP client implemented by > RabbitMQ. > AMQP is a protocol for message queues, implemented by different clients and > brokers. > I suggest renaming the connector so that it is more obvious to users of > other brokers that they can use the connector as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3133: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Introduce collect()/count()/print() methods in DataStream API > - > > Key: FLINK-3133 > URL: https://issues.apache.org/jira/browse/FLINK-3133 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 0.10.0, 0.10.1, 1.0.0 >Reporter: Maximilian Michels >Priority: Major > Labels: auto-unassigned, stale-major > > The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should > be mirrored to the DataStream API. > The semantics of the calls are different. We need to be able to sample parts > of a stream, e.g. by supplying a time period in the arguments to the methods. > Users should use the {{JobClient}} to retrieve the results. > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream streamData = env.addSource(..).map(..); > JobClient jobClient = env.executeWithControl(); > Iterable sampled = jobClient.sampleStream(streamData, > Time.seconds(5)); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-2186) Rework CSV import to support very wide files
[ https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-2186: -- Labels: auto-unassigned pull-request-available stale-major (was: auto-unassigned pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Rework CSV import to support very wide files > > > Key: FLINK-2186 > URL: https://issues.apache.org/jira/browse/FLINK-2186 > Project: Flink > Issue Type: Improvement > Components: API / Scala, Library / Machine Learning >Reporter: Theodore Vasiloudis >Priority: Major > Labels: auto-unassigned, pull-request-available, stale-major > Time Spent: 10m > Remaining Estimate: 0h > > In the current readCsvFile implementation, importing CSV files with many > columns can become from cumbersome to impossible. > For example to import an 11 column file we need to write: > {code} > val cancer = env.readCsvFile[(String, String, String, String, String, String, > String, String, String, String, > String)]("/path/to/breast-cancer-wisconsin.data") > {code} > For many use cases in Machine Learning we might have CSV files with thousands > or millions of columns that we want to import as vectors. > In that case using the current readCsvFile method becomes impossible. > We therefore need to rework the current function, or create a new one that > will allow us to import CSV files with an arbitrary number of columns. -- This message was sent by Atlassian Jira (v8.3.4#803005)
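Until such an API exists, a common workaround (sketched below under the assumption that all columns are numeric; the path is a placeholder) is to bypass readCsvFile and split raw lines into arrays, which avoids the tuple-arity limit entirely:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WideCsvWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read raw lines and split them into vectors instead of typed tuples.
        DataSet<double[]> vectors = env
                .readTextFile("/path/to/breast-cancer-wisconsin.data")
                .map(new MapFunction<String, double[]>() {
                    @Override
                    public double[] map(String line) {
                        String[] fields = line.split(",");
                        double[] v = new double[fields.length];
                        for (int i = 0; i < fields.length; i++) {
                            v[i] = Double.parseDouble(fields[i]);
                        }
                        return v;
                    }
                });

        // count() triggers execution itself, so no explicit env.execute() is needed.
        System.out.println(vectors.count());
    }
}
{code}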
[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)
[ https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3394: -- Labels: auto-deprioritized-critical stale-major (was: auto-deprioritized-critical) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Clear up the contract of MutableObjectIterator.next(reuse) > -- > > Key: FLINK-3394 > URL: https://issues.apache.org/jira/browse/FLINK-3394 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.0.0 >Reporter: Gábor Gévay >Priority: Major > Labels: auto-deprioritized-critical, stale-major > > {{MutableObjectIterator.next(reuse)}} has the following contract (according > to [~StephanEwen]'s comment \[1\]): > 1. The caller may not hold onto {{reuse}} any more > 2. The iterator implementor may not hold onto the returned object any more. > This should be documented in its javadoc (with "WARNING" so that people don't > overlook it). > Additionally, since this was a "secret contract" up to now, all the 270 > usages of {{MutableObjectIterator.next(reuse)}} should be checked for > violations. A few that are suspicious at first glance, are in > {{CrossDriver}}, {{UnionWithTempOperator}}, > {{MutableHashTable.ProbeIterator.next}}, > {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls > in the reduce drivers are being fixed by > https://github.com/apache/flink/pull/1626 ) > \[1\] > https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API
[ https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3850: -- Labels: auto-unassigned pull-request-available stale-major (was: auto-unassigned pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add forward field annotations to DataSet operators generated by the Table API > - > > Key: FLINK-3850 > URL: https://issues.apache.org/jira/browse/FLINK-3850 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Legacy Planner >Reporter: Fabian Hueske >Priority: Major > Labels: auto-unassigned, pull-request-available, stale-major > Time Spent: 10m > Remaining Estimate: 0h > > The DataSet API features semantic annotations [1] to hint the optimizer which > input fields an operator copies. This information is valuable for the > optimizer because it can infer that certain physical properties such as > partitioning or sorting are not destroyed by user functions and thus generate > more efficient execution plans. > The Table API is built on top of the DataSet API and generates DataSet > programs and code for user-defined functions. Hence, it knows exactly which > fields are modified and which not. We should use this information to > automatically generate forward field annotations and attach them to the > operators. This can help to significantly improve the performance of certain > jobs. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3997) PRNG Skip-ahead
[ https://issues.apache.org/jira/browse/FLINK-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3997: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > PRNG Skip-ahead > --- > > Key: FLINK-3997 > URL: https://issues.apache.org/jira/browse/FLINK-3997 > Project: Flink > Issue Type: Improvement > Components: Library / Graph Processing (Gelly) >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Priority: Major > Labels: auto-unassigned, stale-major > > The current sources of randomness for Gelly Graph Generators use fixed-size > blocks of work which include an initial seed. There are two issues with this > approach. First, the size of the collection of blocks can exceed the Akka > limit and cause the job to silently fail. Second, as the block seeds are > randomly chosen, the likelihood of blocks overlapping and producing the same > sequence increases with the size of the graph. > The random generators will be reimplemented using {{SplittableIterator}} and > PRNGs supporting skip-ahead. > This ticket will implement skip-ahead with LCGs [0]. Future work may add > support for xorshift generators ([1], section 5 "Jumping Ahead"). > [0] > https://mit-crpg.github.io/openmc/methods/random_numbers.html#skip-ahead-capability > [1] https://arxiv.org/pdf/1404.0390.pdf -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink
[ https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3473: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add partial aggregate support in Flink > -- > > Key: FLINK-3473 > URL: https://issues.apache.org/jira/browse/FLINK-3473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Legacy Planner >Reporter: Chengxiang Li >Priority: Major > Labels: auto-unassigned, stale-major > Attachments: PartialAggregateinFlink_v1.pdf, > PartialAggregateinFlink_v2.pdf > > > For decomposable aggregate functions, partial aggregation is more efficient, as > it significantly reduces the network traffic during shuffle and parallelizes > part of the aggregate calculation. This is an umbrella task for partial > aggregation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
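As a plain-Java illustration (not Flink code) of why decomposable aggregates benefit from this: each partition can pre-aggregate locally, so only the small partial results need to cross the network before the final merge.

{code:java}
import java.util.Arrays;
import java.util.List;

public class PartialSum {
    public static void main(String[] args) {
        List<int[]> partitions = Arrays.asList(
                new int[]{1, 2, 3},
                new int[]{4, 5},
                new int[]{6});

        long total = partitions.stream()
                .mapToLong(p -> Arrays.stream(p).sum()) // partial aggregate per partition
                .sum();                                 // final merge of the partials

        System.out.println(total); // 21
    }
}
{code}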
[jira] [Updated] (FLINK-3857) Add reconnect attempt to Elasticsearch host
[ https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3857: -- Labels: auto-unassigned pull-request-available stale-major (was: auto-unassigned pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add reconnect attempt to Elasticsearch host > --- > > Key: FLINK-3857 > URL: https://issues.apache.org/jira/browse/FLINK-3857 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.0.2, 1.1.0 >Reporter: Fabian Hueske >Priority: Major > Labels: auto-unassigned, pull-request-available, stale-major > Time Spent: 10m > Remaining Estimate: 0h > > Currently, the connection to the Elasticsearch host is opened in > {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a > changed DNS entry), the sink fails. > I propose to catch the Exception for lost connections in the {{invoke()}} > method and try to re-open the connection for a configurable number of times > with a certain delay. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3957) Breaking changes for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3957: -- Labels: stale-major (was: ) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Breaking changes for Flink 2.0 > -- > > Key: FLINK-3957 > URL: https://issues.apache.org/jira/browse/FLINK-3957 > Project: Flink > Issue Type: New Feature > Components: API / DataSet, API / DataStream, Build System >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Priority: Major > Labels: stale-major > Fix For: 2.0.0 > > > From time to time, we find APIs in Flink (1.x.y) marked as stable, even > though we would like to change them at some point. > This JIRA is to track all planned breaking API changes. > I would suggest to add subtasks to this one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-3462) Add position and magic number checks to record (de)serializers
[ https://issues.apache.org/jira/browse/FLINK-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3462: -- Labels: stale-major (was: ) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add position and magic number checks to record (de)serializers > -- > > Key: FLINK-3462 > URL: https://issues.apache.org/jira/browse/FLINK-3462 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Ufuk Celebi >Priority: Major > Labels: stale-major > > In order to improve the debugging experience in case of serialization errors, > we can add further sanity checks to the deserializers: > - Check the expected position in the buffer in order to ensure that record boundaries > are not crossed within a buffer and no bytes are left unconsumed. > - Add a magic number to the record serializer and deserializer in order to check for > byte corruption. We currently only have this on a per-buffer level. > We will be able to use these checks to improve the error messages. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.
[ https://issues.apache.org/jira/browse/FLINK-2396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-2396: -- Labels: auto-unassigned stale-major (was: auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Review the datasets of dynamic path and static path in iteration. > - > > Key: FLINK-2396 > URL: https://issues.apache.org/jira/browse/FLINK-2396 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Chengxiang Li >Priority: Major > Labels: auto-unassigned, stale-major > > Currently Flink caches datasets on the static path, as it assumes that such > datasets stay the same during the iteration, but this assumption is not always > true. Take sampling for example: the iteration dataset is > something like the weight vector of a model, and there is another training > dataset from which a small sample is taken to update the weight vector in each > iteration (e.g. Stochastic Gradient Descent). We expect the sampled dataset to be > different in each iteration, but Flink would cache the sampled dataset as it is > on the static path. > We should review how Flink identifies the dynamic path and the static path, and support > adding the sampled dataset in the above example to the dynamic path. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-2055: -- Labels: auto-unassigned pull-request-available stale-major (was: auto-unassigned pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Affects Versions: 0.9 >Reporter: Robert Metzger >Priority: Major > Labels: auto-unassigned, pull-request-available, stale-major > Time Spent: 10m > Remaining Estimate: 0h > > As per: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-951) Reworking of Iteration Synchronization, Accumulators and Aggregators
[ https://issues.apache.org/jira/browse/FLINK-951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-951: - Labels: auto-unassigned refactoring stale-major (was: auto-unassigned refactoring) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Reworking of Iteration Synchronization, Accumulators and Aggregators > > > Key: FLINK-951 > URL: https://issues.apache.org/jira/browse/FLINK-951 > Project: Flink > Issue Type: Improvement > Components: API / DataSet, Runtime / Task >Affects Versions: 0.9 >Reporter: Markus Holzemer >Priority: Major > Labels: auto-unassigned, refactoring, stale-major > Original Estimate: 168h > Remaining Estimate: 168h > > I just realized that there is no real Jira issue for the task I am currently > working on. > I am currently reworking a few things regarding Iteration Synchronization, > Accumulators and Aggregators. Currently the synchronization at the end of one > superstep is done through channel events. That makes it hard to track the > current status of iterations. That is why I am changing this synchronization > to use RPC calls with the JobManager, so that the JobManager manages the > current status of all iterations. > Currently we use Accumulators outside of iterations and Aggregators inside of > iterations. Both have a similar function, but slightly different interfaces and > handling. I want to unify these two concepts. I propose that we stick to > Accumulators only in the future. Aggregators are therefore removed and > Accumulators are extended to cover the use cases Aggregators were used for > before. The switch to RPC for iterations makes it possible to also send the > current Accumulator values at the end of each superstep, so that the > JobManager (and thereby the web interface) will be able to print intermediate > accumulation results. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11912: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register them > because the metrics are only available after the consumer starts polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) In the fetch loop, as KafkaConsumer discovers new partitions, manually add > the MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) In the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register them and remove the entry from manualRegisteredMetricSet. > The overhead of the above approach is bounded and is only incurred when > discovering new partitions, and registration is done once the KafkaConsumer > has the metrics exposed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
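A minimal sketch of the three-step registration flow proposed above. The class and helper names (LagMetricRegistrar, onPartitionDiscovered, register) are hypothetical, invented here for illustration; only KafkaConsumer#metrics() and MetricName are actual Kafka client API.

{code:java}
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Sketch only: mirrors the three steps from the proposal, not actual Flink code.
class LagMetricRegistrar {
    // Step 1: the set of per-partition metric names still waiting to be registered.
    private final Set<MetricName> manualRegisteredMetricSet = new HashSet<>();

    // Step 2: remember metric names for newly discovered partitions.
    void onPartitionDiscovered(MetricName lagMetricName) {
        manualRegisteredMetricSet.add(lagMetricName);
    }

    // Step 3: called inside the fetch loop; registers metrics once the
    // KafkaConsumer actually exposes them (i.e. after polling has started).
    void tryRegister(KafkaConsumer<?, ?> consumer) {
        if (manualRegisteredMetricSet.isEmpty()) {
            return;
        }
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        Iterator<MetricName> it = manualRegisteredMetricSet.iterator();
        while (it.hasNext()) {
            MetricName name = it.next();
            Metric metric = metrics.get(name);
            if (metric != null) {
                register(name, metric);
                it.remove(); // bounded overhead: each metric is handled exactly once
            }
        }
    }

    private void register(MetricName name, Metric metric) {
        // Hypothetical: bridge the Kafka metric into Flink's MetricGroup here.
        System.out.println("registered " + name);
    }
}
{code}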
[jira] [Updated] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11799: --- Labels: auto-deprioritized-major auto-unassigned pull-request-available (was: auto-unassigned pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > I was encountering a problem with NullPointerExceptions: the deserialized > object reached my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning so that users are aware that the kryo copy() is failing. I > understand that the code is there to handle the lack of a copy implementation, > but given the potential inefficiency of having to write and read the object > instead of copying it, this seems useful information to share at the least. It > is also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > any reason for the null value appearing in the logs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
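A hedged sketch of the reporter's two suggestions, written against a simplified standalone class rather than the real KryoSerializer: log the first copy failure as a warning (point 1) and fail loudly if the serialization round-trip yields null (point 2).

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Simplified sketch, not the actual KryoSerializer.
class WarnOnCopyFailure<T> {
    private static final Logger LOG = LoggerFactory.getLogger(WarnOnCopyFailure.class);

    private final Kryo kryo = new Kryo();
    private boolean copyFailureLogged;

    T copy(T from) {
        try {
            return kryo.copy(from);
        } catch (KryoException e) {
            if (!copyFailureLogged) {
                // Point 1: surface the failure at least once instead of swallowing it.
                LOG.warn("kryo.copy() failed; falling back to a serialize/deserialize "
                        + "round-trip, which may be slower or hide a real problem.", e);
                copyFailureLogged = true;
            }
            T result = copyViaSerialization(from);
            if (result == null) {
                // Point 2: don't let a silent null propagate downstream.
                throw new IllegalStateException("Copy via serialization returned null", e);
            }
            return result;
        }
    }

    @SuppressWarnings("unchecked")
    private T copyViaSerialization(T from) {
        Output out = new Output(4096, -1); // buffer grows as needed
        kryo.writeObject(out, from);
        Input in = new Input(out.toBytes());
        return (T) kryo.readObject(in, from.getClass());
    }
}
{code}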
[jira] [Updated] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system
[ https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11868: --- Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > [filesystems] Introduce listStatusIterator API to file system > - > > Key: FLINK-11868 > URL: https://issues.apache.org/jira/browse/FLINK-11868 > Project: Flink > Issue Type: New Feature > Components: FileSystems >Reporter: Yun Tang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > Fix For: 1.14.0 > > > From existing experience, we know {{listStatus}} is expensive for many > distributed file systems, especially when the folder contains too many files. > This method not only blocks the thread until the result is returned but can > also cause an OOM because the returned array of {{FileStatus}} can be really > large. We should have already learned this from FLINK-7266 and FLINK-8540. > However, listing the file status under a path is really helpful in many > situations. Thankfully, many distributed file systems have noticed this and > provide an API such as > {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}} > to list the file system on demand. > > We should also introduce this API and replace the current implementation, > which uses {{listStatus}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
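A sketch of what the proposed API could look like, modeled on Hadoop's listStatusIterator. The FileStatusIterator interface below is hypothetical (no such interface exists in Flink); only {{org.apache.flink.core.fs.FileStatus}} is real.

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;

// Hypothetical shape for the proposed API, mirroring Hadoop's
// FileSystem#listStatusIterator.
interface FileStatusIterator {
    boolean hasNext() throws IOException;

    FileStatus next() throws IOException;
}

class FileCounter {
    // Caller side: entries are fetched on demand, so a directory with
    // millions of files never has to be materialized as one huge
    // FileStatus[] array, avoiding both long blocking and OOM.
    static long countFiles(FileStatusIterator it) throws IOException {
        long count = 0;
        while (it.hasNext()) {
            if (!it.next().isDir()) {
                count++;
            }
        }
        return count;
    }
}
{code}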
[jira] [Updated] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange
[ https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-3769: -- Labels: auto-unassigned rabbitmq stale-major (was: auto-unassigned rabbitmq) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized. > RabbitMQ Sink ability to publish to a different exchange > > > Key: FLINK-3769 > URL: https://issues.apache.org/jira/browse/FLINK-3769 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.0.1 >Reporter: Robert Batts >Priority: Major > Labels: auto-unassigned, rabbitmq, stale-major > > The RabbitMQ Sink can currently only publish to the "default" exchange. This > exchange is a direct exchange, so the routing key will route directly to the > queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job > to 1 exchange which routes to 1 queue). Additionally, I believe that if a > user decides to use a different exchange, the following can be assumed: > 1.) The provided exchange exists > 2.) The user has declared the appropriate mapping and the appropriate queues > exist in RabbitMQ (therefore, nothing needs to be created) > RabbitMQ currently provides four types of exchanges. Three of these will be > covered by just enabling exchanges (Direct, Fanout, Topic) because they use > the routing key (or nothing). > The fourth exchange type relies on the message headers, which are currently > set to null by default on publish. These headers may be set on a per-message > level, so the input of this stream will need to take this as input as well. > This fourth exchange type could very well be outside of the scope of this > improvement, and a "RabbitMQ Sink enable headers" improvement might be the > better way to go. > Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
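For reference, a minimal sketch with the plain RabbitMQ Java client (not the Flink sink) of publishing to a pre-declared, non-default exchange under the issue's assumptions. The exchange name and routing key are made up for illustration.

{code:java}
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ExchangePublish {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: a local broker
        try (Connection conn = factory.newConnection();
                Channel channel = conn.createChannel()) {
            // With a topic exchange, the routing key selects the bound queues;
            // with fanout it is ignored; with direct it must match the binding
            // key exactly. The exchange and bindings are assumed to exist.
            channel.basicPublish(
                    "my-exchange",        // hypothetical exchange name
                    "sensor.temperature", // routing key
                    null,                 // headers stay null, as in the current sink
                    "42.0".getBytes(StandardCharsets.UTF_8));
        }
    }
}
{code}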
[jira] [Closed] (FLINK-22737) Add support for CURRENT_WATERMARK to SQL
[ https://issues.apache.org/jira/browse/FLINK-22737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther closed FLINK-22737. Fix Version/s: 1.14.0 Resolution: Fixed Fixed in 1.14.0: 8bd215dc28ae431120e5b1a114a94ddc6e004b16 > Add support for CURRENT_WATERMARK to SQL > > > Key: FLINK-22737 > URL: https://issues.apache.org/jira/browse/FLINK-22737 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: David Anderson >Assignee: Ingo Bürk >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > Attachments: screenshot-2021-05-31_14-22-43.png > > > With a built-in function returning the current watermark, one could operate > on late events without resorting to using the DataStream API. > Called with zero parameters, this function returns the current watermark for > the current row – if there is an event time attribute. Otherwise, it returns > NULL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
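A short usage sketch of the shipped feature. Note the implemented function takes the event-time attribute as its argument (rather than zero parameters as sketched in the description); the table definition below is illustrative only.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CurrentWatermarkExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql(
                "CREATE TABLE events ("
                        + "  name STRING,"
                        + "  event_time TIMESTAMP_LTZ(3),"
                        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Keep only rows that are not late with respect to the current
        // watermark. CURRENT_WATERMARK returns NULL before the first
        // watermark is emitted, hence the IS NULL branch.
        tableEnv.executeSql(
                "SELECT * FROM events "
                        + "WHERE CURRENT_WATERMARK(event_time) IS NULL "
                        + "   OR event_time > CURRENT_WATERMARK(event_time)")
                .print();
    }
}
{code}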
[jira] [Assigned] (FLINK-22948) Scala example for toDataStream does not compile
[ https://issues.apache.org/jira/browse/FLINK-22948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-22948: Assignee: Timo Walther > Scala example for toDataStream does not compile > --- > > Key: FLINK-22948 > URL: https://issues.apache.org/jira/browse/FLINK-22948 > Project: Flink > Issue Type: Bug > Components: Documentation, Table SQL / API >Affects Versions: 1.13.1 >Reporter: David Anderson >Assignee: Timo Walther >Priority: Major > > The scala example at > [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-todatastream] > does not compile – {{User.class}} should be {{classOf[User]}}. > It would also be better to show the table DDL as > {{tableEnv.executeSql(}} > {{ """}} > {{ CREATE TABLE GeneratedTable (}} > {{ name STRING,}} > {{ score INT,}} > {{ event_time TIMESTAMP_LTZ(3),}} > {{ WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND}} > {{ )}} > {{ WITH ('connector'='datagen')}} > {{ """}} > {{)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23022) Plugin classloader settings do not work as described
Dawid Wysakowicz created FLINK-23022: Summary: Plugin classloader settings do not work as described Key: FLINK-23022 URL: https://issues.apache.org/jira/browse/FLINK-23022 Project: Flink Issue Type: Bug Components: API / Core Reporter: Dawid Wysakowicz The options {{plugin.classloader.parent-first-patterns}} are documented as if all patterns put there were loaded from the Flink classloader instead of the plugin classloader. However, the way they actually work is that they define a set of patterns that are merely allowed to be pulled from the parent classloader: the plugin classloader takes precedence in all cases, and a class can be loaded from the parent classloader only if it matches a pattern in one of the aforementioned options. -- This message was sent by Atlassian Jira (v8.3.4#803005)
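A simplified illustration of the behavior described above, not the actual Flink PluginLoader code: the plugin classloader always gets the first try, and the parent is consulted only for classes matching one of the configured patterns, i.e. a pattern allows rather than forces parent-first loading.

{code:java}
// Sketch of the lookup order the issue describes.
class PluginFirstClassLoader extends ClassLoader {
    private final ClassLoader pluginLoader;
    private final String[] allowedFromParent; // e.g. {"java.", "org.apache.flink."}

    PluginFirstClassLoader(ClassLoader parent, ClassLoader pluginLoader, String[] allowedFromParent) {
        super(parent);
        this.pluginLoader = pluginLoader;
        this.allowedFromParent = allowedFromParent;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        try {
            // The plugin classloader takes precedence in all cases.
            return pluginLoader.loadClass(name);
        } catch (ClassNotFoundException e) {
            // The parent is a fallback, and only for allowed patterns.
            for (String prefix : allowedFromParent) {
                if (name.startsWith(prefix)) {
                    return getParent().loadClass(name);
                }
            }
            throw e;
        }
    }
}
{code}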
[jira] [Closed] (FLINK-22948) Scala example for toDataStream does not compile
[ https://issues.apache.org/jira/browse/FLINK-22948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther closed FLINK-22948. Fix Version/s: 1.13.2 1.14.0 Resolution: Fixed Fixed in 1.14.0: e23337aace8f02fb8b940480a4ce3c2f054f323e Fixed in 1.13.2: bf304e49c4e6ea7b8d73cfb7709e87a2e4c8b52a > Scala example for toDataStream does not compile > --- > > Key: FLINK-22948 > URL: https://issues.apache.org/jira/browse/FLINK-22948 > Project: Flink > Issue Type: Bug > Components: Documentation, Table SQL / API >Affects Versions: 1.13.1 >Reporter: David Anderson >Assignee: Timo Walther >Priority: Major > Fix For: 1.14.0, 1.13.2 > > > The scala example at > [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-todatastream] > does not compile – {{User.class}} should be {{classOf[User]}}. > It would also be better to show the table DDL as > {{tableEnv.executeSql(}} > {{ """}} > {{ CREATE TABLE GeneratedTable (}} > {{ name STRING,}} > {{ score INT,}} > {{ event_time TIMESTAMP_LTZ(3),}} > {{ WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND}} > {{ )}} > {{ WITH ('connector'='datagen')}} > {{ """}} > {{)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API
[ https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-23020: Component/s: (was: API / Python) Table SQL / Planner > NullPointerException when running collect twice from Python API > --- > > Key: FLINK-23020 > URL: https://issues.apache.org/jira/browse/FLINK-23020 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.1 >Reporter: Maciej Bryński >Priority: Major > > Hi, > I'm trying to use PyFlink from Jupyter Notebook and I'm getting NPE in > following scenario. > 1. I'm creating datagen table. > {code:java} > from pyflink.table import EnvironmentSettings, TableEnvironment, > StreamTableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.common import Configuration, Row > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.java_gateway import get_gateway > conf = Configuration() > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > table_env.get_config().get_configuration().set_integer("parallelism.default", > 1) > table_env.execute_sql("DROP TABLE IF EXISTS datagen") > table_env.execute_sql(""" > CREATE TABLE datagen ( > id INT > ) WITH ( > 'connector' = 'datagen' > ) > """) > {code} > 2. Then I'm running collect > {code:java} > try: > result = table_env.sql_query("select * from datagen limit 1").execute() > for r in result.collect(): > print(r) > except KeyboardInterrupt: > result.get_job_client().cancel() > {code} > 3. I'm using "interrupt the kernel" button. This is handled by above > try/except and will cancel the query. > 4. I'm running collect from point 2 one more time. Result: > {code:java} > --- > Py4JJavaError Traceback (most recent call last) > in > 1 try: > > 2 result = table_env.sql_query("select * from datagen limit > 1").execute() > 3 for r in result.collect(): > 4 print(r) > 5 except KeyboardInterrupt: > /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self) >1070 """ >1071 self._t_env._before_execute() > -> 1072 return TableResult(self._j_table.execute()) >1073 >1074 def explain(self, *extra_details: ExplainDetail) -> str: > /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, > *args) >1283 >1284 answer = self.gateway_client.send_command(command) > -> 1285 return_value = get_return_value( >1286 answer, self.gateway_client, self.target_id, self.name) >1287 > /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, > **kw) > 144 def deco(*a, **kw): > 145 try: > --> 146 return f(*a, **kw) > 147 except Py4JJavaError as e: > 148 from pyflink.java_gateway import get_gateway > /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in > get_return_value(answer, gateway_client, target_id, name) > 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 325 if answer[1] == REFERENCE_TYPE: > --> 326 raise Py4JJavaError( > 327 "An error occurred while calling {0}{1}{2}.\n". > 328 format(target_id, ".", name), value) > Py4JJavaError: An error occurred while calling o69.execute. 
> : java.lang.NullPointerException > at java.base/java.util.Objects.requireNonNull(Objects.java:221) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) > at > org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) > at > org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965) > at > org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879) > at > org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858) >
[jira] [Commented] (FLINK-23020) NullPointerException when running collect twice from Python API
[ https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364896#comment-17364896 ] Dian Fu commented on FLINK-23020: - This is more like an issue of the underlying planner. cc [~godfreyhe] > NullPointerException when running collect twice from Python API > --- > > Key: FLINK-23020 > URL: https://issues.apache.org/jira/browse/FLINK-23020 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.1 >Reporter: Maciej Bryński >Priority: Major > > Hi, > I'm trying to use PyFlink from Jupyter Notebook and I'm getting NPE in > following scenario. > 1. I'm creating datagen table. > {code:java} > from pyflink.table import EnvironmentSettings, TableEnvironment, > StreamTableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.common import Configuration, Row > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.java_gateway import get_gateway > conf = Configuration() > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > table_env.get_config().get_configuration().set_integer("parallelism.default", > 1) > table_env.execute_sql("DROP TABLE IF EXISTS datagen") > table_env.execute_sql(""" > CREATE TABLE datagen ( > id INT > ) WITH ( > 'connector' = 'datagen' > ) > """) > {code} > 2. Then I'm running collect > {code:java} > try: > result = table_env.sql_query("select * from datagen limit 1").execute() > for r in result.collect(): > print(r) > except KeyboardInterrupt: > result.get_job_client().cancel() > {code} > 3. I'm using "interrupt the kernel" button. This is handled by above > try/except and will cancel the query. > 4. I'm running collect from point 2 one more time. Result: > {code:java} > --- > Py4JJavaError Traceback (most recent call last) > in > 1 try: > > 2 result = table_env.sql_query("select * from datagen limit > 1").execute() > 3 for r in result.collect(): > 4 print(r) > 5 except KeyboardInterrupt: > /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self) >1070 """ >1071 self._t_env._before_execute() > -> 1072 return TableResult(self._j_table.execute()) >1073 >1074 def explain(self, *extra_details: ExplainDetail) -> str: > /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, > *args) >1283 >1284 answer = self.gateway_client.send_command(command) > -> 1285 return_value = get_return_value( >1286 answer, self.gateway_client, self.target_id, self.name) >1287 > /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, > **kw) > 144 def deco(*a, **kw): > 145 try: > --> 146 return f(*a, **kw) > 147 except Py4JJavaError as e: > 148 from pyflink.java_gateway import get_gateway > /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in > get_return_value(answer, gateway_client, target_id, name) > 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 325 if answer[1] == REFERENCE_TYPE: > --> 326 raise Py4JJavaError( > 327 "An error occurred while calling {0}{1}{2}.\n". > 328 format(target_id, ".", name), value) > Py4JJavaError: An error occurred while calling o69.execute. 
> : java.lang.NullPointerException > at java.base/java.util.Objects.requireNonNull(Objects.java:221) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) > at > org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) > at > org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965) > at > org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879) > at > org.apache.calcite.plan.hep.HepPlan
[jira] [Commented] (FLINK-22587) Support aggregations in batch mode with DataStream API
[ https://issues.apache.org/jira/browse/FLINK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364897#comment-17364897 ] Etienne Chauchot commented on FLINK-22587: -- [~sjwiesman] thanks for the pointer, I manually coded an inner join using a _KeyedCoProcessFunction_ and state, and it works correctly. I'm sending a mail to the ML in case it is useful to some users. > Support aggregations in batch mode with DataStream API > -- > > Key: FLINK-22587 > URL: https://issues.apache.org/jira/browse/FLINK-22587 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.12.0, 1.13.0 >Reporter: Etienne Chauchot >Priority: Major > > A pipeline like this *in batch mode* would output no data > {code:java} > stream.join(otherStream) > .where() > .equalTo() > .window(GlobalWindows.create()) > .apply() > {code} > Indeed, the default trigger for GlobalWindow is NeverTrigger, which never > fires. If we set an _EventTimeTrigger_, it will fire with every element, as the > watermark will be set to +INF (batch mode) and will pass the end of the > global window with each new element. A _ProcessingTimeTrigger_ never fires > either, and all elapsed-time or delta-based triggers are not suited for > batch. > Same goes for _reduce()_ instead of join(). > So I guess we are missing something for batch support with DataStream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
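For readers of the archive, a hedged sketch of the kind of state-backed inner join the comment mentions. The key and element types (String key, Long left values, String right values) are assumptions, and both sides are buffered indefinitely, which is acceptable for bounded inputs but would leak state on unbounded streams.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

class StateBackedInnerJoin extends KeyedCoProcessFunction<
        String, Tuple2<String, Long>, Tuple2<String, String>, Tuple2<Long, String>> {

    private transient ListState<Long> leftBuffer;
    private transient ListState<String> rightBuffer;

    @Override
    public void open(Configuration parameters) {
        leftBuffer = getRuntimeContext().getListState(new ListStateDescriptor<>("left", Types.LONG));
        rightBuffer = getRuntimeContext().getListState(new ListStateDescriptor<>("right", Types.STRING));
    }

    @Override
    public void processElement1(Tuple2<String, Long> left, Context ctx,
            Collector<Tuple2<Long, String>> out) throws Exception {
        leftBuffer.add(left.f1);
        // Join the new left element with everything seen so far on the right side.
        for (String right : rightBuffer.get()) {
            out.collect(Tuple2.of(left.f1, right));
        }
    }

    @Override
    public void processElement2(Tuple2<String, String> right, Context ctx,
            Collector<Tuple2<Long, String>> out) throws Exception {
        rightBuffer.add(right.f1);
        for (Long left : leftBuffer.get()) {
            out.collect(Tuple2.of(left, right.f1));
        }
    }
}
{code}

Usage would look like left.connect(right).keyBy(l -> l.f0, r -> r.f0).process(new StateBackedInnerJoin()).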
[jira] [Closed] (FLINK-22682) Checkpoint interval too large for higher DOP
[ https://issues.apache.org/jira/browse/FLINK-22682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-22682. -- Resolution: Not A Bug This is not a bug - better logging merged to master as described above. > Checkpoint interval too large for higher DOP > > > Key: FLINK-22682 > URL: https://issues.apache.org/jira/browse/FLINK-22682 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.0, 1.13.1 >Reporter: Arvid Heise >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > When running a job on EMR with DOP of 160, checkpoints are not triggered at > the set interval. I used an interval of 10s and the average checkpoint took > <10s, so I'd expect ~6 checkpoints per minute. > However, the actual completion message was heavily delayed. I had > backpressure in this test and used unaligned checkpoints. > {noformat} > 2021-05-14 10:06:39,182 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 1 for job 58df7eb721aaefbfb08168b2c3fd6717 (8940061335 bytes in > 9097 ms). > 2021-05-14 10:06:39,205 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 1 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:06:40,223 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 2 (type=CHECKPOINT) @ 1620986800082 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:07:49,263 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 2 for job 58df7eb721aaefbfb08168b2c3fd6717 (8286704522 bytes in > 6490 ms). > 2021-05-14 10:07:49,281 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 2 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:07:49,443 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 3 (type=CHECKPOINT) @ 1620986869281 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:08:55,679 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 3 for job 58df7eb721aaefbfb08168b2c3fd6717 (7398457668 bytes in > 6182 ms). > 2021-05-14 10:08:55,694 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 3 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:08:55,820 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 4 (type=CHECKPOINT) @ 1620986935694 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:09:58,024 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 4 for job 58df7eb721aaefbfb08168b2c3fd6717 (7402199053 bytes in > 6179 ms). > 2021-05-14 10:09:58,035 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 4 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:09:58,154 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 5 (type=CHECKPOINT) @ 1620986998035 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:11:02,694 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 5 for job 58df7eb721aaefbfb08168b2c3fd6717 (7395692776 bytes in > 6788 ms). 
> 2021-05-14 10:11:02,705 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 5 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:11:02,830 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 6 (type=CHECKPOINT) @ 1620987062705 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:12:05,043 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 6 for job 58df7eb721aaefbfb08168b2c3fd6717 (7388207757 bytes in > 6025 ms). > 2021-05-14 10:12:05,054 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 6 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:12:05,182 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 7 (type=CHECKPOINT) @ 1620987125054 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:13:04,754 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 7 for job 58df7eb721aaefbfb08168b2c3fd6717 (7360227162 bytes in > 6469 ms). > 2021-05-14 10:13:04,779 INFO > org.apache.flink.runtime.source.coord
[jira] [Comment Edited] (FLINK-22682) Checkpoint interval too large for higher DOP
[ https://issues.apache.org/jira/browse/FLINK-22682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364900#comment-17364900 ] Piotr Nowojski edited comment on FLINK-22682 at 6/17/21, 11:58 AM: --- Thanks for taking care of this [~akalashnikov] and [~roman_khachatryan]. Close this as not a bug - better logging merged to master as described above. was (Author: pnowojski): This is not a bug - better logging merged to master as described above. > Checkpoint interval too large for higher DOP > > > Key: FLINK-22682 > URL: https://issues.apache.org/jira/browse/FLINK-22682 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.0, 1.13.1 >Reporter: Arvid Heise >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > When running a job on EMR with DOP of 160, checkpoints are not triggered at > the set interval. I used an interval of 10s and the average checkpoint took > <10s, so I'd expect ~6 checkpoints per minute. > However, the actual completion message was heavily delayed. I had > backpressure in this test and used unaligned checkpoints. > {noformat} > 2021-05-14 10:06:39,182 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 1 for job 58df7eb721aaefbfb08168b2c3fd6717 (8940061335 bytes in > 9097 ms). > 2021-05-14 10:06:39,205 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 1 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:06:40,223 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 2 (type=CHECKPOINT) @ 1620986800082 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:07:49,263 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 2 for job 58df7eb721aaefbfb08168b2c3fd6717 (8286704522 bytes in > 6490 ms). > 2021-05-14 10:07:49,281 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 2 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:07:49,443 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 3 (type=CHECKPOINT) @ 1620986869281 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:08:55,679 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 3 for job 58df7eb721aaefbfb08168b2c3fd6717 (7398457668 bytes in > 6182 ms). > 2021-05-14 10:08:55,694 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 3 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:08:55,820 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 4 (type=CHECKPOINT) @ 1620986935694 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:09:58,024 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 4 for job 58df7eb721aaefbfb08168b2c3fd6717 (7402199053 bytes in > 6179 ms). > 2021-05-14 10:09:58,035 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 4 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:09:58,154 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 5 (type=CHECKPOINT) @ 1620986998035 for job > 58df7eb721aaefbfb08168b2c3fd6717. 
> 2021-05-14 10:11:02,694 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 5 for job 58df7eb721aaefbfb08168b2c3fd6717 (7395692776 bytes in > 6788 ms). > 2021-05-14 10:11:02,705 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 5 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:11:02,830 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 6 (type=CHECKPOINT) @ 1620987062705 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05-14 10:12:05,043 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 6 for job 58df7eb721aaefbfb08168b2c3fd6717 (7388207757 bytes in > 6025 ms). > 2021-05-14 10:12:05,054 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 6 as completed for source Source: Sequence Source -> Map. > 2021-05-14 10:12:05,182 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering > checkpoint 7 (type=CHECKPOINT) @ 1620987125054 for job > 58df7eb721aaefbfb08168b2c3fd6717. > 2021-05
[jira] [Updated] (FLINK-21952) Make all the "Connection reset by peer" exception wrapped as RemoteTransportException
[ https://issues.apache.org/jira/browse/FLINK-21952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-21952: --- Priority: Minor (was: Major) > Make all the "Connection reset by peer" exception wrapped as > RemoteTransportException > - > > Key: FLINK-21952 > URL: https://issues.apache.org/jira/browse/FLINK-21952 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.0, 1.13.1, 1.12.4 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, stale-assigned > > In CreditBasedPartitionRequestClientHandler#exceptionCaught, an IOException > or an exception with the exact message "Connection reset by peer" is marked > as a RemoteTransportException. > However, with the current Netty implementation, it might sometimes throw > {code:java} > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: > readAddress(..) failed: Connection reset by peer > {code} > in some cases. Such an exception is then wrapped as a LocalTransportException, > which might cause some confusion. -- This message was sent by Atlassian Jira (v8.3.4#803005)
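A sketch of the broadened matching the ticket title asks for, written as a standalone helper rather than the actual handler code: a contains() check also matches Netty's epoll flavour of the error, whose message is prefixed with "readAddress(..) failed:".

{code:java}
// Standalone sketch, not the actual CreditBasedPartitionRequestClientHandler.
final class TransportErrors {
    private static final String RESET_BY_PEER = "Connection reset by peer";

    private TransportErrors() {}

    // contains() instead of an exact-match comparison also catches
    // "readAddress(..) failed: Connection reset by peer" from
    // Errors$NativeIoException, so it can be wrapped as a remote failure.
    static boolean isRemoteDisconnect(Throwable cause) {
        String message = cause.getMessage();
        return message != null && message.contains(RESET_BY_PEER);
    }
}
{code}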
[jira] [Updated] (FLINK-21952) Make all the "Connection reset by peer" exception wrapped as RemoteTransportException
[ https://issues.apache.org/jira/browse/FLINK-21952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-21952: --- Affects Version/s: 1.14.0 1.13.1 1.12.4 > Make all the "Connection reset by peer" exception wrapped as > RemoteTransportException > - > > Key: FLINK-21952 > URL: https://issues.apache.org/jira/browse/FLINK-21952 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.0, 1.13.1, 1.12.4 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > Labels: auto-deprioritized-major, stale-assigned > > In CreditBasedPartitionRequestClientHandler#exceptionCaught, an IOException > or an exception with the exact message "Connection reset by peer" is marked > as a RemoteTransportException. > However, with the current Netty implementation, it might sometimes throw > {code:java} > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: > readAddress(..) failed: Connection reset by peer > {code} > in some cases. Such an exception is then wrapped as a LocalTransportException, > which might cause some confusion. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22367) JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished times out
[ https://issues.apache.org/jira/browse/FLINK-22367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-22367: --- Priority: Minor (was: Major) > JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished > times out > -- > > Key: FLINK-22367 > URL: https://issues.apache.org/jira/browse/FLINK-22367 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Minor > Labels: auto-deprioritized-critical, auto-unassigned, > test-stability > Fix For: 1.14.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16818&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc&l=3844 > {code} > [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: > 13.135 s <<< FAILURE! - in > org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase > Apr 19 22:28:44 [ERROR] > terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished(org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase) > Time elapsed: 10.237 s <<< ERROR! > Apr 19 22:28:44 java.util.concurrent.ExecutionException: > java.util.concurrent.TimeoutException: Invocation of public default > java.util.concurrent.CompletableFuture > org.apache.flink.runtime.webmonitor.RestfulGateway.stopWithSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time) > timed out. > Apr 19 22:28:44 at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > Apr 19 22:28:44 at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) > Apr 19 22:28:44 at > org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.stopWithSavepointNormalExecutionHelper(JobMasterStopWithSavepointITCase.java:123) > Apr 19 22:28:44 at > org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished(JobMasterStopWithSavepointITCase.java:111) > Apr 19 22:28:44 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > Apr 19 22:28:44 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Apr 19 22:28:44 at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Apr 19 22:28:44 at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > Apr 19 22:28:44 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > Apr 19 22:28:44 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Apr 19 22:28:44 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > Apr 19 22:28:44 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Apr 19 22:28:44 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > Apr 19 22:28:44 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > Apr 19 22:28:44 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Apr 19 22:28:44 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > Apr 19 22:28:44 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > Apr 19 
22:28:44 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > Apr 19 22:28:44 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > Apr 19 22:28:44 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > Apr 19 22:28:44 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > Apr 19 22:28:44 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Apr 19 22:28:44 at > org.junit.runners.ParentRunner.run(ParentR
[jira] [Reopened] (FLINK-19537) Processed in-flight bytes metric is not accurate
[ https://issues.apache.org/jira/browse/FLINK-19537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reopened FLINK-19537: > Processed in-flight bytes metric is not accurate > > > Key: FLINK-19537 > URL: https://issues.apache.org/jira/browse/FLINK-19537 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.12.0 >Reporter: Piotr Nowojski >Priority: Minor > Labels: auto-closed > > The processed in-flight bytes metric, as introduced in FLINK-18662, is not > entirely accurate, as it ignores the buffers/bytes accumulated in the record > deserializers. If a buffer is processed here, it doesn't mean it was fully > processed (so we can overestimate the amount of processed bytes). On the > other hand, some records/bytes might be processed without polling anything > from this {{CheckpointedInputGate}} (underestimating the amount of processed > bytes). All in all, this should have been calculated at the > {{StreamTaskNetworkInput}} level, where we have access to the record > deserializers. However, the current value is on average accurate, and it > might be just good enough (at least for the time being). > Also, this metric currently ignores chained source inputs to the multiple > input stream task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (FLINK-17477) resumeConsumption call should happen as quickly as possible to minimise latency
[ https://issues.apache.org/jira/browse/FLINK-17477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reopened FLINK-17477: > resumeConsumption call should happen as quickly as possible to minimise > latency > --- > > Key: FLINK-17477 > URL: https://issues.apache.org/jira/browse/FLINK-17477 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Network >Affects Versions: 1.11.0, 1.12.0 >Reporter: Piotr Nowojski >Priority: Minor > Labels: auto-closed > > We should be calling {{InputGate#resumeConsumption()}} as soon as possible > (to avoid any unnecessary delay/latency when the task is idling). Currently I > think it’s mostly fine - the important bit is that on the happy path, we > always {{resumeConsumption}} before trying to complete the checkpoint, so > that netty threads will start resuming the network traffic while the task > thread is doing the synchronous part of the checkpoint and starting the > asynchronous part. But I think in two places we are first aborting the > checkpoint and only then resuming consumption (in > {{CheckpointBarrierAligner}}): > {code} > // let the task know we are not completing this > notifyAbort(currentCheckpointId, > new CheckpointException( > "Barrier id: " + barrierId, > CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)); > // abort the current checkpoint > releaseBlocksAndResetBarriers(); > {code} > {code} > // let the task know we skip a checkpoint > notifyAbort(currentCheckpointId, > new > CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); > // no chance to complete this checkpoint > releaseBlocksAndResetBarriers(); > {code} > It’s not a big deal, as those are rare conditions, but it would be better > to be consistent everywhere: first release blocks and resume consumption, > before anything else happens; see the sketch after this entry. -- This message was sent by Atlassian Jira (v8.3.4#803005)
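A sketch of the ordering the issue argues for, reusing the method names from the first snippet above (the surrounding class is elided, so this is a fragment rather than runnable code):

{code:java}
// First release blocks so netty threads can resume the network traffic
// while the task thread is still busy...
releaseBlocksAndResetBarriers();
// ...and only then do the abort bookkeeping on the task side.
notifyAbort(currentCheckpointId,
    new CheckpointException(
        "Barrier id: " + barrierId,
        CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
{code}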