[jira] [Closed] (FLINK-23006) issue in configuring History server

2021-06-17 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-23006.
-
Resolution: Not A Problem

> issue in configuring History server 
> 
>
> Key: FLINK-23006
> URL: https://issues.apache.org/jira/browse/FLINK-23006
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.1
>Reporter: Bhagi
>Priority: Minor
> Attachments: flink-history-server-7fb6f9b66-bkhfm.log, 
> history-server-deployment.yaml
>
>
> Dear Team,
> I have a Flink session cluster with HA on standalone Kubernetes, and I want
> to configure the History Server for this cluster.
> To do so, I created one more deployment for the History Server in the Flink
> cluster namespace, passing args: ["history-server"], but the pod is in
> CrashLoopBackOff.
> Can you look at the attached logs and help me configure the History Server?
> kubectl get po -n flink-load-test
> NAME                                   READY   STATUS             RESTARTS   AGE
> flink-history-server-7fb6f9b66-bkhfm   0/1     CrashLoopBackOff   10         29m
> flink-jobmanager-7df744b455-f575q      1/1     Running            0          18h
> flink-jobmanager-7df744b455-qm6wf      1/1     Running            0          18h
> flink-jobmanager-7df744b455-t7zm5      1/1     Running            1          18h
> flink-taskmanager-7945f6b856-89mr5     1/1     Running            7          18h
> flink-taskmanager-7945f6b856-mz2hz     1/1     Running            7          18h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23006) issue in configuring History server

2021-06-17 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-23006:
--
Priority: Minor  (was: Blocker)

> issue in configuring History server 
> 
>
> Key: FLINK-23006
> URL: https://issues.apache.org/jira/browse/FLINK-23006
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.1
>Reporter: Bhagi
>Priority: Minor
> Attachments: flink-history-server-7fb6f9b66-bkhfm.log, 
> history-server-deployment.yaml
>
>
> Dear Team,
> I have a Flink session cluster with HA on standalone Kubernetes, and I want
> to configure the History Server for this cluster.
> To do so, I created one more deployment for the History Server in the Flink
> cluster namespace, passing args: ["history-server"], but the pod is in
> CrashLoopBackOff.
> Can you look at the attached logs and help me configure the History Server?
> kubectl get po -n flink-load-test
> NAME                                   READY   STATUS             RESTARTS   AGE
> flink-history-server-7fb6f9b66-bkhfm   0/1     CrashLoopBackOff   10         29m
> flink-jobmanager-7df744b455-f575q      1/1     Running            0          18h
> flink-jobmanager-7df744b455-qm6wf      1/1     Running            0          18h
> flink-jobmanager-7df744b455-t7zm5      1/1     Running            1          18h
> flink-taskmanager-7945f6b856-89mr5     1/1     Running            7          18h
> flink-taskmanager-7945f6b856-mz2hz     1/1     Running            7          18h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23006) issue in configuring History server

2021-06-17 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364730#comment-17364730
 ] 

Till Rohrmann commented on FLINK-23006:
---

Are you sure that {{historyserver.web.address: http://flinkhistory:8082}} can 
be resolved from within the pod? You should rather bind the history server to 
{{0.0.0.0}}.
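For reference, a minimal sketch of the relevant flink-conf.yaml entries (the
archive directory is a placeholder and must match your setup):

{code}
# Bind the History Server web UI to all interfaces so it is reachable in the pod
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082

# Directory the History Server polls for archived jobs (placeholder path)
historyserver.archive.fs.dir: s3://flink-history/completed-jobs/
# JobManagers must write archives of finished jobs to the same location
jobmanager.archive.fs.dir: s3://flink-history/completed-jobs/
{code}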

A general note: Please do not open blocker issues for user questions. Almost 
all of your tickets are questions about using Flink and not real problems. Most 
of them even seem to come from not properly understanding Kubernetes. Please 
ask these questions on the user mailing list instead.

> issue in configuring History server 
> 
>
> Key: FLINK-23006
> URL: https://issues.apache.org/jira/browse/FLINK-23006
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.1
>Reporter: Bhagi
>Priority: Blocker
> Attachments: flink-history-server-7fb6f9b66-bkhfm.log, 
> history-server-deployment.yaml
>
>
> Dear Team,
> I have a Flink session cluster with HA on standalone Kubernetes, and I want
> to configure the History Server for this cluster.
> To do so, I created one more deployment for the History Server in the Flink
> cluster namespace, passing args: ["history-server"], but the pod is in
> CrashLoopBackOff.
> Can you look at the attached logs and help me configure the History Server?
> kubectl get po -n flink-load-test
> NAME                                   READY   STATUS             RESTARTS   AGE
> flink-history-server-7fb6f9b66-bkhfm   0/1     CrashLoopBackOff   10         29m
> flink-jobmanager-7df744b455-f575q      1/1     Running            0          18h
> flink-jobmanager-7df744b455-qm6wf      1/1     Running            0          18h
> flink-jobmanager-7df744b455-t7zm5      1/1     Running            1          18h
> flink-taskmanager-7945f6b856-89mr5     1/1     Running            7          18h
> flink-taskmanager-7945f6b856-mz2hz     1/1     Running            7          18h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-17 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-22970:
---

Assignee: Wei-Che Wei

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is not correct: it uses the UTC+0 
> time zone instead of the session time zone. We should fix the documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership

2021-06-17 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364733#comment-17364733
 ] 

Till Rohrmann commented on FLINK-22483:
---

Where else is {{CompletedCheckpointStore::recover}} being called from? I think 
it is only called by the {{CheckpointCoordinator}} atm.

> Recover checkpoints when JobMaster gains leadership
> ---
>
> Key: FLINK-22483
> URL: https://issues.apache.org/jira/browse/FLINK-22483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially 
> long-lasting/blocking operation, for example if the file system 
> implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from 
> the main thread of the JobManager, making it unresponsive to any RPC call 
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job XXX 
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused 
> (Connection refused)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) 
> ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
> ~[?:1.8.0_282]
>   at XXX.recover(KubernetesHaCheckpointStore.java:69) 
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) 
> ~[?:1.8.0_282]
>   at 
> java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.ja

[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364734#comment-17364734
 ] 

Robert Metzger commented on FLINK-22257:


I'm proposing to extend the {{ConfigOption}} class with a ConfigOptionType 
(cluster, job), and show this tag in the table we generate for the 
documentation.
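
A hypothetical sketch of that idea ({{ConfigOptionType}} and {{withType}} are
made-up names, not existing Flink API; only the {{ConfigOptions.key(...)}}
builder below is real):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TaggedOptions {
    // Made-up tag illustrating the proposal; not existing Flink API.
    public enum ConfigOptionType { CLUSTER, JOB }

    public static final ConfigOption<Integer> ATTEMPTS =
            ConfigOptions.key("restart-strategy.fixed-delay.attempts")
                    .intType()
                    .defaultValue(1)
                    .withDescription("Number of restart attempts.");
            // Proposed addition: .withType(ConfigOptionType.JOB), which the docs
            // generator would render as an extra column in the options table.
}
{code}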

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, from the translation of the user 
> program to its execution on the cluster, and in particular which options can 
> be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-17 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364736#comment-17364736
 ] 

Yangze Guo commented on FLINK-22891:


I'll take another look at it.

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> 
>
> Key: FLINK-22891
> URL: https://issues.apache.org/jira/browse/FLINK-22891
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0&l=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 6.24 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] 
> testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
>   Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jun 05 21:16:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunn

[jira] [Updated] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-17 Thread Wei-Che Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-22970:

Affects Version/s: 1.14.0

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is not correct: it uses the UTC+0 
> time zone instead of the session time zone. We should fix the documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23003) Resource leak in RocksIncrementalSnapshotStrategy

2021-06-17 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364740#comment-17364740
 ] 

Roman Khachatryan commented on FLINK-23003:
---

Merged into master as 8be1058a60565587b465a2237136d4c168f3.

It should also be ported to 1.13 according to the [Update 
Policy|https://flink.apache.org/downloads.html#update-policy-for-old-releases].

[~Yanfei Lei] would you like to open a PR to backport it to 1.13?

> Resource leak in RocksIncrementalSnapshotStrategy
> -
>
> Key: FLINK-23003
> URL: https://issues.apache.org/jira/browse/FLINK-23003
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: Flink: 1.14-SNAPSHOT
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> We found that the `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` 
> is not closed correctly after use, which leads to a resource leak.
> `RocksDBStateUploader` inherits from `RocksDBStateDataTransfer`, and 
> `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` 
> uses the `ExecutorService` to upload files to DFS asynchronously.
> When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the 
> backend should be closed, but currently `RocksIncrementalSnapshotStrategy` 
> lacks a close() function.
> We encountered a concrete example of this problem: when we benchmarked the 
> performance of incremental rescaling, we observed that the forked JMH VM 
> could not exit normally.
> {code:java}
> [INFO]
> [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark ---
> # JMH version: 1.19
> # VM version: JDK 1.8.0_281, VM 25.281-b09
> # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Warmup: 10 iterations, 1 s each
> # Measurement: 10 iterations, 1 s each
> # Timeout: 10 min per iteration
> # Threads: 1 thread, will synchronize iterations
> # Benchmark mode: Average time, time/op
> # Benchmark: 
> org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp
> # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)# Run 
> progress: 0.00% complete, ETA 00:02:00
> # Fork: 1 of 3
> # Warmup Iteration   1: 244.717 ms/op
> # Warmup Iteration   2: 104.749 ms/op
> # Warmup Iteration   3: 104.182 ms/op
> ...
> Iteration   1: 96.600 ms/op
> Iteration   2: 108.463 ms/op
> Iteration   3: 93.657 ms/op
> ... threads? Waiting 24 seconds more...>Non-finished threads:
> ...
> Thread[pool-15-thread-4,5,main]
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The root cause of this example is that the `RocksDBStateUploader` in 
> `RocksIncrementalSnapshotStrategy` is not closed normally when 
> `RocksDBKeyedStateBackend` is disposed.
>  
> The solution to this problem is quite straightforward: the 
> `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` can be 
> closed when cleaning up `RocksDBKeyedStateBackend`.
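
A minimal sketch of that fix, with simplified stand-ins rather than Flink's
actual classes:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-in for RocksDBStateUploader: owns the upload thread pool.
class StateUploader implements AutoCloseable {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    @Override
    public void close() {
        executor.shutdownNow(); // release the upload threads
    }
}

// Simplified stand-in for RocksIncrementalSnapshotStrategy: gains a close()
// that the backend calls on dispose, so the uploader's pool no longer leaks.
class IncrementalSnapshotStrategy implements AutoCloseable {
    private final StateUploader uploader = new StateUploader();

    @Override
    public void close() {
        uploader.close();
    }
}
{code}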



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-17 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-22891:


Assignee: Yangze Guo

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> 
>
> Key: FLINK-22891
> URL: https://issues.apache.org/jira/browse/FLINK-22891
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Assignee: Yangze Guo
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0&l=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 6.24 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] 
> testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
>   Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jun 05 21:16:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(Parent

[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364745#comment-17364745
 ] 

Timo Walther commented on FLINK-22257:
--

We should include [~AHeise] here as well. This is clearly also an SDK topic 
that we started in 2019 but never completed. The prefixes of option names 
(e.g. `pipeline.`) mostly represent the proposed design, but making it 
possible to define additional areas where options apply programmatically is a 
good idea.

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, from the translation of the user 
> program to its execution on the cluster, and in particular which options can 
> be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22995) Failed to acquire lease 'ConfigMapLock: .... retrying...

2021-06-17 Thread Bhagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364746#comment-17364746
 ] 

Bhagi commented on FLINK-22995:
---

Hi Till,

I have two Flink clusters in different namespaces, but they have unique 
kubernetes.cluster-id values.

> Failed to acquire lease 'ConfigMapLock:  retrying...
> 
>
> Key: FLINK-22995
> URL: https://issues.apache.org/jira/browse/FLINK-22995
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.1
>Reporter: Bhagi
>Priority: Major
> Attachments: jobamanger.log
>
>
> Hi Team,
> I have deployed a Flink session cluster on standalone Kubernetes with 
> JobManager HA (the Kubernetes HA service).
> When I submit jobs, they all fail due to a JobManager leader election issue.
> Attaching the logs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership

2021-06-17 Thread Eduardo Winpenny Tejedor (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364747#comment-17364747
 ] 

Eduardo Winpenny Tejedor commented on FLINK-22483:
--

[~trohrmann] that's right, but the {{CheckpointCoordinator}} gets called from 
multiple places, and those also get called from multiple other places. They 
all end up invoking {{CompletedCheckpointStore::recover}} somewhere in the 
call stack. Should those stay unchanged?

> Recover checkpoints when JobMaster gains leadership
> ---
>
> Key: FLINK-22483
> URL: https://issues.apache.org/jira/browse/FLINK-22483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially 
> long-lasting/blocking operation, for example if the file system 
> implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from 
> the main thread of the JobManager, making it unresponsive to any RPC call 
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job XXX 
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused 
> (Connection refused)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) 
> ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
> ~[?:1.8.0_282]
>   at XXX.recover(KubernetesHaCheckpointStore.java:69) 
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> java.util.concurrent.Completable

[jira] [Reopened] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reopened FLINK-22968:
--

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: Flink 1.13.1 and Flink 1.12.1
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, it fails; the full error is in this_error.txt. The 
> error message even prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> Reading a stream and converting it into a table should be very easy, yet with 
> the generic type String an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> If I instead create an entity class myself and use it as the generic type, it 
> works, and the Row type works as well. Do you really have to create your own 
> entity class instead of using String?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  
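
A hypothetical sketch of the suggested check (illustrative only, not Flink's
actual implementation):

{code:java}
import org.apache.flink.table.api.ValidationException;

class ConversionChecks {
    // Hypothetical helper: fail fast with a clear message instead of the
    // generic "Table program cannot be compiled" error.
    static void validateConversionClass(Class<?> clazz) {
        if (clazz == String.class) {
            throw new ValidationException(
                    "The given type 'String' is not supported in toAppendStream. "
                            + "Use Row or a POJO whose fields match the table schema.");
        }
    }
}
{code}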



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-22968.

Resolution: Won't Fix

Let's not put more effort in deprecated APIs. 

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: Flink 1.13.1 and Flink 1.12.1
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, it fails; the full error is in this_error.txt. The 
> error message even prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> Reading a stream and converting it into a table should be very easy, yet with 
> the generic type String an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> If I instead create an entity class myself and use it as the generic type, it 
> works, and the Row type works as well. Do you really have to create your own 
> entity class instead of using String?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-22968:
-
Comment: was deleted

(was: Let's not put more effort in deprecated APIs. )

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: Flink 1.13.1 and Flink 1.12.1
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, it fails; the full error is in this_error.txt. The 
> error message even prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> Reading a stream and converting it into a table should be very easy, yet with 
> the generic type String an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> If I instead create an entity class myself and use it as the generic type, it 
> works, and the Row type works as well. Do you really have to create your own 
> entity class instead of using String?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22326) Job contains Iterate Operator always fails on Checkpoint

2021-06-17 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364750#comment-17364750
 ] 

Dawid Wysakowicz commented on FLINK-22326:
--

I think updating the documentation is a good idea. If you would like to help 
with that, I can review the change.

{{tryYield}} is not a good solution either, as it basically leads to busy 
looping. We'd have to change the queue to one that gives us a Future that 
tells us when a record is available. That way we could suspend the default 
action of the mailbox processor.
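
A minimal sketch of such an availability-signalling queue (a hypothetical
stand-in, not Flink's actual classes):

{code:java}
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical queue whose getAvailableFuture() completes once a record
// arrives, letting the mailbox processor suspend instead of busy-looping.
final class AvailabilityQueue<T> {
    private final ArrayDeque<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void add(T element) {
        elements.add(element);
        available.complete(null); // wake up a suspended consumer
    }

    synchronized T poll() {
        T next = elements.poll();
        if (elements.isEmpty()) {
            available = new CompletableFuture<>(); // nothing left: reset the signal
        }
        return next;
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }
}
{code}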

> Job contains Iterate Operator always fails on Checkpoint 
> -
>
> Key: FLINK-22326
> URL: https://issues.apache.org/jira/browse/FLINK-22326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.1
>Reporter: Lu Niu
>Assignee: Lu Niu
>Priority: Major
>  Labels: pull-request-available
> Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 
> 2021-04-16 at 12.43.38 PM.png
>
>
> A job that contains an Iterate operator will always fail on checkpoint.
> How to reproduce: 
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on 
> [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java,]
> with a few lines of difference:
> 1. Make maxWaitTime large enough when creating the IterativeStream
> 2. No output back to the Iterative Source
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
> but always fails on checkpoint in 1.11
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-22968.

Resolution: Won't Fix

Let's not put more effort into deprecated APIs. The 1.13 documentation clearly 
states that these API parts are legacy. I will put a deprecation over the 
affected methods in 1.14. If this pops up again, we can think about fixing it. 

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: Flink 1.13.1 and Flink 1.12.1
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, it fails; the full error is in this_error.txt. The 
> error message even prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> Reading a stream and converting it into a table should be very easy, yet with 
> the generic type String an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> If I instead create an entity class myself and use it as the generic type, it 
> works, and the Row type works as well. Do you really have to create your own 
> entity class instead of using String?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22326) Job contains Iterate Operator always fails on Checkpoint

2021-06-17 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364750#comment-17364750
 ] 

Dawid Wysakowicz edited comment on FLINK-22326 at 6/17/21, 7:39 AM:


I think updating the documentation is a good idea. If you would like to help 
with that, I can review the change.

{{tryYield}} is not a good solution either, as it basically leads to busy 
looping. We'd have to change the queue to one that gives us a Future that 
tells us when a record is available. That way we could suspend the default 
action of the mailbox processor.

{{Yield}} in the {{AsyncWaitOperator}} is correct imo, as the only way to put 
new records into the queue is by processing records via the 
{{mailboxProcessor}}. In other words, there must be a new mail in the mailbox 
to unblock the queue.


was (Author: dawidwys):
I think updating the documentation is a good idea. If you would like to help 
with that, I can review the change.

{{tryYield}} is not a good solution either, as it basically leads to busy 
looping. We'd have to change the queue to one that gives us a Future that 
tells us when a record is available. That way we could suspend the default 
action of the mailbox processor.

> Job contains Iterate Operator always fails on Checkpoint 
> -
>
> Key: FLINK-22326
> URL: https://issues.apache.org/jira/browse/FLINK-22326
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.1
>Reporter: Lu Niu
>Assignee: Lu Niu
>Priority: Major
>  Labels: pull-request-available
> Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 
> 2021-04-16 at 12.43.38 PM.png
>
>
> A job that contains an Iterate operator will always fail on checkpoint.
> How to reproduce: 
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on 
> [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java,]
> with a few lines of difference:
> 1. Make maxWaitTime large enough when creating the IterativeStream
> 2. No output back to the Iterative Source
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
> but always fails on checkpoint in 1.11
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364753#comment-17364753
 ] 

Robert Metzger commented on FLINK-22257:


Besides the {{pipeline.}} options, the restart configuration and some RocksDB 
config are probably also of type ConfigOptionType.JOB.

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, from the translation of the user 
> program to its execution on the cluster, and in particular which options can 
> be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22662) YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail

2021-06-17 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364755#comment-17364755
 ] 

Xintong Song commented on FLINK-22662:
--

[~fly_in_gis] and I have looked into this instability. Here are our findings.

The expected behaviors of {{testKillYarnSessionClusterEntrypoint}} are as 
follows:
1) AM_1 running
2) Kill AM_1
3) Yarn brings up AM_2
4) Signal the job to finish (via temp file)

The problem is that 3) and 4) can happen before 2) has finished. This is 
because a Yarn container runs as a process tree, where the Flink Java process 
is brought up by a wrapping {{launch_container.sh}} process. Yarn can detect 
the termination of AM_1 and start bringing up AM_2 as soon as the wrapping 
process terminates, while the Flink process may still be running. 
Consequently, the signal from 4) is received by AM_1 and the job finishes 
before AM_1 is completely shut down. When AM_2 is started, there is no job to 
be recovered, hence the "could not find job" exception.

To fix this, we need to make sure AM_1 is completely terminated before 
proceeding to 4). This can be achieved by watching for the PID change.
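
A hypothetical sketch of that guard (illustrative only, not the test's actual
code; how the current PID is obtained is left to the caller):

{code:java}
import java.time.Duration;
import java.util.function.LongSupplier;

final class PidGuard {
    // Poll until the AM's PID differs from the one recorded before the kill,
    // proving the AM_1 Java process (not just launch_container.sh) is gone.
    static void waitForNewPid(long oldPid, LongSupplier currentPid, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (currentPid.getAsLong() == oldPid) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("AM_1 did not terminate in time");
            }
            Thread.sleep(100);
        }
    }
}
{code}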

Besides, a ZK outage is occasionally observed right after the AM failover. Due 
to the absence of ZK logs, we could not determine the cause of this outage. 
However, given that the outage is only observed together with the problem 
described above, we tend to see them as related.

> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail
> 
>
> Key: FLINK-22662
> URL: https://issues.apache.org/jira/browse/FLINK-22662
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Assignee: Xintong Song
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2021-05-14T00:24:57.8487649Z May 14 00:24:57 [ERROR] 
> testKillYarnSessionClusterEntrypoint(org.apache.flink.yarn.YARNHighAvailabilityITCase)
>   Time elapsed: 34.667 s  <<< ERROR!
> 2021-05-14T00:24:57.8488567Z May 14 00:24:57 
> java.util.concurrent.ExecutionException: 
> 2021-05-14T00:24:57.8489301Z May 14 00:24:57 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.rest.handler.RestHandlerException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (610ed4b159ece04c8ee2ec40e7d0c143)
> 2021-05-14T00:24:57.8493142Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> 2021-05-14T00:24:57.8495823Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> 2021-05-14T00:24:57.8496733Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2021-05-14T00:24:57.8497640Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2021-05-14T00:24:57.8498491Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8499222Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.853Z May 14 00:24:57  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
> 2021-05-14T00:24:57.8500872Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-05-14T00:24:57.8501702Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-05-14T00:24:57.8502662Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8503472Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.8504269Z May 14 00:24:57  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
> 2021-05-14T00:24:57.8504892Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:263)
> 2021-05-14T00:24:57.8505565Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-05-14T00:24:57.8506062Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-05-14T00:24:57.8506819Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-05-14T00:24:57.8507418Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-14T00:24:57.8508373Z May 14 00:24:5

[jira] [Commented] (FLINK-22420) UnalignedCheckpointITCase failed

2021-06-17 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364758#comment-17364758
 ] 

Yuan Mei commented on FLINK-22420:
--

The failure of this test is caused by "Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5)", where the 
additional failure is triggered by "An OperatorEvent from an 
OperatorCoordinator to a task was lost", as reported.

The unaligned checkpoint IT case expects five failures in total:

* After {@code m=1/4*n}, map fails.
* After {@code m=1/2*n}, snapshotState fails.
* After {@code m=3/4*n}, map fails and the corresponding recovery fails once.
* At the end, close fails once.

so it is a bit tricky to accommodate the additional failures introduced by 
OperatorEvent loss.

An easy short-term fix is to remove the "maxNumberRestartAttempts=5" 
constraint, with the slight concern that it may hide other problems.
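
A minimal sketch of that short-term fix, assuming the IT case builds its
environment from a Configuration (the option keys below are real Flink config
options; the surrounding class is illustrative):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class RelaxedRestartBudget {
    static StreamExecutionEnvironment createEnv() {
        // Relax the restart budget so extra failovers caused by lost
        // OperatorEvents do not exhaust the fixed-delay restart strategy.
        Configuration conf = new Configuration();
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
{code}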





> UnalignedCheckpointITCase failed
> 
>
> Key: FLINK-22420
> URL: https://issues.apache.org/jira/browse/FLINK-22420
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0
>Reporter: Guowei Ma
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442
> {code:java}
> Apr 22 14:28:21   at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> Apr 22 14:28:21   at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> Apr 22 14:28:21   at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> Apr 22 14:28:21   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> Apr 22 14:28:21   at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An 
> OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task 
> failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: 
> Source: source (1/1) - execution #5
> Apr 22 14:28:21   ... 26 more
> Apr 22 14:28:21 
> {code}
> As described in the comment 
> https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449
>  we might need to adjust the tests  to allow failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23003) Resource leak in RocksIncrementalSnapshotStrategy

2021-06-17 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364760#comment-17364760
 ] 

Yanfei Lei commented on FLINK-23003:


Sure, opened.  [https://github.com/apache/flink/pull/16176]

[~roman_khachatryan] would you please take a review again?

> Resource leak in RocksIncrementalSnapshotStrategy
> -
>
> Key: FLINK-23003
> URL: https://issues.apache.org/jira/browse/FLINK-23003
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: Flink: 1.14-SNAPSHOT
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> We found that `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is 
> not closed correctly after being used. It would lead to a resource leak.
> `RocksDBStateUploader` inherits `RocksDBStateDataTransfer`, and 
> `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` 
> uses the `ExecutorService` to upload files to DFS asynchronously.
> When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the 
> backend should be closed, but now `RocksIncrementalSnapshotStrategy` lacks a 
> close() function.
> And we encountered an example caused by this problem. When we benchmarked the 
> performance of incremental rescaling, we observed that the forked VM of JMH 
> can't exit normally.
> {code:java}
> [INFO]
> [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark ---
> # JMH version: 1.19
> # VM version: JDK 1.8.0_281, VM 25.281-b09
> # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Warmup: 10 iterations, 1 s each
> # Measurement: 10 iterations, 1 s each
> # Timeout: 10 min per iteration
> # Threads: 1 thread, will synchronize iterations
> # Benchmark mode: Average time, time/op
> # Benchmark: 
> org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp
> # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)# Run 
> progress: 0.00% complete, ETA 00:02:00
> # Fork: 1 of 3
> # Warmup Iteration   1: 244.717 ms/op
> # Warmup Iteration   2: 104.749 ms/op
> # Warmup Iteration   3: 104.182 ms/op
> ...
> Iteration   1: 96.600 ms/op
> Iteration   2: 108.463 ms/op
> Iteration   3: 93.657 ms/op
> ... threads? Waiting 24 seconds more...>Non-finished threads:
> ...
> Thread[pool-15-thread-4,5,main]
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The root cause of this example is that the {{RocksDBStateUploader}} in 
> {{RocksIncrementalSnapshotStrategy}} is not closed normally when 
> {{RocksDBKeyedStateBackend}} is disposed.
>  
> The solution to this problem is quite straightforward: the 
> {{RocksDBStateUploader}} in {{RocksIncrementalSnapshotStrategy}} can be 
> closed when cleaning up {{RocksDBKeyedStateBackend}}.
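
A hedged sketch of that fix pattern (the classes below are illustrative 
stand-ins, not Flink's actual ones): the snapshot strategy owns the uploader 
and shuts down its thread pool on close, so a disposed backend no longer 
leaves non-daemon threads behind:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StateUploaderStub implements AutoCloseable {
    // Stand-in for the ExecutorService held by RocksDBStateDataTransfer.
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    @Override
    public void close() {
        // Stop the upload threads so the JVM (e.g. JMH's forked VM) can exit.
        executor.shutdownNow();
    }
}

class SnapshotStrategyStub implements AutoCloseable {
    private final StateUploaderStub uploader = new StateUploaderStub();

    @Override
    public void close() {
        // Invoked when the keyed state backend is cleaned up.
        uploader.close();
    }
}
{code}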



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22420) UnalignedCheckpointITCase failed

2021-06-17 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364758#comment-17364758
 ] 

Yuan Mei edited comment on FLINK-22420 at 6/17/21, 7:52 AM:


The failure of this task is caused by "Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5", 
where the additional failure is triggered by 
"An OperatorEvent from an OperatorCoordinator to a task was lost", as reported.

The unaligned checkpoint IT case expects five failures in total:

 * After {@code m=1/4*n}, map fails.
 * After {@code m=1/2*n}, snapshotState fails.
 * After {@code m=3/4*n}, map fails and the corresponding recovery fails once.
 * At the end, close fails once.

and it is a bit tricky to introduce potentially more failures due to 
OperatorEvent loss, because the success of this test depends on how many 
failures occur while it runs.

The easiest short-term fix I can think of is to remove the 
"maxNumberRestartAttempts=5" constraint, with a bit of concern that it may 
hide other problems.






was (Author: ym):
The failure of this task is caused by "Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5"

where the additional failure is triggered by 
"An OperatorEvent from an OperatorCoordinator to a task was lost", as reported.

The unaligned checkpoint IT case expected five failures in total:

 * After {@code m=1/4*n}, map fails.
 * After {@code m=1/2*n}, snapshotState fails.
 * After {@code m=3/4*n}, map fails and the corresponding recovery fails once.
 * At the end, close fails once.

and it is a bit tricky to introduce potential more failures due to 
OperatorEvent loss;

An easy short-term fix for this is to removes "maxNumberRestartAttempts=5" 
constraints, with a bit of concern that it may hide other problems.





> UnalignedCheckpointITCase failed
> 
>
> Key: FLINK-22420
> URL: https://issues.apache.org/jira/browse/FLINK-22420
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0
>Reporter: Guowei Ma
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442
> {code:java}
> Apr 22 14:28:21   at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> Apr 22 14:28:21   at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> Apr 22 14:28:21   at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> Apr 22 14:28:21   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> Apr 22 14:28:21   at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An 
> OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task 
> failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: 
> Source: source (1/1) - execution #5
> Apr 22 14:28:21   ... 26 more
> Apr 22 14:28:21 
> {code}
> As described in the comment 
> https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449
>  we might need to adjust the tests  to allow failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-17 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364761#comment-17364761
 ] 

Konstantin Knauf commented on FLINK-23013:
--

I'd personally prefer to keep it separate, so that the connector can have an 
independent lifecycle. What would be the benefit of having it in the project?

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have the datagen connector.
> But sometimes we need a more realistic datagen connector that can produce 
> more natural random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.
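
For reference, a minimal sketch of the proposed library's documented API 
(the printed values are random, natural-looking data):

{code:java}
import com.github.javafaker.Faker;

public class FakerDemo {
    public static void main(String[] args) {
        Faker faker = new Faker();
        // Produces natural-looking random data, unlike plain datagen values.
        System.out.println(faker.name().fullName());
        System.out.println(faker.internet().emailAddress());
    }
}
{code}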



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364762#comment-17364762
 ] 

Timo Walther commented on FLINK-22257:
--

Maybe, but not always. We should think about the layers or teams that usually 
handle the different stages of the Flink application lifecycle. Maybe something 
like CLUSTER, JOB, PIPELINE. But this is just an initial idea; it requires 
deeper investigation.

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, beginning from the translation of 
> the user program to its execution on the cluster, and in particular which 
> options can be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based
>  
>  
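
To illustrate the two channels, a small sketch (local environment; the chosen 
option keys are just examples) of the code-based counterpart to a 
flink-conf.yaml entry:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigChannels {
    public static void main(String[] args) {
        // Cluster channel: `rest.port: 8082` in flink-conf.yaml.
        // Code-based channel: the same key set programmatically.
        Configuration conf = new Configuration();
        conf.setString("rest.port", "8082");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.setParallelism(1); // a purely code-based, per-application option
    }
}
{code}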



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364762#comment-17364762
 ] 

Timo Walther edited comment on FLINK-22257 at 6/17/21, 7:55 AM:


Maybe, but not always. We should think about the layers or teams that usually 
handle the different stages of the Flink application lifecycle. Maybe something 
like CLUSTER, JOB, PIPELINE. But this is just an initial idea; it requires 
deeper investigation.


was (Author: twalthr):
Maybe but not always. We should think about the layers or teams that usually 
handle different stages of Flink application lifecycle. Maybe something like 
CLUSTER, JOB, PIPELINE. But this is just initial idea, it requires deeper 
investigation.

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, beginning from the translation of 
> the user program to its execution on the cluster, and in particular which 
> options can be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22420) UnalignedCheckpointITCase failed

2021-06-17 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364765#comment-17364765
 ] 

Yuan Mei commented on FLINK-22420:
--

I did see in FLINK-21996 that a second option/solution is mentioned as well, 
which can prevent failover due to RPC loss. 

Given this happens rarely, I'd suggest just keeping it as it is until we have a 
conclusion on FLINK-21996?



> UnalignedCheckpointITCase failed
> 
>
> Key: FLINK-22420
> URL: https://issues.apache.org/jira/browse/FLINK-22420
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0
>Reporter: Guowei Ma
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17052&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9442
> {code:java}
> Apr 22 14:28:21   at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> Apr 22 14:28:21   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> Apr 22 14:28:21   at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> Apr 22 14:28:21   at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> Apr 22 14:28:21   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> Apr 22 14:28:21   at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> Apr 22 14:28:21   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> Apr 22 14:28:21   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Apr 22 14:28:21 Caused by: org.apache.flink.util.FlinkException: An 
> OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task 
> failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: 
> Source: source (1/1) - execution #5
> Apr 22 14:28:21   ... 26 more
> Apr 22 14:28:21 
> {code}
> As described in the comment 
> https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17326449
>  we might need to adjust the tests  to allow failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-06-17 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364768#comment-17364768
 ] 

Timo Walther commented on FLINK-22257:
--

It might also be useful to look into FLIP-81 and FLIP-73.

> Clarify Flink ConfigOptions Usage
> -
>
> Key: FLINK-22257
> URL: https://issues.apache.org/jira/browse/FLINK-22257
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Fabian Paul
>Priority: Minor
>
> For users, it is hard to determine which ConfigOptions are relevant for the 
> different stages of a Flink application, beginning from the translation of 
> the user program to its execution on the cluster, and in particular which 
> options can be configured through the different channels:
>  * Cluster configuration (i.e. flink-conf.yaml)
>  * Application configuration, code-based
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-17 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364771#comment-17364771
 ] 

Jingsong Lee commented on FLINK-23013:
--

Putting it under Flink will allow more people to see and use it.

OK, let's keep it that way until more people need it.

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have the datagen connector.
> But sometimes we need a more realistic datagen connector that can produce 
> more natural random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)
Daniel Lenz created FLINK-23017:
---

 Summary: HELP in sql-client still shows the removed SOURCE 
functionality
 Key: FLINK-23017
 URL: https://issues.apache.org/jira/browse/FLINK-23017
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.1
 Environment: This affects 1.13.0 and 1.13.1
Reporter: Daniel Lenz


The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Lenz updated FLINK-23017:

Description: 
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 

```

{{[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context}}

{{```}}

  was:The sql-client still shows the SOURCE command in HELP, even though the 
command itself doesn't exist anymore and using it causes an error.


> HELP in sql-client still shows the removed SOURCE functionality
> ---
>
> Key: FLINK-23017
> URL: https://issues.apache.org/jira/browse/FLINK-23017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
> Environment: This affects 1.13.0 and 1.13.1
>Reporter: Daniel Lenz
>Priority: Minor
>
> The sql-client still shows the SOURCE command in HELP, even though the 
> command itself doesn't exist anymore and using it causes an error.
>  
> ```
> {{[ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
> in illegal context}}
> {{```}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Lenz updated FLINK-23017:

Description: 
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 
{code:java}
Flink SQL> source test.sql;
[ERROR] Could not execute SQL statement. Reason:
 org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context
{code}
 

 

  was:
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 

```

{{[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context}}

{{```}}


> HELP in sql-client still shows the removed SOURCE functionality
> ---
>
> Key: FLINK-23017
> URL: https://issues.apache.org/jira/browse/FLINK-23017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
> Environment: This affects 1.13.0 and 1.13.1
>Reporter: Daniel Lenz
>Priority: Minor
>
> The sql-client still shows the SOURCE command in HELP, even though the 
> command itself doesn't exist anymore and using it causes an error.
>  
> {code:java}
> Flink SQL> source test.sql;
> [ERROR] Could not execute SQL statement. Reason:
>  org.apache.calcite.runtime.CalciteException: Non-query expression 
> encountered in illegal context
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Lenz updated FLINK-23017:

Description: 
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 
{code:java}
/opt/flink# echo "select 'hello';" > test.sql
/opt/flink# sql-client.sh

Flink SQL> select 'hello';
-- works as intended
[INFO] Result retrieval cancelled.

Flink SQL> source test.sql;
[ERROR] Could not execute SQL statement. Reason: 
org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context
{code}
 

 

  was:
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 
{code:java}
Flink SQL> source test.sql;
[ERROR] Could not execute SQL statement. Reason:
 org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context
{code}
 

 


> HELP in sql-client still shows the removed SOURCE functionality
> ---
>
> Key: FLINK-23017
> URL: https://issues.apache.org/jira/browse/FLINK-23017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
> Environment: This affects 1.13.0 and 1.13.1
>Reporter: Daniel Lenz
>Priority: Minor
>
> The sql-client still shows the SOURCE command in HELP, even though the 
> command itself doesn't exist anymore and using it causes an error.
>  
> {code:java}
> /opt/flink# echo "select 'hello';" > test.sql
> /opt/flink# sql-client.sh
> Flink SQL> select 'hello';
> -- works as intended
> [INFO] Result retrieval cancelled.
> Flink SQL> source test.sql;
> [ERROR] Could not execute SQL statement. Reason: 
> org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
> in illegal context
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Lenz updated FLINK-23017:

Description: 
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 
{code:java}
/opt/flink# echo "select 'hello';" > test.sql
/opt/flink# sql-client.sh

Flink SQL> select 'hello';
-- works as intended
[INFO] Result retrieval cancelled.

Flink SQL> source test.sql;
[ERROR] Could not execute SQL statement. Reason: 
org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context
{code}
 

 I'd be happy to create a PR and remove the relevant lines from the HELP.

  was:
The sql-client still shows the SOURCE command in HELP, even though the command 
itself doesn't exist anymore and using it causes an error.

 
{code:java}
/opt/flink# echo "select 'hello';" > test.sql
/opt/flink# sql-client.sh

Flink SQL> select 'hello';
-- works as intended
[INFO] Result retrieval cancelled.

Flink SQL> source test.sql;
[ERROR] Could not execute SQL statement. Reason: 
org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
in illegal context
{code}
 

 


> HELP in sql-client still shows the removed SOURCE functionality
> ---
>
> Key: FLINK-23017
> URL: https://issues.apache.org/jira/browse/FLINK-23017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
> Environment: This affects 1.13.0 and 1.13.1
>Reporter: Daniel Lenz
>Priority: Minor
>
> The sql-client still shows the SOURCE command in HELP, even though the 
> command itself doesn't exist anymore and using it causes an error.
>  
> {code:java}
> /opt/flink# echo "select 'hello';" > test.sql
> /opt/flink# sql-client.sh
> Flink SQL> select 'hello';
> -- works as intended
> [INFO] Result retrieval cancelled.
> Flink SQL> source test.sql;
> [ERROR] Could not execute SQL statement. Reason: 
> org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
> in illegal context
> {code}
>  
>  I'd be happy to create a PR and remove the relevant lines from the HELP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23018) TTL state factory should handle extended state descriptors

2021-06-17 Thread Yun Tang (Jira)
Yun Tang created FLINK-23018:


 Summary: TTL state factory should handle extended state descriptors
 Key: FLINK-23018
 URL: https://issues.apache.org/jira/browse/FLINK-23018
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.14.0


Currently, {{TtlStateFactory}} can only handle a fixed set of state descriptor 
types. As {{ValueStateDescriptor}} is not a final class, users could still 
extend it; however, {{TtlStateFactory}} cannot recognize the extending class.

{{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind of 
state it is.
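
A rough sketch of the proposed check (the helper below is illustrative only, 
not the actual {{TtlStateFactory}} code): dispatching on the descriptor's 
declared {{Type}} enum keeps working for user subclasses of 
{{ValueStateDescriptor}}, whereas class-based checks do not:

{code:java}
import org.apache.flink.api.common.state.StateDescriptor;

// Illustrative helper, not Flink's actual factory code.
class TtlDispatchSketch {
    static String kindOf(StateDescriptor<?, ?> descriptor) {
        // StateDescriptor#getType() survives subclassing, unlike
        // instanceof/class-equality checks against concrete descriptors.
        switch (descriptor.getType()) {
            case VALUE:
                return "value state";
            case LIST:
                return "list state";
            case MAP:
                return "map state";
            default:
                return "other state kind";
        }
    }
}
{code}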



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23018) TTL state factory should handle extended state descriptors

2021-06-17 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23018:
-
Description: 
Currently, {{TtlStateFactory}} can only handle a fixed set of state descriptor 
types. As {{ValueStateDescriptor}} is not a final class, users could still 
extend it; however, {{TtlStateFactory}} cannot recognize the extending class.

{{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind of 
state it is.

  was:
Currently, {{TtlStateFactory}} can only handle fixed type of state descriptors. 
As {{ValueStateDescriptor}} is not a final class and user could still extend 
it, however, {{TtlStateFactory}} cannot recognize the extending class.

 {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kinds of 
state is.


> TTL state factory should handle extended state descriptors
> --
>
> Key: FLINK-23018
> URL: https://issues.apache.org/jira/browse/FLINK-23018
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.14.0
>
>
> Currently, {{TtlStateFactory}} can only handle a fixed set of state 
> descriptor types. As {{ValueStateDescriptor}} is not a final class, users 
> could still extend it; however, {{TtlStateFactory}} cannot recognize the 
> extending class.
>  {{TtlStateFactory}} should use {{StateDescriptor#Type}} to check what kind 
> of state it is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23004) Fix misleading log

2021-06-17 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li resolved FLINK-23004.

Fix Version/s: 1.14.0
 Assignee: Junfan Zhang
   Resolution: Fixed

Fixed in master: 9785ea4ee896ae6b11303abc6b62078c37e589cd

> Fix misleading log
> --
>
> Key: FLINK-23004
> URL: https://issues.apache.org/jira/browse/FLINK-23004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Junfan Zhang
>Assignee: Junfan Zhang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23019) Avoid errors when identifiers use reserved keywords

2021-06-17 Thread Timo Walther (Jira)
Timo Walther created FLINK-23019:


 Summary: Avoid errors when identifiers use reserved keywords
 Key: FLINK-23019
 URL: https://issues.apache.org/jira/browse/FLINK-23019
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


We add more and more keywords and built-in functions with special meaning to 
SQL. However, this can be quite annoying for users who have columns named like 
a keyword, e.g. {{timestamp}} or {{current_timestamp}}.

We should investigate whether we can do better and avoid forcing escaping with 
backticks. IIRC, Calcite also offers functionality for that.
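
To make the friction concrete, here is a minimal sketch (standard Table API 
usage; the table name and schema are made up for illustration) showing that a 
keyword-named column currently only parses when escaped with backticks:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReservedKeywordExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE events (`timestamp` TIMESTAMP(3), id INT) "
                        + "WITH ('connector' = 'datagen')");
        // Without the backticks around `timestamp`, parsing fails today.
        tEnv.executeSql("SELECT `timestamp`, id FROM events LIMIT 1").print();
    }
}
{code}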



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-06-17 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364798#comment-17364798
 ] 

Wenhao Ji commented on FLINK-22452:
---

Hi, [~mdemierre]. I've just created the FLIP for this. Let us discuss this 
feature in this thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-172-Support-custom-transactional-id-prefix-in-FlinkKafkaProducer-td51355.html]

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, the "transactional.id"s of the Kafka producers in 
> FlinkKafkaProducer are generated based on the task name. This mechanism has 
> some limitations:
>  * It will exceed Kafka's limitation if the task name is too long. (resolved 
> in FLINK-17691)
>  * They will very likely clash with each other if the job topologies are 
> similar. (discussed in FLINK-11654)
>  * Only certain "transactional.id" may be authorized by [Prefixed 
> ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
>  on the target Kafka cluster.
> Besides, the spring community has introduced the 
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
>  method to their Kafka client.
> Therefore, I think it will be necessary to have this feature in the Flink 
> Kafka connector. 
>  
> As discussed in FLINK-11654, the possible solution will be,
>  * either introduce an additional method called 
> setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
>  * or use the existing "transactional.id" properties as the prefix.
>  And the behavior of the "transactional.id" generation will be
>  * keep the behavior as it was if absent,
>  * use the one if present as the prefix for the TransactionalIdsGenerator.
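
To make the two options concrete, a hedged sketch of the proposed behavior; 
the class and methods below are illustrative stubs only, since this API does 
not exist in FlinkKafkaProducer yet:

{code:java}
// Illustrative stub of the proposed prefix handling; not Flink's real code.
class TransactionalIdSketch {
    private String transactionalIdPrefix; // null => keep legacy behavior

    void setTransactionalIdPrefix(String prefix) {
        this.transactionalIdPrefix = prefix;
    }

    String transactionalIdFor(String taskName, int subtaskIndex) {
        // Use the configured prefix if present, otherwise fall back to the
        // task-name-based generation used today.
        String base = (transactionalIdPrefix != null)
                ? transactionalIdPrefix
                : taskName;
        return base + "-" + subtaskIndex; // e.g. "orders-sink-3"
    }
}
{code}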



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23017) HELP in sql-client still shows the removed SOURCE functionality

2021-06-17 Thread Daniel Lenz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Lenz updated FLINK-23017:

Environment: (was: This affects 1.13.0 and 1.13.1)

> HELP in sql-client still shows the removed SOURCE functionality
> ---
>
> Key: FLINK-23017
> URL: https://issues.apache.org/jira/browse/FLINK-23017
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.1
>Reporter: Daniel Lenz
>Priority: Minor
>
> The sql-client still shows the SOURCE command in HELP, even though the 
> command itself doesn't exist anymore and using it causes an error.
>  
> {code:java}
> /opt/flink# echo "select 'hello';" > test.sql
> /opt/flink# sql-client.sh
> Flink SQL> select 'hello';
> -- works as intended
> [INFO] Result retrieval cancelled.
> Flink SQL> source test.sql;
> [ERROR] Could not execute SQL statement. Reason: 
> org.apache.calcite.runtime.CalciteException: Non-query expression encountered 
> in illegal context
> {code}
>  
>  I'd be happy to create a PR and remove the relevant lines from the HELP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-17 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364806#comment-17364806
 ] 

Konstantin Knauf commented on FLINK-23013:
--

It would be great if we could find a way for more people to discover 
connectors, without the need to have them in the Apache Flink project itself. 

[~AHeise] is looking into ways to do this as far as I know.

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have the datagen connector.
> But sometimes we need a more realistic datagen connector that can produce 
> more natural random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22955) lookup join filter push down result to mismatch function signature

2021-06-17 Thread JING ZHANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364807#comment-17364807
 ] 

JING ZHANG commented on FLINK-22955:


Hi, [~gsavl], as we discussed in the mailing list, the root cause of the 
exception is that LookupJoin wants to find a method that matches two lookup 
keys, 'age, id', after the optimizer pushes age=10 down into LookupJoin.

There is a possible improvement to avoid throwing an exception: if no method 
matching two lookup keys is found, try to match a method with one lookup key. 
The dimension table would return all records matching 'id=a', and the output 
would then be filtered by 'age=10' in a later Calc.

But in this way, we need to know whether the only argument in the eval method 
represents 'age' or 'id'. That hint is missing in the current API. As discussed 
with [~jark], we think this improvement would involve refactoring 
`LookupTableSource`.

We prefer to let the user handle this case by adding an eval method that 
matches the lookup keys, for example:
{code:java}
// first solution with one eval method with variable arguments length
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  @varargs
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer*): 
Unit = {
  }
}

// second solution with multiple eval method, each method with fixed arguments 
length
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): 
Unit = {
  }

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, 
b: Integer): Unit = {
  }
}

{code}
What do you think, [~gsavl]?

 

> lookup join filter push down result to mismatch function signature
> --
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
>Reporter: Cooper Luan
>Priority: Critical
> Fix For: 1.11.4, 1.12.5, 1.13.2
>
> Attachments: 
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> a SQL statement like this may result in a lookup function signature mismatch 
> exception when explaining the SQL
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> the lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: 
> Integer): Unit = {
>   }
> }{code}
> exec plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], 
> fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], 
> joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 
> 10)], select=[a, b, id, name])
>+- Calc(select=[a, b])
>   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
> fields=[a, b, c, proctime, rowtime])
> {code}
> the "lookup=[age=10, id=a]" result to mismatch signature mismatch
>  
> but if I add 1 more insert, it works well
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> exec plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], 
> joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
> rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
> fields=[a, b, c, proctime, 
> rowtime])LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`],
>  fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>+- 
> Reused(reference_id=[1])LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`],
>  fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>+- Reused(reference_id=[1])
> {code}
>  the LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" 
> (wrong)
>  
> so, in "multi insert" case, planner works great
> in "single insert" case, planner throw exception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23020) NullPointerException when running collect from Python API

2021-06-17 Thread Jira
Maciej Bryński created FLINK-23020:
--

 Summary: NullPointerException when running collect from Python API
 Key: FLINK-23020
 URL: https://issues.apache.org/jira/browse/FLINK-23020
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.1
Reporter: Maciej Bryński


Hi, 

I'm trying to use PyFlink from Jupyter Notebook and I'm getting an NPE in the 
following scenario.

1. I'm creating datagen table.
{code:java}
from pyflink.table import EnvironmentSettings, TableEnvironment, 
StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.common import Configuration, Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

conf = Configuration()
env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_integer("parallelism.default", 1)

table_env.execute_sql("DROP TABLE IF EXISTS datagen")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT
) WITH (
'connector' = 'datagen'
)
""")
{code}
2. Then I'm running collect
{code:java}
try:
result = table_env.sql_query("select * from datagen limit 1").execute()
for r in result.collect():
print(r)
except KeyboardInterrupt:
result.get_job_client().cancel()
{code}
3. I'm using the "interrupt the kernel" button. This is handled by the above 
try/except and will cancel the query.

4. I'm running collect from point 2 one more time. Result:
{code:java}
---
Py4JJavaError Traceback (most recent call last)
 in 
  1 try:
> 2 result = table_env.sql_query("select * from datagen limit 
1").execute()
  3 for r in result.collect():
  4 print(r)
  5 except KeyboardInterrupt:

/usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
   1070 """
   1071 self._t_env._before_execute()
-> 1072 return TableResult(self._j_table.execute())
   1073 
   1074 def explain(self, *extra_details: ExplainDetail) -> str:

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, 
*args)
   1283 
   1284 answer = self.gateway_client.send_command(command)
-> 1285 return_value = get_return_value(
   1286 answer, self.gateway_client, self.target_id, self.name)
   1287 

/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, 
**kw)
144 def deco(*a, **kw):
145 try:
--> 146 return f(*a, **kw)
147 except Py4JJavaError as e:
148 from pyflink.java_gateway import get_gateway

/usr/local/lib/python3.8/dist-packages/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o69.execute.
: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
at 
org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
at 
org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
at 
org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
at 
org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
 

[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API

2021-06-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated FLINK-23020:
---
Summary: NullPointerException when running collect twice from Python API  
(was: NullPointerException when running collect from Python API)

> NullPointerException when running collect twice from Python API
> ---
>
> Key: FLINK-23020
> URL: https://issues.apache.org/jira/browse/FLINK-23020
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> Hi, 
> I'm trying to use PyFlink from Jupyter Notebook and I'm getting an NPE in 
> the following scenario.
> 1. I'm creating datagen table.
> {code:java}
> from pyflink.table import EnvironmentSettings, TableEnvironment, 
> StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.common import Configuration, Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> conf = Configuration()
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_integer("parallelism.default", 
> 1)
> table_env.execute_sql("DROP TABLE IF EXISTS datagen")
> table_env.execute_sql("""
> CREATE TABLE datagen (
> id INT
> ) WITH (
> 'connector' = 'datagen'
> )
> """)
> {code}
> 2. Then I'm running collect
> {code:java}
> try:
> result = table_env.sql_query("select * from datagen limit 1").execute()
> for r in result.collect():
> print(r)
> except KeyboardInterrupt:
> result.get_job_client().cancel()
> {code}
> 3. I'm using the "interrupt the kernel" button. This is handled by the above 
> try/except and will cancel the query.
> 4. I'm running collect from point 2 one more time. Result:
> {code:java}
> ---
> Py4JJavaError Traceback (most recent call last)
>  in 
>   1 try:
> > 2 result = table_env.sql_query("select * from datagen limit 
> 1").execute()
>   3 for r in result.collect():
>   4 print(r)
>   5 except KeyboardInterrupt:
> /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
>1070 """
>1071 self._t_env._before_execute()
> -> 1072 return TableResult(self._j_table.execute())
>1073 
>1074 def explain(self, *extra_details: ExplainDetail) -> str:
> /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, 
> *args)
>1283 
>1284 answer = self.gateway_client.send_command(command)
> -> 1285 return_value = get_return_value(
>1286 answer, self.gateway_client, self.target_id, self.name)
>1287 
> /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, 
> **kw)
> 144 def deco(*a, **kw):
> 145 try:
> --> 146 return f(*a, **kw)
> 147 except Py4JJavaError as e:
> 148 from pyflink.java_gateway import get_gateway
> /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
> 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 325 if answer[1] == REFERENCE_TYPE:
> --> 326 raise Py4JJavaError(
> 327 "An error occurred while calling {0}{1}{2}.\n".
> 328 format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o69.execute.
> : java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at 
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at 
> org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
>   at 
> org.apache.c

[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API

2021-06-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated FLINK-23020:
---
Description: 
Hi, 

I'm trying to use PyFlink from Jupyter Notebook and I'm getting an NPE in the 
following scenario.

1. I'm creating datagen table.
{code:java}
from pyflink.table import EnvironmentSettings, TableEnvironment, 
StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.common import Configuration, Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

conf = Configuration()
env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_integer("parallelism.default", 1)

table_env.execute_sql("DROP TABLE IF EXISTS datagen")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT
) WITH (
'connector' = 'datagen'
)
""")
{code}
2. Then I'm running collect
{code:java}
try:
result = table_env.sql_query("select * from datagen limit 1").execute()
for r in result.collect():
print(r)
except KeyboardInterrupt:
result.get_job_client().cancel()
{code}
3. I'm using the "interrupt the kernel" button. This is handled by the above 
try/except and will cancel the query.

4. I'm running collect from point 2 one more time. Result:
{code:java}
---
Py4JJavaError Traceback (most recent call last)
 in 
  1 try:
> 2 result = table_env.sql_query("select * from datagen limit 
1").execute()
  3 for r in result.collect():
  4 print(r)
  5 except KeyboardInterrupt:

/usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
   1070 """
   1071 self._t_env._before_execute()
-> 1072 return TableResult(self._j_table.execute())
   1073 
   1074 def explain(self, *extra_details: ExplainDetail) -> str:

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, 
*args)
   1283 
   1284 answer = self.gateway_client.send_command(command)
-> 1285 return_value = get_return_value(
   1286 answer, self.gateway_client, self.target_id, self.name)
   1287 

/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, 
**kw)
144 def deco(*a, **kw):
145 try:
--> 146 return f(*a, **kw)
147 except Py4JJavaError as e:
148 from pyflink.java_gateway import get_gateway

/usr/local/lib/python3.8/dist-packages/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o69.execute.
: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
at 
org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
at 
org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
at 
org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
at 
org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.tabl

[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API

2021-06-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated FLINK-23020:
---
Description: 
Hi, 

I'm trying to use PyFlink from Jupyter Notebook and I'm getting an NPE in the 
following scenario.

1. I'm creating datagen table.
{code:java}
from pyflink.table import EnvironmentSettings, TableEnvironment, 
StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.common import Configuration, Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

conf = Configuration()
env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_integer("parallelism.default", 1)

table_env.execute_sql("DROP TABLE IF EXISTS datagen")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT
) WITH (
'connector' = 'datagen'
)
""")
{code}
2. Then I'm running collect
{code:java}
try:
result = table_env.sql_query("select * from datagen limit 1").execute()
for r in result.collect():
print(r)
except KeyboardInterrupt:
result.get_job_client().cancel()
{code}
3. I'm using the "interrupt the kernel" button. This is handled by the above 
try/except and will cancel the query.

4. I'm running collect from point 2 one more time. Result:
{code:python}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>
      1 try:
----> 2     result = table_env.sql_query("select * from datagen limit 1").execute()
      3     for r in result.collect():
      4         print(r)
      5 except KeyboardInterrupt:

/usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
   1070         """
   1071         self._t_env._before_execute()
-> 1072         return TableResult(self._j_table.execute())
   1073
   1074     def explain(self, *extra_details: ExplainDetail) -> str:

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1283
   1284         answer = self.gateway_client.send_command(command)
-> 1285         return_value = get_return_value(
   1286             answer, self.gateway_client, self.target_id, self.name)
   1287

/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw)
    144     def deco(*a, **kw):
    145         try:
--> 146             return f(*a, **kw)
    147         except Py4JJavaError as e:
    148             from pyflink.java_gateway import get_gateway

/usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o69.execute.
: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.tabl


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-17 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364811#comment-17364811
 ] 

Jingsong Lee commented on FLINK-23013:
--

(y) Yes, we need a website or something else.

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have a datagen connector.
> But sometimes we need a more realistic datagen connector that can produce more
> natural random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22974) No execution checkpointing config desc in flink-conf.yaml

2021-06-17 Thread tinawenqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364156#comment-17364156
 ] 

tinawenqiao edited comment on FLINK-22974 at 6/17/21, 9:51 AM:
---

[~jark], PR has been submitted here:
[https://github.com/apache/flink/pull/16183|http://example.com]


was (Author: wenqiao):
[~jark], PR has been submitted here:
[https://github.com/apache/flink/pull/16147|http://example.com]

> No execution checkpointing config desc in flink-conf.yaml 
> --
>
> Key: FLINK-22974
> URL: https://issues.apache.org/jira/browse/FLINK-22974
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.13.1
>Reporter: tinawenqiao
>Assignee: tinawenqiao
>Priority: Minor
>
> We found that there is no parameter description for execution checkpointing 
> in flink-conf.yaml. It may cause a misunderstanding.  I think we can add some 
> important parameters, such as 
> execution.checkpointing.interval,execution.checkpointing.externalized-checkpoint-retention,execution.checkpointing.tolerable-failed-checkpoints
>  etc.
>  
> #==
>  # Fault tolerance and checkpointing
>  
> #==
>  # The backend that will be used to store operator state checkpoints if
>  # checkpointing is enabled. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
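
For illustration, the kind of commented defaults the ticket asks for might look like this in flink-conf.yaml. The keys are existing execution.checkpointing.* options; the values below are examples only, not recommended defaults:

{code:yaml}
# Interval at which checkpoints are triggered (checkpointing is off if unset).
execution.checkpointing.interval: 3min
# Keep externalized checkpoints when the job is cancelled.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Number of consecutive checkpoint failures tolerated before the job fails.
execution.checkpointing.tolerable-failed-checkpoints: 3
{code}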


[jira] [Created] (FLINK-23021) Check if the restarted job topology is consistent on restart

2021-06-17 Thread Yun Gao (Jira)
Yun Gao created FLINK-23021:
---

 Summary: Check if the restarted job topology is consistent on 
restart
 Key: FLINK-23021
 URL: https://issues.apache.org/jira/browse/FLINK-23021
 Project: Flink
  Issue Type: Sub-task
Reporter: Yun Gao


Users might modify the job topology before restarting from an external checkpoint or
savepoint. To overcome this issue, we would need to check whether a fully finished
operator has been added after a non-fully-finished operator. If so, we would
throw an exception to disallow this situation, or re-mark the fully finished
operator as alive.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22998) Flink SQL does not support block comment before SET command

2021-06-17 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-22998:

Component/s: (was: Table SQL / Planner)
 Table SQL / API

> Flink SQL does not support block comment before SET command
> ---
>
> Key: FLINK-22998
> URL: https://issues.apache.org/jira/browse/FLINK-22998
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Zhiwen Sun
>Priority: Major
>
> Flink SQL does not support a block comment before the SET command.
> A tiny SQL script that reproduces the bug:
>  
> {code:java}
> /**
>  comment
> **/
> SET sql-client.execution.result-mode=TABLEAU;
> SELECT 'hello';{code}
>  
> while following SQL works fine:
>  
> {code:java}
> SET sql-client.execution.result-mode=TABLEAU;
> /**
>  comment
> **/
> SELECT 'hello';{code}
>  
>  
> After debugging the Flink source code, I found that EXTENDED_PARSER does not
> support block comments.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-17 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364815#comment-17364815
 ] 

Jark Wu commented on FLINK-23013:
-

Isn't it https://flink-packages.org/ ? 

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have a datagen connector.
> But sometimes we need a more realistic datagen connector that can produce more
> natural random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
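
To give a feel for the proposal, here is a hedged sketch of what a java-faker-backed generator could emit. The Faker calls are the library's real API; the Row mapping is only an assumption about how a built-in connector might use it:

{code:java}
import com.github.javafaker.Faker;
import org.apache.flink.types.Row;

public class FakerRowExample {
    public static void main(String[] args) {
        Faker faker = new Faker();
        // One "natural looking" record, instead of datagen's uniform noise:
        Row row = Row.of(
                faker.name().fullName(),               // e.g. "Jane Doe"
                faker.internet().emailAddress(),       // e.g. "jane.doe@example.com"
                faker.number().numberBetween(18, 99)); // a plausible age
        System.out.println(row);
    }
}
{code}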


[jira] [Closed] (FLINK-22703) SavepointITCase.testTriggerSavepointAndResumeWithFileBasedCheckpoints failed

2021-06-17 Thread Anton Kalashnikov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov closed FLINK-22703.
-
Resolution: Duplicate

This test uses the same codebase that will be fixed in
https://issues.apache.org/jira/browse/FLINK-22593

> SavepointITCase.testTriggerSavepointAndResumeWithFileBasedCheckpoints failed
> 
>
> Key: FLINK-22703
> URL: https://issues.apache.org/jira/browse/FLINK-22703
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18106&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=c2734c79-73b6-521c-e85a-67c7ecae9107&l=9765
> {code:java}
> May 19 00:51:42 Caused by: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Map (3/4) of job 1785ead58e598bc669c92d5a2cd14618 has not 
> being executed at the moment. Aborting checkpoint. Failure reason: Not all 
> required tasks are currently running.
> May 19 00:51:42   at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
> May 19 00:51:42   at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
> May 19 00:51:42   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> May 19 00:51:42   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> May 19 00:51:42   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> May 19 00:51:42   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> May 19 00:51:42   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> May 19 00:51:42   at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> May 19 00:51:42   at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> May 19 00:51:42   at 
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> May 19 00:51:42   at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> May 19 00:51:42   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> May 19 00:51:42   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> May 19 00:51:42   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> May 19 00:51:42   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> May 19 00:51:42   at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> May 19 00:51:42   at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> May 19 00:51:42   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> May 19 00:51:42   at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> May 19 00:51:42   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> May 19 00:51:42   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> May 19 00:51:42   at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> May 19 00:51:42   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> May 19 00:51:42   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> May 19 00:51:42   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22625) FileSinkMigrationITCase unstable

2021-06-17 Thread Anton Kalashnikov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364826#comment-17364826
 ] 

Anton Kalashnikov commented on FLINK-22625:
---

[~gaoyunhaii], I think I found the problem and I will fix it along with the same
problem in another test. Please check this PR
[https://github.com/apache/flink/pull/16151/files#diff-5df9eea57ea9542415f7e58a7c7051a1c6156f42423ba69f4323e697bb2c9cb4],
and if you agree that it will resolve the problem, can you close this ticket as a
duplicate of https://issues.apache.org/jira/browse/FLINK-22593

> FileSinkMigrationITCase unstable
> 
>
> Key: FLINK-22625
> URL: https://issues.apache.org/jira/browse/FLINK-22625
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.0
>Reporter: Dawid Wysakowicz
>Assignee: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17817&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=420bd9ec-164e-562e-8947-0dacde3cec91&l=22179
> {code}
> May 11 00:43:40 Caused by: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Sink: Unnamed (1/3) of job 733a4777cca170f86724832642e2a8b1 
> has not being executed at the moment. Aborting checkpoint. Failure reason: 
> Not all required tasks are currently running.
> May 11 00:43:40   at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
> May 11 00:43:40   at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
> May 11 00:43:40   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> May 11 00:43:40   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> May 11 00:43:40   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> May 11 00:43:40   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> May 11 00:43:40   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> May 11 00:43:40   at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> May 11 00:43:40   at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> May 11 00:43:40   at 
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> May 11 00:43:40   at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> May 11 00:43:40   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> May 11 00:43:40   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> May 11 00:43:40   at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> May 11 00:43:40   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> May 11 00:43:40   at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> May 11 00:43:40   at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> May 11 00:43:40   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> May 11 00:43:40   at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> May 11 00:43:40   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> May 11 00:43:40   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> May 11 00:43:40   at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> May 11 00:43:40   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> May 11 00:43:40   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> May 11 00:43:40   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11982) BatchTableSourceFactory support Json Format File

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11982:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> BatchTableSourceFactory support Json Format File
> 
>
> Key: FLINK-11982
> URL: https://issues.apache.org/jira/browse/FLINK-11982
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: pingle wang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Scala code:
> {code:scala}
> val connector = new FileSystem().path("data/in/test.json")
> val desc = tEnv.connect(connector)
>   .withFormat(
>     new Json().schema(
>       Types.ROW(Array[String]("id", "name", "age"),
>         Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.INT))
>     )
>     .failOnMissingField(true)
>   )
>   .registerTableSource("person")
>
> val sql = "select * from person"
> val result = tEnv.sqlQuery(sql)
> Exception info :
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
> suitable table factory for 
> 'org.apache.flink.table.factories.BatchTableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.path=file:///Users/batch/test.json
> connector.property-version=1
> connector.type=filesystem
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
> at 
> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
> at 
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
> at 
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
> at 
> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:44)
> at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
> at com.meitu.mlink.sql.batch.JsonExample.main(JsonExample.java:36){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11968) Fix runtime SingleElementIterator.iterator and remove table.SingleElementIterator

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11968:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Fix runtime SingleElementIterator.iterator and remove 
> table.SingleElementIterator
> -
>
> Key: FLINK-11968
> URL: https://issues.apache.org/jira/browse/FLINK-11968
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: Jingsong Lee
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code:java}
> @Override
> public Iterator<E> iterator() {
>    return this;
> }
> {code}
> In iterator() we need to set available to true, otherwise we can only iterate once.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
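
A minimal sketch of the fix described above; the field names follow the class as quoted in the ticket, but treat this as illustrative, not the final patch:

{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;

public class SingleElementIterator<E> implements Iterator<E>, Iterable<E> {

    private E current;
    private boolean available = false;

    /** Resets the iterator to serve the given element. */
    public void set(E current) {
        this.current = current;
        this.available = true;
    }

    @Override
    public boolean hasNext() {
        return available;
    }

    @Override
    public E next() {
        if (!available) {
            throw new NoSuchElementException();
        }
        available = false;
        return current;
    }

    @Override
    public Iterator<E> iterator() {
        available = true; // the fix: re-arm before handing out the iterator
        return this;
    }
}
{code}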


[jira] [Updated] (FLINK-12003) Revert the config option about mapreduce.output.basename in HadoopOutputFormatBase

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12003:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Revert the config option about mapreduce.output.basename in 
> HadoopOutputFormatBase
> --
>
> Key: FLINK-12003
> URL: https://issues.apache.org/jira/browse/FLINK-12003
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hadoop Compatibility
>Reporter: vinoyang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> In the {{HadoopOutputFormatBase}} open method, the config option
> {{mapreduce.output.basename}} is changed to "tmp", and there is no
> documentation stating this change.
> By default, Hadoop will use the format "part-x-y" to name its files, where x
> and y mean:
>  * {{x}} is either 'm' or 'r', depending on whether the job was a map-only
> job or a reduce job
>  * {{y}} is the mapper or reducer task number (zero based)
>  
> The keyword "part" has been used in many places in users' business logic to
> match the HDFS file name. So I suggest reverting this config option or
> documenting it.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11848:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
> 
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4
>Reporter: Shengnan YU
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Recently we have been running some streaming jobs with Apache Flink. There are
> multiple Kafka topics with a name format like xx_yy-mm-dd. We used a topic regex
> pattern to let a consumer consume those topics. However, if we delete some
> older topics, it seems that the metadata in the consumer does not update properly,
> so it still remembers those outdated topics in its topic list, which leads to
> *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. It
> seems to occur in the producer as well. Any idea how to solve this problem? Thank
> you very much!
>  
> Example to reproduce the problem:
> There are multiple Kafka topics, for instance
> "test20190310", "test20190311", "test20190312". I run the job and
> everything is ok. Then if I delete topic "test20190310", the consumer does
> not perceive that the topic is deleted; it will still go fetching metadata for
> that topic. In the taskmanager's log, unknown-topic errors appear.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
>     props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "120");
>     Pattern topics = Pattern.compile("^test.*$");
>     FlinkKafkaConsumer011<String> consumer =
>         new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>     DataStream<String> stream = env.addSource(consumer);
>     stream.writeToSocket("localhost", 4, new SimpleStringSchema());
>     env.execute("test");
> }
> {code}
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11970) TableEnvironment#registerFunction should overwrite if the same name function existed.

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11970:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> TableEnvironment#registerFunction should overwrite if the same name function 
> existed. 
> --
>
> Key: FLINK-11970
> URL: https://issues.apache.org/jira/browse/FLINK-11970
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jeff Zhang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, it would throw an exception if I try to register a user function
> multiple times. Registering a udf multiple times is a very common usage in
> notebook scenarios. And I don't think it would cause issues for users.
> e.g. If a user happened to register the same udf multiple times (he intended to
> register 2 different udfs actually), then he would get an exception at runtime
> where he uses the udf that is missing registration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11934) Remove the deprecate methods in TableEnvironment

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11934:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Remove the deprecate methods in TableEnvironment
> 
>
> Key: FLINK-11934
> URL: https://issues.apache.org/jira/browse/FLINK-11934
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> There are several {{getTableEnvironment()}} methods which were deprecated
> during 1.8 in {{TableEnvironment}}. As release-1.8 has been cut, we
> can remove these deprecated methods now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11909:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> ---
>
> Key: FLINK-11909
> URL: https://issues.apache.org/jira/browse/FLINK-11909
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Rong Rong
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently Flink AsyncIO by default fails the entire job when the async function
> invocation fails [1]. It would be nice to have some default Async IO
> failure/timeout handling strategies, or to open up some APIs for the AsyncFunction
> timeout method to interact with the AsyncWaitOperator. For example (quoting
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introduce configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
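
As a rough illustration of the FIX_INTERVAL_RETRY strategy proposed above, a hedged sketch against the current AsyncFunction API; queryService(), MAX_RETRIES and RETRY_DELAY_MS are illustrative placeholders, not existing Flink configuration:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RetryingAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_RETRIES = 3;          // illustrative value
    private static final long RETRY_DELAY_MS = 1000L;  // illustrative value

    private transient ScheduledExecutorService scheduler;

    @Override
    public void open(Configuration parameters) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        attempt(input, resultFuture, 0);
    }

    private void attempt(String input, ResultFuture<String> resultFuture, int retries) {
        queryService(input).whenComplete((result, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(result));
            } else if (retries < MAX_RETRIES) {
                // FIX_INTERVAL_RETRY: retry with a fixed delay up to N times
                scheduler.schedule(
                        () -> attempt(input, resultFuture, retries + 1),
                        RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
            } else {
                // fall back to the current FAIL_OPERATOR behavior
                resultFuture.completeExceptionally(error);
            }
        });
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    /** Placeholder for the actual asynchronous client call. */
    private CompletableFuture<String> queryService(String input) {
        return CompletableFuture.completedFuture(input.toUpperCase());
    }
}
{code}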


[jira] [Updated] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11936:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter 
> issue.
> -
>
> Key: FLINK-11936
> URL: https://issues.apache.org/jira/browse/FLINK-11936
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Rong Rong
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> AuxiliaryConverter was pulled in as part of FLINK-6409. Since CALCITE-1761 has
> been fixed, we should sync back with the Calcite version.
> After a quick glance, I think it is not so simple to just delete the class, so
> I opened a follow-up Jira on this issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11893) Update ecosystem page and add it to the navigation bar

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11893:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Update ecosystem page and add it to the navigation bar
> --
>
> Key: FLINK-11893
> URL: https://issues.apache.org/jira/browse/FLINK-11893
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Jiangjie Qin
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Based on the discussion on the mailing list, we would like to encourage
> connector authors to share their connectors with the community, even if the
> connectors are not in the Flink repo.
> This ticket updates the ecosystem page on the website to reflect the change. 
> It also adds ecosystem page back to the navigation bar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11747) Elasticsearch 6 connector - Expose RestHighLevelClient to allow for custom sniffing

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11747:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Elasticsearch 6 connector - Expose RestHighLevelClient to allow for custom 
> sniffing
> ---
>
> Key: FLINK-11747
> URL: https://issues.apache.org/jira/browse/FLINK-11747
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Samir Desai
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the Elasticsearch6 connector, the 
> [RestClientFactory|https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java#L31]
>  allows users to customize the {{RestClientBuilder}}. However, certain 
> customizations like adding 
> [Sniffing|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html]
>  
> require access to the low-level rest client, which can be obtained through 
> the high level rest client. The {{RestHighLevelClient}} is 
> [instantiated|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L75]
>  in the api call bridge, and is never exposed to the user.
> To my knowledge, the current Elasticsearch6 connector does not utilize 
> sniffing or provide a way to add it in. The Elasticsearch6 connector should 
> expose some type of access to the RestHighLevelClient to allow for custom 
> sniffing. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
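
For reference, this is roughly what a sniffing setup looks like when the low-level client is accessible; a sketch using the Elasticsearch REST client's Sniffer, with host name and interval as illustrative values:

{code:java}
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.Sniffer;

public class SnifferExample {
    public static void main(String[] args) throws Exception {
        // Build a low-level client and attach a sniffer that refreshes the node
        // list every 60 seconds -- exactly the kind of customization that needs
        // access to the client the Flink connector currently keeps internal.
        RestClient lowLevelClient =
                RestClient.builder(new HttpHost("es-host", 9200, "http")).build();
        Sniffer sniffer = Sniffer.builder(lowLevelClient)
                .setSniffIntervalMillis(60_000)
                .build();

        // ... use the client ...

        sniffer.close();
        lowLevelClient.close();
    }
}
{code}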


[jira] [Updated] (FLINK-11737) Support org.apache.hadoop.mapreduce.lib.output.MultipleOutputs output

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11737:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Support org.apache.hadoop.mapreduce.lib.output.MultipleOutputs output
> -
>
> Key: FLINK-11737
> URL: https://issues.apache.org/jira/browse/FLINK-11737
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hadoop Compatibility
>Reporter: vinoyang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> This issue is to improve Flink's compatibility with Hadoop. Currently, for
> the old version of the Hadoop API, there is
> {{org.apache.hadoop.mapred.lib.MultipleOutputFormat}}, which can be used
> directly. However, the new version of the Hadoop API,
> {{org.apache.hadoop.mapreduce.lib.output.MultipleOutputs}}, is currently not
> supported by Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11818) Provide pipe transformation function for DataSet API

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11818:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Provide pipe transformation function for DataSet API
> 
>
> Key: FLINK-11818
> URL: https://issues.apache.org/jira/browse/FLINK-11818
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: vinoyang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We have some business requirements that require the data handled by Flink to
> interact with some external programs (such as Python/Perl/shell scripts).
> There is no such function in the existing DataSet API; although it can be
> implemented with the map function, that is not concise. It would be helpful
> if we could provide a pipe[1] function like Spark's.
> [1]: 
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
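
To make the map-based workaround concrete, a hedged sketch of piping each record through an external process with a RichMapFunction; the command and the one-line-in/one-line-out protocol are assumptions:

{code:java}
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class PipeMapFunction extends RichMapFunction<String, String> {

    private final String[] command;
    private transient Process process;
    private transient BufferedWriter toProcess;
    private transient BufferedReader fromProcess;

    public PipeMapFunction(String... command) {
        this.command = command;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // One external process per parallel task instance.
        process = new ProcessBuilder(command).start();
        toProcess = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
        fromProcess = new BufferedReader(new InputStreamReader(process.getInputStream()));
    }

    @Override
    public String map(String value) throws Exception {
        // Assumes the script emits exactly one output line per input line.
        toProcess.write(value);
        toProcess.newLine();
        toProcess.flush();
        return fromProcess.readLine();
    }

    @Override
    public void close() throws Exception {
        toProcess.close();
        process.destroy();
    }
}
{code}

Usage would look like dataSet.map(new PipeMapFunction("/usr/bin/perl", "transform.pl")), with a hypothetical script path.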


[jira] [Updated] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4004:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Do not pass custom flink kafka connector properties to Kafka to avoid warnings
> --
>
> Key: FLINK-4004
> URL: https://issues.apache.org/jira/browse/FLINK-4004
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Robert Metzger
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The FlinkKafkaConsumer has some custom properties, which we pass to the 
> KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to 
> log warnings about unused properties.
> We should not pass Flink-internal properties to Kafka, to avoid those 
> warnings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
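
A minimal sketch of the proposed behavior, filtering Flink-internal keys before the properties reach the Kafka client; the "flink." prefix check is an assumption, with {{flink.poll-timeout}} as the example from the description:

{code:java}
import java.util.Properties;

public final class KafkaPropertiesFilter {

    /** Returns a copy of the properties without Flink-internal keys. */
    public static Properties stripFlinkProperties(Properties userProps) {
        Properties kafkaProps = new Properties();
        kafkaProps.putAll(userProps);
        // e.g. drops "flink.poll-timeout" so Kafka stops warning about it
        kafkaProps.keySet().removeIf(key -> key.toString().startsWith("flink."));
        return kafkaProps;
    }
}
{code}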


[jira] [Updated] (FLINK-4088) Add interface to save and load TableSources

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4088:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Add interface to save and load TableSources
> ---
>
> Key: FLINK-4088
> URL: https://issues.apache.org/jira/browse/FLINK-4088
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> Add an interface to save and load table sources similar to Java's 
> {{Serializable}} interface. 
> TableSources should implement the interface to become saveable and loadable.
> This could be used as follows:
> {code}
> val cts = new CsvTableSource(
>   "/path/to/csvfile",
>   Array("name", "age", "address"),
>   Array(BasicTypeInfo.STRING_TYPE_INFO, ...),
>   ...
> )
> cts.saveToFile("/path/to/tablesource/file")
> // -
> val tEnv: TableEnvironment = ???
> tEnv.loadTableSource("persons", "/path/to/tablesource/file")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4043) Generalize RabbitMQ connector into AMQP connector

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4043:
--
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Generalize RabbitMQ connector into AMQP connector
> -
>
> Key: FLINK-4043
> URL: https://issues.apache.org/jira/browse/FLINK-4043
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Robert Metzger
>Priority: Major
>  Labels: stale-major
>
> Our current RabbitMQ connector is actually using an AMQP client implemented by
> RabbitMQ.
> AMQP is a protocol for message queues, implemented by different clients and 
> brokers.
> I'm suggesting to rename the connector so that it's more obvious to users of
> other brokers that they can use the connector as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3133:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Introduce collect()/count()/print() methods in DataStream API
> -
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 0.10.0, 0.10.1, 1.0.0
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should 
> be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts 
> of a stream, e.g. by supplying a time period in the arguments to the methods. 
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable sampled = jobClient.sampleStream(streamData, 
> Time.seconds(5));
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-2186) Rework CSV import to support very wide files

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-2186:
--
Labels: auto-unassigned pull-request-available stale-major  (was: 
auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Scala, Library / Machine Learning
>Reporter: Theodore Vasiloudis
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the current readCsvFile implementation, importing CSV files with many
> columns can range from cumbersome to impossible.
> For example to import an 11 column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3394:
--
Labels: auto-deprioritized-critical stale-major  (was: 
auto-deprioritized-critical)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Clear up the contract of MutableObjectIterator.next(reuse)
> --
>
> Key: FLINK-3394
> URL: https://issues.apache.org/jira/browse/FLINK-3394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.0.0
>Reporter: Gábor Gévay
>Priority: Major
>  Labels: auto-deprioritized-critical, stale-major
>
> {{MutableObjectIterator.next(reuse)}} has the following contract (according 
> to [~StephanEwen]'s comment \[1\]):
> 1. The caller may not hold onto {{reuse}} any more
> 2. The iterator implementor may not hold onto the returned object any more.
> This should be documented in its javadoc (with "WARNING" so that people don't 
> overlook it).
> Additionally, since this was a "secret contract" up to now, all the 270 
> usages of {{MutableObjectIterator.next(reuse)}} should be checked for 
> violations. A few that are suspicious at first glance, are in 
> {{CrossDriver}}, {{UnionWithTempOperator}}, 
> {{MutableHashTable.ProbeIterator.next}}, 
> {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls 
> in the reduce drivers are being fixed by 
> https://github.com/apache/flink/pull/1626 )
> \[1\] 
> https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
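
A small sketch of the usage pattern that respects the two contract points above; Record is Flink's generic record type and process() is a placeholder for the per-record work:

{code:java}
import java.io.IOException;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;

public final class ReuseContractExample {

    /** Drains the iterator while honoring both contract points. */
    static void consume(MutableObjectIterator<Record> iter) throws IOException {
        Record reuse = new Record();
        Record record;
        // Point 1: after next(reuse) returns, `reuse` must not be touched again.
        // Point 2: the iterator, in turn, may not hold onto the returned object.
        while ((record = iter.next(reuse)) != null) {
            process(record);  // work only with the returned object
            reuse = record;   // hand it back as the next reuse target
        }
    }

    private static void process(Record record) {
        // placeholder for the actual per-record work
    }
}
{code}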


[jira] [Updated] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3850:
--
Labels: auto-unassigned pull-request-available stale-major  (was: 
auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
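
For reference, this is what such a semantic annotation looks like when written by hand in the DataSet API (the function itself is a made-up example); the idea above is that the Table API code generator could attach these automatically:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;

// Declares that field 0 is forwarded unchanged from input to output, so the
// optimizer knows a partitioning or sort on f0 survives this map.
@FunctionAnnotation.ForwardedFields("f0")
public class IncrementSecondField
        implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
        return new Tuple2<>(value.f0, value.f1 + 1);
    }
}
{code}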


[jira] [Updated] (FLINK-3997) PRNG Skip-ahead

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3997:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> PRNG Skip-ahead
> ---
>
> Key: FLINK-3997
> URL: https://issues.apache.org/jira/browse/FLINK-3997
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Graph Processing (Gelly)
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The current sources of randomness for Gelly Graph Generators use fixed-size 
> blocks of work which include an initial seed. There are two issues with this 
> approach. First, the size of the collection of blocks can exceed the Akka 
> limit and cause the job to silently fail. Second, as the block seeds are 
> randomly chosen, the likelihood of blocks overlapping and producing the same 
> sequence increases with the size of the graph.
> The random generators will be reimplemented using {{SplittableIterator}} and 
> PRNGs supporting skip-ahead.
> This ticket will implement skip-ahead with LCGs [0]. Future work may add 
> support for xorshift generators ([1], section 5 "Jumping Ahead").
> [0] 
> https://mit-crpg.github.io/openmc/methods/random_numbers.html#skip-ahead-capability
> [1] https://arxiv.org/pdf/1404.0390.pdf



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
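
A compact sketch of the LCG skip-ahead technique referenced in [0]; the constants and method name are illustrative, and arithmetic is mod 2^64 via Java long overflow:

{code:java}
public final class LcgSkipAhead {

    /**
     * Computes the state of the LCG x' = a*x + c (mod 2^64) after n steps,
     * in O(log n) time, by repeatedly squaring the transformation.
     */
    public static long skipAhead(long seed, long a, long c, long n) {
        long aAcc = 1L;  // accumulated multiplier a^n
        long cAcc = 0L;  // accumulated increment c*(a^(n-1) + ... + a + 1)
        long aCur = a;   // multiplier of the current 2^k-step jump
        long cCur = c;   // increment of the current 2^k-step jump
        while (n > 0) {
            if ((n & 1L) == 1L) {
                aAcc = aAcc * aCur;
                cAcc = cAcc * aCur + cCur;
            }
            cCur = (aCur + 1L) * cCur;  // compose the jump with itself
            aCur = aCur * aCur;
            n >>>= 1;
        }
        return aAcc * seed + cAcc;
    }

    public static void main(String[] args) {
        // Knuth's MMIX constants, as an example 64-bit LCG.
        long a = 6364136223846793005L;
        long c = 1442695040888963407L;
        long x = 42L;
        // Jump a block of 1,000,000 elements ahead without generating them.
        System.out.println(skipAhead(x, a, c, 1_000_000L));
    }
}
{code}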


[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3473:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Add partial aggregate support in Flink
> --
>
> Key: FLINK-3473
> URL: https://issues.apache.org/jira/browse/FLINK-3473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Chengxiang Li
>Priority: Major
>  Labels: auto-unassigned, stale-major
> Attachments: PartialAggregateinFlink_v1.pdf, 
> PartialAggregateinFlink_v2.pdf
>
>
> For decomposable aggregate functions, partial aggregation is more efficient, as 
> it significantly reduces the network traffic during the shuffle and parallelizes 
> part of the aggregate calculation. This is an umbrella task for partial 
> aggregation.
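As a plain-Java illustration of why decomposability enables partial aggregation (assumed shapes, independent of the attached design documents):

{code:java}
import java.util.HashMap;
import java.util.Map;

// SUM is decomposable: each task can pre-aggregate locally and ship one
// partial value per key through the shuffle instead of every single record.
public final class PartialSum {

    // Phase 1: per-task pre-aggregation before the shuffle.
    static Map<String, Long> preAggregate(Iterable<Map.Entry<String, Long>> records) {
        Map<String, Long> partials = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            partials.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return partials;
    }

    // Phase 2: merge the per-task partials after shuffling by key.
    static long merge(Iterable<Long> partialsForOneKey) {
        long sum = 0L;
        for (long p : partialsForOneKey) {
            sum += p;
        }
        return sum;
    }
}
{code}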



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3857:
--
Labels: auto-unassigned pull-request-available stale-major  (was: 
auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection a configurable number of times 
> with a certain delay.
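A minimal sketch of the proposed retry behavior (the helper and its names are hypothetical; the real change would live in the sink's {{invoke()}} path):

{code:java}
// Retries a connection attempt a configurable number of times with a fixed
// delay, rethrowing the last failure once all attempts are exhausted.
public final class ReconnectHelper {

    interface Connector {
        void connect() throws Exception;
    }

    static void connectWithRetry(Connector connector, int maxAttempts, long delayMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                connector.connect();
                return; // connected successfully
            } catch (Exception e) {
                last = e;
                Thread.sleep(delayMs); // back off before the next attempt
            }
        }
        throw last;
    }
}
{code}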



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3957) Breaking changes for Flink 2.0

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3957:
--
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Breaking changes for Flink 2.0
> --
>
> Key: FLINK-3957
> URL: https://issues.apache.org/jira/browse/FLINK-3957
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataSet, API / DataStream, Build System
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Priority: Major
>  Labels: stale-major
> Fix For: 2.0.0
>
>
> From time to time, we find APIs in Flink (1.x.y) marked as stable, even 
> though we would like to change them at some point.
> This JIRA is to track all planned breaking API changes.
> I would suggest adding subtasks to this one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3462) Add position and magic number checks to record (de)serializers

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3462:
--
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Add position and magic number checks to record (de)serializers
> --
>
> Key: FLINK-3462
> URL: https://issues.apache.org/jira/browse/FLINK-3462
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Ufuk Celebi
>Priority: Major
>  Labels: stale-major
>
> In order to improve the debugging experience in case of serialization errors, 
> we can add further sanity checks to the deserializers:
> - Check the expected position in the buffer in order to ensure that record 
> boundaries are not crossed within a buffer and no bytes are left unconsumed.
> - Add magic number to record serializer and deserializer in order to check 
> byte corruption. We currently only have this on a per buffer level.
> We will be able to use these checks to improve the error messages.
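A minimal sketch of the record-level magic-number idea (the marker constant and framing are illustrative, not Flink's actual wire format):

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// The writer puts a marker before every record; the reader verifies it, so
// corruption or a crossed record boundary fails fast with a precise message.
public final class RecordMagicCheck {
    private static final int MAGIC = 0xCAFEF00D; // hypothetical marker value

    static void writeRecord(DataOutput out, byte[] record) throws IOException {
        out.writeInt(MAGIC);
        out.writeInt(record.length);
        out.write(record);
    }

    static byte[] readRecord(DataInput in) throws IOException {
        int magic = in.readInt();
        if (magic != MAGIC) {
            throw new IOException("Corrupt record stream: expected magic 0x"
                    + Integer.toHexString(MAGIC) + " but found 0x"
                    + Integer.toHexString(magic));
        }
        byte[] record = new byte[in.readInt()];
        in.readFully(record);
        return record;
    }
}
{code}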



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-2396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-2396:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Review the datasets of dynamic path and static path in iteration.
> -
>
> Key: FLINK-2396
> URL: https://issues.apache.org/jira/browse/FLINK-2396
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Chengxiang Li
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> Currently, Flink caches the dataset on the static path, as it assumes that the 
> dataset stays the same during the iteration, but this assumption does not 
> always hold. Take sampling for example: the iteration data set is 
> something like the weight vector of a model, and there is another training 
> dataset from which to take a small sample to update the weight vector in each 
> iteration (e.g. Stochastic Gradient Descent). We expect the sampled dataset to 
> be different in each iteration, but Flink caches it because it is on the 
> static path. 
> We should review how Flink identifies the dynamic and static paths, and 
> support adding the sampled dataset in the above example to the dynamic path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-2055) Implement Streaming HBaseSink

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-2055:
--
Labels: auto-unassigned pull-request-available stale-major  (was: 
auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-951) Reworking of Iteration Synchronization, Accumulators and Aggregators

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-951:
-
Labels: auto-unassigned refactoring stale-major  (was: auto-unassigned 
refactoring)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> Reworking of Iteration Synchronization, Accumulators and Aggregators
> 
>
> Key: FLINK-951
> URL: https://issues.apache.org/jira/browse/FLINK-951
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet, Runtime / Task
>Affects Versions: 0.9
>Reporter: Markus Holzemer
>Priority: Major
>  Labels: auto-unassigned, refactoring, stale-major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I just realized that there is no real Jira issue for the task I am currently 
> working on. 
> I am currently reworking a few things regarding Iteration Synchronization, 
> Accumulators and Aggregators. Currently the synchronization at the end of one 
> superstep is done through channel events. That makes it hard to track the 
> current status of iterations. That is why I am changing this synchronization 
> to use RPC calls with the JobManager, so that the JobManager manages the 
> current status of all iterations.
> Currently we use Accumulators outside of iterations and Aggregators inside of 
> iterations. Both have a similar function, but slightly different interfaces and 
> handling. I want to unify these two concepts. I propose that we stick in the 
> future to Accumulators only. Aggregators are therefore removed and 
> Accumulators are extended to cover the use cases Aggregators were used for 
> before. The switch to RPC for iterations makes it possible to also send the 
> current Accumulator values at the end of each superstep, so that the 
> JobManager (and thereby the web interface) will be able to print intermediate 
> accumulation results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11912:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Expose per partition Kafka lag metric in Flink Kafka connector
> --
>
> Key: FLINK-11912
> URL: https://issues.apache.org/jira/browse/FLINK-11912
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.7.2
>Reporter: Shuyi Chen
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> In production, it's important that we expose the Kafka lag by partition 
> metric in order for users to diagnose which Kafka partition is lagging. 
> However, although the Kafka lag by partition metrics are available in 
> KafkaConsumer after 0.10.2, Flink was not able to properly register it 
> because the metrics are only available after the consumer starts polling data 
> from partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add 
> MetricName for those partitions that we want to register into 
> manualRegisteredMetricSet. 
> 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, 
> try to search for the metrics available in KafkaConsumer, and if found, 
> register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when 
> discovering new partitions, and registration is done once the KafkaConsumer 
> exposes the metrics.
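A minimal sketch of steps 1)–3) (class and method names are illustrative; the real change would hook into the fetch loop of KafkaConsumerThread):

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Remembers metric names that still need registering and resolves them lazily
// from the consumer once they become available (after polling has started).
final class PendingKafkaMetrics {
    private final Set<String> pending = new HashSet<>();

    // Step 2: called when a new partition is discovered.
    void expect(String metricName) {
        pending.add(metricName);
    }

    // Step 3: called in the fetch loop; a cheap no-op once everything is registered.
    void tryRegister(KafkaConsumer<?, ?> consumer) {
        if (pending.isEmpty()) {
            return;
        }
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            if (pending.remove(e.getKey().name())) {
                register(e.getKey(), e.getValue());
            }
        }
    }

    private void register(MetricName name, Metric metric) {
        // bridge into Flink's MetricGroup here (omitted in this sketch)
    }
}
{code}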



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11799:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I was encountering NullPointerExceptions because the deserialized object 
> reached my ProcessFunction process() method implementation as a null value. 
> Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.
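A minimal sketch of the two requested fixes together (illustrative shape, not the actual KryoSerializer code):

{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// 1) Warn once when kryo.copy() fails instead of silently swallowing it;
// 2) reject a null result from the fallback instead of passing null downstream.
final class LoggingKryoCopy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingKryoCopy.class);

    private final Kryo kryo = new Kryo();
    private boolean copyFailureLogged;

    T copy(T from) {
        try {
            return kryo.copy(from);
        } catch (KryoException e) {
            if (!copyFailureLogged) {
                LOG.warn("kryo.copy() failed; falling back to de/serialization", e);
                copyFailureLogged = true;
            }
            T result = serializeAndDeserialize(from); // potentially slow fallback
            if (result == null) {
                throw new IllegalStateException(
                        "Deserialization returned null for " + from.getClass().getName());
            }
            return result;
        }
    }

    private T serializeAndDeserialize(T from) {
        // write/read via kryo.writeObject(...)/readObject(...) (omitted here)
        return from;
    }
}
{code}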



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11868:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> [filesystems] Introduce listStatusIterator API to file system
> -
>
> Key: FLINK-11868
> URL: https://issues.apache.org/jira/browse/FLINK-11868
> Project: Flink
>  Issue Type: New Feature
>  Components: FileSystems
>Reporter: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Fix For: 1.14.0
>
>
> From existing experience, we know {{listStatus}} is expensive for many 
> distributed file systems, especially when the folder contains too many files. 
> This method not only blocks the thread until the result is returned but can 
> also cause OOM because the returned array of {{FileStatus}} can be very large. 
> We should have already learned this from FLINK-7266 and FLINK-8540.
> However, listing file statuses under a path is really helpful in many situations. 
> Thankfully, many distributed file systems have noticed this and provide APIs such as 
> {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
>  to call the file system on demand.
>  
> We should also introduce this API and replace the current implementation, which 
> uses {{listStatus}}.
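For reference, consuming the Hadoop API linked above looks like this (a minimal sketch; the path and configuration are placeholders):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public final class ListStatusOnDemand {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Statuses are fetched in batches on demand, so memory stays bounded
        // even for folders with millions of entries, unlike listStatus()'s
        // single materialized array.
        RemoteIterator<FileStatus> it = fs.listStatusIterator(new Path("/big/folder"));
        while (it.hasNext()) {
            System.out.println(it.next().getPath());
        }
    }
}
{code}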



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2021-06-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-3769:
--
Labels: auto-unassigned rabbitmq stale-major  (was: auto-unassigned 
rabbitmq)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is indeed Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Priority: Major
>  Labels: auto-unassigned, rabbitmq, stale-major
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange, the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routing key (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be set on a 
> per-message level, so the input of this stream will need to take them as 
> input as well. This fourth exchange type could very well be outside the scope 
> of this Improvement, and a "RabbitMQ Sink enable headers" Improvement might be 
> the better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html
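For context, publishing to a named, pre-declared exchange with the plain RabbitMQ Java client looks like this (a sketch of the desired capability, not the Flink sink itself; the exchange name and routing key are placeholders):

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public final class PublishToNamedExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        try {
            // Assumes the "events" exchange and its queue bindings already
            // exist, matching assumptions 1.) and 2.) above.
            channel.basicPublish("events", "user.created", null,
                    "payload".getBytes("UTF-8"));
        } finally {
            channel.close();
            connection.close();
        }
    }
}
{code}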



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22737) Add support for CURRENT_WATERMARK to SQL

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-22737.

Fix Version/s: 1.14.0
   Resolution: Fixed

Fixed in 1.14.0: 8bd215dc28ae431120e5b1a114a94ddc6e004b16

> Add support for CURRENT_WATERMARK to SQL
> 
>
> Key: FLINK-22737
> URL: https://issues.apache.org/jira/browse/FLINK-22737
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: David Anderson
>Assignee: Ingo Bürk
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: screenshot-2021-05-31_14-22-43.png
>
>
> With a built-in function returning the current watermark, one could operate 
> on late events without resorting to using the DataStream API.
> Called with zero parameters, this function returns the current watermark for 
> the current row – if there is an event time attribute. Otherwise, it returns 
> NULL. 
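For illustration, the variant that eventually shipped takes the event-time attribute as its argument; a sketch of filtering out late rows (the {{events}} table and {{ts}} column are assumed):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public final class CurrentWatermarkExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Keep rows that are NOT late: either no watermark has been emitted
        // yet (NULL), or the row's timestamp is still ahead of the watermark.
        Table onTime = tableEnv.sqlQuery(
                "SELECT * FROM events "
                        + "WHERE CURRENT_WATERMARK(ts) IS NULL "
                        + "OR ts > CURRENT_WATERMARK(ts)");
        // 'onTime' can then be converted or written to a sink as usual.
    }
}
{code}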



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22948) Scala example for toDataStream does not compile

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-22948:


Assignee: Timo Walther

> Scala example for toDataStream does not compile
> ---
>
> Key: FLINK-22948
> URL: https://issues.apache.org/jira/browse/FLINK-22948
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.13.1
>Reporter: David Anderson
>Assignee: Timo Walther
>Priority: Major
>
> The scala example at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-todatastream]
>  does not compile – {{User.class}} should be {{classOf[User]}}.
> It would also be better to show the table DDL as
> {{tableEnv.executeSql(}}
> {{  """}}
> {{  CREATE TABLE GeneratedTable (}}
> {{    name STRING,}}
> {{    score INT,}}
> {{    event_time TIMESTAMP_LTZ(3),}}
> {{    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND}}
> {{  )}}
> {{  WITH ('connector'='datagen')}}
> {{  """}}
> {{)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23022) Plugin classloader settings do not work as described

2021-06-17 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23022:


 Summary: Plugin classloader settings do not work as described
 Key: FLINK-23022
 URL: https://issues.apache.org/jira/browse/FLINK-23022
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: Dawid Wysakowicz


The options {{plugin.classloader.parent-first-patterns}} are documented to mean 
that all patterns put there are loaded from the Flink classloader instead 
of the plugin classloader.

However, the way they work is that they define a set of patterns allowed to be 
pulled from the parent classloader. The plugin classloader takes precedence in 
all cases, and a class can be loaded from the parent classloader only if it 
matches a pattern in one of the aforementioned options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22948) Scala example for toDataStream does not compile

2021-06-17 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-22948.

Fix Version/s: 1.13.2
   1.14.0
   Resolution: Fixed

Fixed in 1.14.0: e23337aace8f02fb8b940480a4ce3c2f054f323e
Fixed in 1.13.2: bf304e49c4e6ea7b8d73cfb7709e87a2e4c8b52a

> Scala example for toDataStream does not compile
> ---
>
> Key: FLINK-22948
> URL: https://issues.apache.org/jira/browse/FLINK-22948
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.13.1
>Reporter: David Anderson
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.14.0, 1.13.2
>
>
> The scala example at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-todatastream]
>  does not compile – {{User.class}} should be {{classOf[User]}}.
> It would also be better to show the table DDL as
> {{tableEnv.executeSql(}}
> {{  """}}
> {{  CREATE TABLE GeneratedTable (}}
> {{    name STRING,}}
> {{    score INT,}}
> {{    event_time TIMESTAMP_LTZ(3),}}
> {{    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND}}
> {{  )}}
> {{  WITH ('connector'='datagen')}}
> {{  """}}
> {{)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23020) NullPointerException when running collect twice from Python API

2021-06-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-23020:

Component/s: (was: API / Python)
 Table SQL / Planner

> NullPointerException when running collect twice from Python API
> ---
>
> Key: FLINK-23020
> URL: https://issues.apache.org/jira/browse/FLINK-23020
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> Hi, 
> I'm trying to use PyFlink from a Jupyter Notebook and I'm getting an NPE in 
> the following scenario.
> 1. I'm creating datagen table.
> {code:java}
> from pyflink.table import EnvironmentSettings, TableEnvironment, 
> StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.common import Configuration, Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> conf = Configuration()
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_integer("parallelism.default", 
> 1)
> table_env.execute_sql("DROP TABLE IF EXISTS datagen")
> table_env.execute_sql("""
> CREATE TABLE datagen (
> id INT
> ) WITH (
> 'connector' = 'datagen'
> )
> """)
> {code}
> 2. Then I'm running collect
> {code:java}
> try:
> result = table_env.sql_query("select * from datagen limit 1").execute()
> for r in result.collect():
> print(r)
> except KeyboardInterrupt:
> result.get_job_client().cancel()
> {code}
> 3. I'm using "interrupt the kernel" button. This is handled by above 
> try/except and will cancel the query.
> 4. I'm running collect from point 2 one more time. Result:
> {code:java}
> ---
> Py4JJavaError Traceback (most recent call last)
>  in 
>   1 try:
> > 2 result = table_env.sql_query("select * from datagen limit 
> 1").execute()
>   3 for r in result.collect():
>   4 print(r)
>   5 except KeyboardInterrupt:
> /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
>1070 """
>1071 self._t_env._before_execute()
> -> 1072 return TableResult(self._j_table.execute())
>1073 
>1074 def explain(self, *extra_details: ExplainDetail) -> str:
> /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, 
> *args)
>1283 
>1284 answer = self.gateway_client.send_command(command)
> -> 1285 return_value = get_return_value(
>1286 answer, self.gateway_client, self.target_id, self.name)
>1287 
> /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, 
> **kw)
> 144 def deco(*a, **kw):
> 145 try:
> --> 146 return f(*a, **kw)
> 147 except Py4JJavaError as e:
> 148 from pyflink.java_gateway import get_gateway
> /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
> 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 325 if answer[1] == REFERENCE_TYPE:
> --> 326 raise Py4JJavaError(
> 327 "An error occurred while calling {0}{1}{2}.\n".
> 328 format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o69.execute.
> : java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at 
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at 
> org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
>   

[jira] [Commented] (FLINK-23020) NullPointerException when running collect twice from Python API

2021-06-17 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364896#comment-17364896
 ] 

Dian Fu commented on FLINK-23020:
-

This is more like an issue of the underlying planner. cc [~godfreyhe]

> NullPointerException when running collect twice from Python API
> ---
>
> Key: FLINK-23020
> URL: https://issues.apache.org/jira/browse/FLINK-23020
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> Hi, 
> I'm trying to use PyFlink from a Jupyter Notebook and I'm getting an NPE in 
> the following scenario.
> 1. I'm creating datagen table.
> {code:java}
> from pyflink.table import EnvironmentSettings, TableEnvironment, 
> StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.common import Configuration, Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> conf = Configuration()
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_integer("parallelism.default", 
> 1)
> table_env.execute_sql("DROP TABLE IF EXISTS datagen")
> table_env.execute_sql("""
> CREATE TABLE datagen (
> id INT
> ) WITH (
> 'connector' = 'datagen'
> )
> """)
> {code}
> 2. Then I'm running collect
> {code:java}
> try:
> result = table_env.sql_query("select * from datagen limit 1").execute()
> for r in result.collect():
> print(r)
> except KeyboardInterrupt:
> result.get_job_client().cancel()
> {code}
> 3. I'm using "interrupt the kernel" button. This is handled by above 
> try/except and will cancel the query.
> 4. I'm running collect from point 2 one more time. Result:
> {code:java}
> ---
> Py4JJavaError Traceback (most recent call last)
>  in 
>   1 try:
> > 2 result = table_env.sql_query("select * from datagen limit 
> 1").execute()
>   3 for r in result.collect():
>   4 print(r)
>   5 except KeyboardInterrupt:
> /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
>1070 """
>1071 self._t_env._before_execute()
> -> 1072 return TableResult(self._j_table.execute())
>1073 
>1074 def explain(self, *extra_details: ExplainDetail) -> str:
> /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, 
> *args)
>1283 
>1284 answer = self.gateway_client.send_command(command)
> -> 1285 return_value = get_return_value(
>1286 answer, self.gateway_client, self.target_id, self.name)
>1287 
> /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, 
> **kw)
> 144 def deco(*a, **kw):
> 145 try:
> --> 146 return f(*a, **kw)
> 147 except Py4JJavaError as e:
> 148 from pyflink.java_gateway import get_gateway
> /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
> 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 325 if answer[1] == REFERENCE_TYPE:
> --> 326 raise Py4JJavaError(
> 327 "An error occurred while calling {0}{1}{2}.\n".
> 328 format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o69.execute.
> : java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>   at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at 
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at 
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at 
> org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
>   at 
> org.apache.calcite.plan.hep.HepPlan

[jira] [Commented] (FLINK-22587) Support aggregations in batch mode with DataStream API

2021-06-17 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364897#comment-17364897
 ] 

Etienne Chauchot commented on FLINK-22587:
--

[~sjwiesman] thanks for the pointer. I manually coded an inner join using 
_KeyedCoProcessFunction_ and state, and it works correctly. I'm sending a mail 
to the ML in case it is useful to some users.
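A minimal sketch of that workaround (String element and key types assumed; not the author's actual code): buffer each side in keyed state and emit matches as elements arrive, instead of relying on a GlobalWindow trigger.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class InnerJoinFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ListState<String> left;
    private transient ListState<String> right;

    @Override
    public void open(Configuration parameters) {
        left = getRuntimeContext().getListState(
                new ListStateDescriptor<>("left", String.class));
        right = getRuntimeContext().getListState(
                new ListStateDescriptor<>("right", String.class));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out)
            throws Exception {
        left.add(value); // remember for future matches from the other side
        for (String r : right.get()) {
            out.collect(value + "," + r);
        }
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out)
            throws Exception {
        right.add(value);
        for (String l : left.get()) {
            out.collect(l + "," + value);
        }
    }
}
{code}

Applied via {{streamA.connect(streamB).keyBy(keyA, keyB).process(new InnerJoinFunction())}}, where {{keyA}} and {{keyB}} are the two key selectors.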

> Support aggregations in batch mode with DataStream API
> --
>
> Key: FLINK-22587
> URL: https://issues.apache.org/jira/browse/FLINK-22587
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.0, 1.13.0
>Reporter: Etienne Chauchot
>Priority: Major
>
> A pipeline like this *in batch mode* would output no data
> {code:java}
> stream.join(otherStream)
> .where()
> .equalTo()
> .window(GlobalWindows.create())
> .apply()
> {code}
> Indeed, the default trigger for GlobalWindow is NeverTrigger, which never 
> fires. If we set an _EventTimeTrigger_, it will fire with every element, as the 
> watermark will be set to +INF (batch mode) and will pass the end of the 
> global window with each new element. A _ProcessingTimeTrigger_ never fires 
> either, and all elapsed-time or delta-based triggers would not be suited for 
> batch.
> The same goes for _reduce()_ instead of join().
> So I guess we are missing something for batch support with DataStream.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22682) Checkpoint interval too large for higher DOP

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-22682.
--
Resolution: Not A Bug

This is not a bug - better logging merged to master as described above.

> Checkpoint interval too large for higher DOP
> 
>
> Key: FLINK-22682
> URL: https://issues.apache.org/jira/browse/FLINK-22682
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Arvid Heise
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> When running a job on EMR with DOP of 160, checkpoints are not triggered at 
> the set interval. I used an interval of 10s and the average checkpoint took 
> <10s, so I'd expect ~6 checkpoints per minute.
> However, the actual completion message was heavily delayed. I had 
> backpressure in this test and used unaligned checkpoints.
> {noformat}
> 2021-05-14 10:06:39,182 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 1 for job 58df7eb721aaefbfb08168b2c3fd6717 (8940061335 bytes in 
> 9097 ms).
> 2021-05-14 10:06:39,205 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 1 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:06:40,223 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 2 (type=CHECKPOINT) @ 1620986800082 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:07:49,263 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 2 for job 58df7eb721aaefbfb08168b2c3fd6717 (8286704522 bytes in 
> 6490 ms).
> 2021-05-14 10:07:49,281 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 2 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:07:49,443 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 3 (type=CHECKPOINT) @ 1620986869281 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:08:55,679 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 3 for job 58df7eb721aaefbfb08168b2c3fd6717 (7398457668 bytes in 
> 6182 ms).
> 2021-05-14 10:08:55,694 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 3 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:08:55,820 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 4 (type=CHECKPOINT) @ 1620986935694 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:09:58,024 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 4 for job 58df7eb721aaefbfb08168b2c3fd6717 (7402199053 bytes in 
> 6179 ms).
> 2021-05-14 10:09:58,035 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 4 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:09:58,154 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 5 (type=CHECKPOINT) @ 1620986998035 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:11:02,694 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 5 for job 58df7eb721aaefbfb08168b2c3fd6717 (7395692776 bytes in 
> 6788 ms).
> 2021-05-14 10:11:02,705 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 5 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:11:02,830 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 6 (type=CHECKPOINT) @ 1620987062705 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:12:05,043 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 6 for job 58df7eb721aaefbfb08168b2c3fd6717 (7388207757 bytes in 
> 6025 ms).
> 2021-05-14 10:12:05,054 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 6 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:12:05,182 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 7 (type=CHECKPOINT) @ 1620987125054 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:13:04,754 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 7 for job 58df7eb721aaefbfb08168b2c3fd6717 (7360227162 bytes in 
> 6469 ms).
> 2021-05-14 10:13:04,779 INFO  
> org.apache.flink.runtime.source.coord

[jira] [Comment Edited] (FLINK-22682) Checkpoint interval too large for higher DOP

2021-06-17 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364900#comment-17364900
 ] 

Piotr Nowojski edited comment on FLINK-22682 at 6/17/21, 11:58 AM:
---

Thanks for taking care of this [~akalashnikov] and [~roman_khachatryan]. Closing 
this as not a bug - better logging merged to master as described above.


was (Author: pnowojski):
This is not a bug - better logging merged to master as described above.

> Checkpoint interval too large for higher DOP
> 
>
> Key: FLINK-22682
> URL: https://issues.apache.org/jira/browse/FLINK-22682
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Arvid Heise
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> When running a job on EMR with DOP of 160, checkpoints are not triggered at 
> the set interval. I used an interval of 10s and the average checkpoint took 
> <10s, so I'd expect ~6 checkpoints per minute.
> However, the actual completion message was heavily delayed. I had 
> backpressure in this test and used unaligned checkpoints.
> {noformat}
> 2021-05-14 10:06:39,182 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 1 for job 58df7eb721aaefbfb08168b2c3fd6717 (8940061335 bytes in 
> 9097 ms).
> 2021-05-14 10:06:39,205 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 1 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:06:40,223 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 2 (type=CHECKPOINT) @ 1620986800082 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:07:49,263 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 2 for job 58df7eb721aaefbfb08168b2c3fd6717 (8286704522 bytes in 
> 6490 ms).
> 2021-05-14 10:07:49,281 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 2 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:07:49,443 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 3 (type=CHECKPOINT) @ 1620986869281 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:08:55,679 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 3 for job 58df7eb721aaefbfb08168b2c3fd6717 (7398457668 bytes in 
> 6182 ms).
> 2021-05-14 10:08:55,694 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 3 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:08:55,820 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 4 (type=CHECKPOINT) @ 1620986935694 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:09:58,024 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 4 for job 58df7eb721aaefbfb08168b2c3fd6717 (7402199053 bytes in 
> 6179 ms).
> 2021-05-14 10:09:58,035 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 4 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:09:58,154 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 5 (type=CHECKPOINT) @ 1620986998035 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:11:02,694 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 5 for job 58df7eb721aaefbfb08168b2c3fd6717 (7395692776 bytes in 
> 6788 ms).
> 2021-05-14 10:11:02,705 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 5 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:11:02,830 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 6 (type=CHECKPOINT) @ 1620987062705 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:12:05,043 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 6 for job 58df7eb721aaefbfb08168b2c3fd6717 (7388207757 bytes in 
> 6025 ms).
> 2021-05-14 10:12:05,054 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 6 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:12:05,182 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 7 (type=CHECKPOINT) @ 1620987125054 for job 
> 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05

[jira] [Updated] (FLINK-21952) Make all the "Connection reset by peer" exception wrapped as RemoteTransportException

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-21952:
---
Priority: Minor  (was: Major)

> Make all the "Connection reset by peer" exception wrapped as 
> RemoteTransportException
> -
>
> Key: FLINK-21952
> URL: https://issues.apache.org/jira/browse/FLINK-21952
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-assigned
>
> In CreditBasedPartitionRequestClientHandler#exceptionCaught, an IOException 
> or an exception with the exact message "Connection reset by peer" is marked as 
> a RemoteTransportException. 
> However, with the current Netty implementation, sometimes it might throw 
> {code:java}
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer
> {code}
> in some cases. This would then be wrapped as a LocalTransportException instead, 
> which might cause some confusion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21952) Make all the "Connection reset by peer" exception wrapped as RemoteTransportException

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-21952:
---
Affects Version/s: 1.14.0
   1.13.1
   1.12.4

> Make all the "Connection reset by peer" exception wrapped as 
> RemoteTransportException
> -
>
> Key: FLINK-21952
> URL: https://issues.apache.org/jira/browse/FLINK-21952
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: auto-deprioritized-major, stale-assigned
>
> In CreditBasedPartitionRequestClientHandler#exceptionCaught, an IOException 
> or an exception with the exact message "Connection reset by peer" is marked as 
> a RemoteTransportException. 
> However, with the current Netty implementation, sometimes it might throw 
> {code:java}
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer
> {code}
> in some cases. This would then be wrapped as a LocalTransportException instead, 
> which might cause some confusion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22367) JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished times out

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-22367:
---
Priority: Minor  (was: Major)

> JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished
>  times out
> --
>
> Key: FLINK-22367
> URL: https://issues.apache.org/jira/browse/FLINK-22367
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-unassigned, 
> test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16818&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc&l=3844
> {code}
> [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
> 13.135 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase
> Apr 19 22:28:44 [ERROR] 
> terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished(org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase)
>   Time elapsed: 10.237 s  <<< ERROR!
> Apr 19 22:28:44 java.util.concurrent.ExecutionException: 
> java.util.concurrent.TimeoutException: Invocation of public default 
> java.util.concurrent.CompletableFuture 
> org.apache.flink.runtime.webmonitor.RestfulGateway.stopWithSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
>  timed out.
> Apr 19 22:28:44   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> Apr 19 22:28:44   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> Apr 19 22:28:44   at 
> org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.stopWithSavepointNormalExecutionHelper(JobMasterStopWithSavepointITCase.java:123)
> Apr 19 22:28:44   at 
> org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished(JobMasterStopWithSavepointITCase.java:111)
> Apr 19 22:28:44   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Apr 19 22:28:44   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Apr 19 22:28:44   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Apr 19 22:28:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> Apr 19 22:28:44   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Apr 19 22:28:44   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Apr 19 22:28:44   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Apr 19 22:28:44   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Apr 19 22:28:44   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Apr 19 22:28:44   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> Apr 19 22:28:44   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Apr 19 22:28:44   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Apr 19 22:28:44   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Apr 19 22:28:44   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Apr 19 22:28:44   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Apr 19 22:28:44   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> Apr 19 22:28:44   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> Apr 19 22:28:44   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Apr 19 22:28:44   at 
> org.junit.runners.ParentRunner.run(ParentR

[jira] [Reopened] (FLINK-19537) Processed in-flight bytes metric is not accurate

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-19537:


> Processed in-flight bytes metric is not accurate
> 
>
> Key: FLINK-19537
> URL: https://issues.apache.org/jira/browse/FLINK-19537
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Piotr Nowojski
>Priority: Minor
>  Labels: auto-closed
>
> Processed in-flight bytes as introduced in FLINK-18662 is not entirely 
> accurate, as it ignores the buffer/bytes accumulated in the record 
> deserializers. If a buffer is processed here, it doesn't mean it was fully 
> processed (so we can overestimate the amount of processed bytes). On the 
> other hand, some records/bytes might be processed without polling anything 
> from this {{CheckpointedInputGate}} (underestimating the amount of processed 
> bytes). All in all, this should have been calculated at the 
> {{StreamTaskNetworkInput}} level, where we have access to the record 
> deserializers. However, the current approach is on average accurate, and it 
> might be just good enough (at least for the time being).
> Also, this metric currently ignores chained source inputs to the multiple 
> input stream task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-17477) resumeConsumption call should happen as quickly as possible to minimise latency

2021-06-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-17477:


> resumeConsumption call should happen as quickly as possible to minimise 
> latency
> ---
>
> Key: FLINK-17477
> URL: https://issues.apache.org/jira/browse/FLINK-17477
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Piotr Nowojski
>Priority: Minor
>  Labels: auto-closed
>
> We should be calling {{InputGate#resumeConsumption()}} as soon as possible 
> (to avoid any unnecessary delay/latency when the task is idling). Currently I 
> think it’s mostly fine - the important bit is that on the happy path, we 
> always {{resumeConsumption}} before trying to complete the checkpoint, so 
> that netty threads will start resuming the network traffic while the task 
> thread is doing the synchronous part of the checkpoint and starting the 
> asynchronous part. But I think in two places we are first aborting the 
> checkpoint and only then resuming consumption (in {{CheckpointBarrierAligner}}):
> {code}
> // let the task know we are not completing this
> notifyAbort(currentCheckpointId,
>   new CheckpointException(
>   "Barrier id: " + barrierId,
>   CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
> // abort the current checkpoint
> releaseBlocksAndResetBarriers();
> {code}
> {code}
> // let the task know we skip a checkpoint
> notifyAbort(currentCheckpointId,
>   new 
> CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
> // no chance to complete this checkpoint
> releaseBlocksAndResetBarriers();
> {code}
> It’s not a big deal, as those are rare conditions, but it would be better 
> to be consistent everywhere: first release blocks and resume consumption, 
> before anything else happens. 
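A sketch of the suggested ordering for the first snippet above (the same calls, just swapped so consumption resumes before the abort bookkeeping):

{code:java}
// resume network traffic first, so netty threads are unblocked immediately
releaseBlocksAndResetBarriers();

// then let the task know we are not completing this checkpoint
notifyAbort(currentCheckpointId,
    new CheckpointException(
        "Barrier id: " + barrierId,
        CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
{code}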



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

