[jira] [Created] (FLINK-24182) Tasks canceler should not immediately interrupt

2021-09-07 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24182:
---

 Summary: Tasks canceler should not immediately interrupt
 Key: FLINK-24182
 URL: https://issues.apache.org/jira/browse/FLINK-24182
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Arvid Heise


While debugging resource leaks (FLINK-24131), I found that any connector is 
immediately interrupted on cancel. Hence, any attempt to use blocking calls in 
{{close}} to clean up resources (e.g. aborting transactions) is inherently 
unreliable.

It would be nice if tasks got a grace period (e.g. task.cancellation.interval) 
during which they can try to free resources in a proper, potentially blocking 
fashion before being interrupted.

Nevertheless, connectors should always expect interruptions during shutdown, in 
particular when the user-configurable grace period is depleted. I'd add that to 
the connector documentation in a separate effort.
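
For illustration, a minimal sketch of what a connector's {{close}} could do under 
such a grace period. {{TransactionalSink}}, {{TransactionManager}} and its methods 
are hypothetical placeholders, not Flink API:

{code:java}
/** Sketch only, not a real Flink connector. */
final class TransactionalSink implements AutoCloseable {
    private final TransactionManager transactionManager = new TransactionManager();

    @Override
    public void close() throws Exception {
        try {
            // Blocking cleanup; only reliable if the canceler grants a grace period.
            transactionManager.abortOpenTransactions();
        } catch (InterruptedException e) {
            // Grace period depleted: restore the interrupt flag and fall back
            // to best-effort, non-blocking cleanup.
            Thread.currentThread().interrupt();
            transactionManager.abortOpenTransactionsBestEffort();
        }
    }

    /** Hypothetical helper. */
    static final class TransactionManager {
        void abortOpenTransactions() throws InterruptedException { /* blocking RPC */ }
        void abortOpenTransactionsBestEffort() { /* fire-and-forget */ }
    }
}
{code}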



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24184) Potential race condition leading to incorrectly issued interruptions

2021-09-07 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-24184:
--

 Summary: Potential race condition leading to incorrectly issued 
interruptions
 Key: FLINK-24184
 URL: https://issues.apache.org/jira/browse/FLINK-24184
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.13.2, 1.12.5, 1.11.4, 1.10.3, 1.9.3, 1.8.3, 1.14.0
Reporter: Piotr Nowojski
 Fix For: 1.15.0


There is a race condition in disabling interrupts while closing resources. 
Currently this is guarded by a volatile variable, but the following racy 
interleaving is possible:
1. the interrupter thread first checks the shouldInterruptOnCancel flag
2. the shouldInterruptOnCancel flag switches to false as the Task/StreamTask 
enters the cleanup phase
3. the interrupter issues an interrupt while the Task/StreamTask is 
closing/releasing resources, potentially causing a memory leak
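
A minimal sketch of the racy interleaving, using a simplified, hypothetical class 
in place of the real Task/StreamTask code paths:

{code:java}
/** Sketch of the race; all names are simplified, this is not the actual Task code. */
final class InterruptRaceDemo {
    private volatile boolean shouldInterruptOnCancel = true;

    /** Runs on the canceler/interrupter thread. */
    void interruptLoop(Thread executingThread) {
        if (shouldInterruptOnCancel) {   // (1) reads true
            // A context switch here lets the task thread flip the flag
            // and start cleaning up before the interrupt below is issued...
            executingThread.interrupt(); // (3) ...so it lands during cleanup
        }
    }

    /** Runs on the task thread. */
    void cleanup() {
        shouldInterruptOnCancel = false; // (2) cleanup must not be interrupted
        closeResources();                // may leak if (3) fires after (1)
    }

    private void closeResources() { /* release buffers, connections, ... */ }
}
{code}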



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24185) flink-s3-fs-presto bundled dependencies not listed in NOTICE file

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24185:


 Summary: flink-s3-fs-presto bundled dependencies not listed in 
NOTICE file
 Key: FLINK-24185
 URL: https://issues.apache.org/jira/browse/FLINK-24185
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.14.0
Reporter: Xintong Song
 Fix For: 1.14.0


The following bundled dependencies are missing from the NOTICE file:
* aopalliance:aopalliance:1.0
* com.facebook.airlift:configuration:0.201
* com.facebook.airlift:log:0.201
* com.facebook.airlift:stats:0.201
* com.facebook.presto.hadoop:hadoop-apache2:2.7.3-1
* com.facebook.presto:presto-common:0.257
* com.facebook.presto:presto-hive-common:0.257
* com.facebook.presto:presto-hive-metastore:0.257
* com.facebook.presto:presto-hive:0.257
* com.google.inject:guice:4.2.2
* dnsjava:dnsjava:2.1.7
* io.airlift:slice:0.38
* io.airlift:units:1.3
* org.alluxio:alluxio-shaded-client:2.5.0-3
* org.hdrhistogram:HdrHistogram:2.1.9
* org.weakref:jmxutils:1.19

The following bundled dependencies are listed with an incorrect version in the NOTICE file:
* com.google.guava:guava:26.0-jre

The following should not be listed in the NOTICE file:
* com.google.errorprone:error_prone_annotations:2.2.0
* com.google.guava:failureaccess:1.0
* com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
* com.google.j2objc:j2objc-annotations:1.1
* commons-lang:commons-lang:2.6
* org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
* org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7:1.1.1
* org.wildfly.openssl:wildfly-openssl:1.0.7.Final



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24186) Disable single rowtime column check for collect/print

2021-09-07 Thread Timo Walther (Jira)
Timo Walther created FLINK-24186:


 Summary: Disable single rowtime column check for collect/print
 Key: FLINK-24186
 URL: https://issues.apache.org/jira/browse/FLINK-24186
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


As seen in FLINK-23751, the single rowtime column check can also fire during 
collecting and printing, where it is not important because watermarks are not 
used.

The exception is also misleading as it references a {{DataStream}}:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime field: 
[bidtime, window_time] in the query when insert into 
'default_catalog.default_database.Unregistered_Collect_Sink_8'.
Please select the rowtime field that should be used as event-time timestamp for 
the DataStream by casting all other fields to TIMESTAMP.
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24187) Could not commit s3 file after JM restart during state initialization

2021-09-07 Thread Youjun Yuan (Jira)
Youjun Yuan created FLINK-24187:
---

 Summary: Could not commit s3 file after JM restart during state 
initialization
 Key: FLINK-24187
 URL: https://issues.apache.org/jira/browse/FLINK-24187
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.12.1
Reporter: Youjun Yuan


We have a SQL job that consumes from Kafka and writes to a Hive table, with the 
data stored in S3.

One day the ZooKeeper leader failed over, which caused the Flink job to restart. 
However, the job got stuck during state restore, with the following error:
{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.jav

[jira] [Created] (FLINK-24188) flink-python NOTICE file contains unnecessary dependencies

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24188:


 Summary: flink-python NOTICE file contains unnecessary dependencies
 Key: FLINK-24188
 URL: https://issues.apache.org/jira/browse/FLINK-24188
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Xintong Song
 Fix For: 1.14.0


The following items in the NOTICE file should not be needed.
* com.github.jponge:lzma-java:1.3
* com.google.api.grpc:proto-google-common-protos:1.12.0
* com.google.auth:google-auth-library-credentials:0.18.0
* com.google.code.gson:gson:2.8.6
* com.google.guava:guava:26.0-jre
* com.google.protobuf.nano:protobuf-javanano:3.0.0-alpha-5
* com.google.protobuf:protobuf-java-util:3.11.0
* com.google.protobuf:protobuf-java:3.11.0
* com.jcraft:jzlib:1.1.3
* com.ning:compress-lzf:1.0.3
* io.grpc:grpc-auth:1.26.0
* io.grpc:grpc-context:1.26.0
* io.grpc:grpc-core:1.26.0
* io.grpc:grpc-netty:1.26.0
* io.grpc:grpc-protobuf:1.26.0
* io.grpc:grpc-stub:1.26.0
* io.grpc:grpc-testing:1.26.0
* io.netty:netty-buffer:4.1.51.Final
* io.netty:netty-codec-http2:4.1.51.Final
* io.netty:netty-codec-http:4.1.51.Final
* io.netty:netty-codec-socks:4.1.51.Final
* io.netty:netty-codec:4.1.51.Final
* io.netty:netty-common:4.1.51.Final
* io.netty:netty-handler-proxy:4.1.51.Final
* io.netty:netty-handler:4.1.51.Final
* io.netty:netty-resolver:4.1.51.Final
* io.netty:netty-tcnative-boringssl-static:2.0.33.Final
* io.netty:netty-transport-native-epoll:4.1.51.Final
* io.netty:netty-transport-native-unix-common:4.1.51.Final
* io.netty:netty-transport:4.1.51.Final
* io.opencensus:opencensus-api:0.24.0
* io.opencensus:opencensus-contrib-grpc-metrics:0.24.0
* io.perfmark:perfmark-api:0.19.0
* net.jpountz.lz4:lz4:1.3.0
* org.bouncycastle:bcpkix-jdk15on:1.54
* org.bouncycastle:bcprov-jdk15on:1.54



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24189) Debloating buffer for multiply gates

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24189:
-

 Summary: Debloating buffer for multiply gates
 Key: FLINK-24189
 URL: https://issues.apache.org/jira/browse/FLINK-24189
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


Right now, buffer debloating assumes it works with SingleInputGates that have a 
similar load. It needs to support UnionInputGate in the case of data skew. A 
possible implementation could calculate the throughput separately for each gate, 
or calculate the total throughput but choose the buffer size independently for 
each gate based on its buffers in use (see the sketch below).

It is highly important to share the throughput fairly among all gates. In other 
words, we must avoid the following situation:
* gate1 has a low load while gate2 has a high load
* a small buffer size is set for gate1 and a big buffer size for gate2
* the load on gate1 increases up to the load of gate2
* the buffer size for gate1 cannot be increased, because there is no reason to 
decrease the buffer size for gate2 since its load hasn't changed
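
A minimal sketch of the second variant (total throughput measured once, buffer 
size chosen per gate). All names are hypothetical; this is not the actual 
debloater code:

{code:java}
/** Sketch: derive a buffer size per gate from its share of the total throughput. */
final class PerGateBufferSizeSketch {

    static int[] chooseBufferSizes(
            long[] gateThroughputBytesPerSec, // measured per gate
            int[] buffersInUse,               // current buffers in use per gate
            long targetTotalBufferedBytes,    // total in-flight data budget
            int minBufferSize,
            int maxBufferSize) {
        long total = java.util.Arrays.stream(gateThroughputBytesPerSec).sum();
        int[] sizes = new int[gateThroughputBytesPerSec.length];
        for (int i = 0; i < sizes.length; i++) {
            // Fall back to an equal share if nothing has been measured yet.
            double share = total == 0
                    ? 1.0 / sizes.length
                    : (double) gateThroughputBytesPerSec[i] / total;
            long bytesForGate = (long) (targetTotalBufferedBytes * share);
            long perBuffer = bytesForGate / Math.max(1, buffersInUse[i]);
            sizes[i] = (int) Math.min(maxBufferSize, Math.max(minBufferSize, perBuffer));
        }
        return sizes;
    }
}
{code}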



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24190) Handling large record with buffer debloat

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24190:
-

 Summary: Handling large record with buffer debloat
 Key: FLINK-24190
 URL: https://issues.apache.org/jira/browse/FLINK-24190
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


If the buffer size becomes too small (less than the record size) due to buffer 
debloating, it can lead to performance degradation. It looks like it is better 
to keep the buffer size at least as large as the record size. So we need to 
check how bad the degradation can be and fix it.

The implementation should be easy: we can choose the maximum of 
desirableBufferSize and recordSize when requesting the new buffer 
(BufferWritingResultPartition#addToSubpartition), as sketched below.
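
A minimal sketch of that choice (the method and parameter names are hypothetical):

{code:java}
// Sketch: when requesting a new buffer in
// BufferWritingResultPartition#addToSubpartition, never let debloating
// shrink the buffer below the record that has to fit into it.
static int effectiveBufferSize(int desirableBufferSize, int recordSizeInBytes) {
    return Math.max(desirableBufferSize, recordSizeInBytes);
}
{code}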



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24191) Adjusting number of buffers besides buffer size

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24191:
-

 Summary: Adjusting number of buffers besides buffer size
 Key: FLINK-24191
 URL: https://issues.apache.org/jira/browse/FLINK-24191
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


"Buffer debloat" adjusts only the buffer size but it also makes sense to adjust 
the number of buffers. It is not clear for now what should be adjusted and in 
which proportions so it needs to think about how to figure this out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Flink 1.14 Weekly 2021-09-07

2021-09-07 Thread Dawid Wysakowicz
Dear Apache Flink community,

Today was another update, and we are making good progress towards
finally having the release.

*Blockers, critical issues, cross-team testing and documentation*
All of the release blockers are assigned and are making good progress.
Similarly, the cross-team testing is in a pretty advanced state. We
hope to resolve all of those by next week.

*Voting RC1*
With the progress well advanced, we hope to prepare an RC1 next Tuesday,
which we hope can be voted on and released as Flink 1.14.0.

*Release notes*
We created a PR with release notes extracted from JIRA. Please take a
look and see if there is anything missing. If you feel something could
be improved, remember to update it in JIRA as well as post a comment in
the PR [1].

Thanks for all the effort you all put into this.

Best,
Xintong, Joe & Dawid


[1] https://github.com/apache/flink/pull/17182






[jira] [Created] (FLINK-24192) Sql get plan failed. All the inputs have relevant nodes, however the cost is still infinite

2021-09-07 Thread xiaojin.wy (Jira)
xiaojin.wy created FLINK-24192:
--

 Summary: Sql get plan failed. All the inputs have relevant nodes, 
however the cost is still infinite
 Key: FLINK-24192
 URL: https://issues.apache.org/jira/browse/FLINK-24192
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: xiaojin.wy
 Fix For: 1.15.0


*sql*

{code:java}
CREATE TABLE database5_t0(
`c0` FLOAT , `c1` FLOAT , `c2` CHAR
) WITH (
 'connector' = 'filesystem',
 'format' = 'testcsv',
 'path' = '$resultPath00'
)
CREATE TABLE database5_t1(
`c0` TINYINT , `c1` INTEGER
) WITH (
 'connector' = 'filesystem',
 'format' = 'testcsv',
 'path' = '$resultPath11'
)
CREATE TABLE database5_t2 (
  `c0` FLOAT
) WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv',
  'path' = '$resultPath33'
)
CREATE TABLE database5_t3 (
  `c0` STRING , `c1` STRING
) WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv',
  'path' = '$resultPath33'
)
INSERT INTO database5_t0(c0, c1, c2) VALUES(cast(0.84355265 as FLOAT), 
cast(0.3269016 as FLOAT), cast('' as CHAR))
INSERT INTO database5_t1(c0, c1) VALUES(cast(-125 as TINYINT), -1715936454)
INSERT INTO database5_t2(c0) VALUES(cast(-1.7159365 as FLOAT))
INSERT INTO database5_t3(c0, c1) VALUES('16:36:29', '1969-12-12')
INSERT INTO MySink
SELECT COUNT(ref0) from (SELECT COUNT(1) AS ref0 FROM database5_t0, 
database5_t3, database5_t1, database5_t2 WHERE CAST ( EXISTS (SELECT 1) AS 
BOOLEAN)
UNION ALL
SELECT COUNT(1) AS ref0 FROM database5_t0, database5_t3, database5_t1, 
database5_t2
WHERE CAST ((NOT CAST (( EXISTS (SELECT 1)) AS BOOLEAN)) AS BOOLEAN)
UNION ALL
SELECT COUNT(1) AS ref0 FROM database5_t0, database5_t3, database5_t1, 
database5_t2 WHERE CAST ((CAST ( EXISTS (SELECT 1) AS BOOLEAN)) IS NULL AS 
BOOLEAN)) as table1
{code}
After executing the SQL in the test case, we get an error like this:

{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalSink(table=[default_catalog.default_database.MySink], fields=[a])
+- FlinkLogicalCalc(select=[CAST(EXPR$0) AS a])
   +- FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT()])
      +- FlinkLogicalUnion(all=[true])
         :- FlinkLogicalUnion(all=[true])
         :  :- FlinkLogicalCalc(select=[0 AS $f0])
         :  :  +- FlinkLogicalAggregate(group=[{}], ref0=[COUNT()])
         :  :     +- FlinkLogicalJoin(condition=[$1], joinType=[semi])
         :  :        :- FlinkLogicalCalc(select=[c0])
         :  :        :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :  :        :     :- FlinkLogicalCalc(select=[c0])
         :  :        :     :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :  :        :     :     :- FlinkLogicalCalc(select=[c0])
         :  :        :     :     :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :  :        :     :     :     :- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t0, project=[c0]]], fields=[c0])
         :  :        :     :     :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t3, project=[c0]]], fields=[c0])
         :  :        :     :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t1, project=[c0]]], fields=[c0])
         :  :        :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t2]], fields=[c0])
         :  :        +- FlinkLogicalCalc(select=[IS NOT NULL(m) AS $f0])
         :  :           +- FlinkLogicalAggregate(group=[{}], m=[MIN($0)])
         :  :              +- FlinkLogicalCalc(select=[true AS i])
         :  :                 +- FlinkLogicalValues(tuples=[[{ 0 }]])
         :  +- FlinkLogicalCalc(select=[0 AS $f0])
         :     +- FlinkLogicalAggregate(group=[{}], ref0=[COUNT()])
         :        +- FlinkLogicalJoin(condition=[$1], joinType=[anti])
         :           :- FlinkLogicalCalc(select=[c0])
         :           :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :           :     :- FlinkLogicalCalc(select=[c0])
         :           :     :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :           :     :     :- FlinkLogicalCalc(select=[c0])
         :           :     :     :  +- FlinkLogicalJoin(condition=[true], joinType=[inner])
         :           :     :     :     :- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t0, project=[c0]]], fields=[c0])
         :           :     :     :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t3, project=[c0]]], fields=[c0])
         :           :     :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, database5_t1, project=[c0]]], fields=[c0])
         :           :

[jira] [Created] (FLINK-24193) Test service entries cause noise in other tests

2021-09-07 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24193:


 Summary: Test service entries cause noise in other tests
 Key: FLINK-24193
 URL: https://issues.apache.org/jira/browse/FLINK-24193
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0


At various points in Flink we use the ServiceLoader mechanism to load 
implementations, for example filesystems or reporters.
It is thus only natural that we also have some test service implementations 
which are used in specific tests.

However, these test implementations are intended to be used only in very 
specific tests, but are currently put on the classpath for all tests in that 
module (+ all modules that depend on the test-jar of such a module). This causes 
confusion (e.g., suddenly there are 5 reporter factories available) or logging 
noise (e.g., custom netty handlers being loaded by each MiniCluster).

We should implement a JUnit extension that runs the test with a customized 
classloader, which also has access to a temporary directory containing generated 
service entries.
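
A minimal sketch of the mechanism such an extension could build on. All names are 
hypothetical, and a real extension would additionally have to hide the parent 
classpath's service entries, which is the harder part:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: expose a generated service entry through a dedicated classloader. */
final class ScopedServiceEntries {

    static ClassLoader withServiceEntry(Path tempDir, Class<?> service, String implClassName)
            throws Exception {
        Path servicesDir = Files.createDirectories(tempDir.resolve("META-INF/services"));
        Files.write(
                servicesDir.resolve(service.getName()),
                implClassName.getBytes(StandardCharsets.UTF_8));
        // ServiceLoader discovers entries via ClassLoader#getResources, so the
        // generated entry becomes visible in addition to the parent's entries.
        return new URLClassLoader(
                new URL[] {tempDir.toUri().toURL()},
                ScopedServiceEntries.class.getClassLoader());
    }
}
{code}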



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Automated architectural tests

2021-09-07 Thread Ingo Bürk
Thanks, Chesnay. I updated the PR to use a separate module now, and ran it
on a few modules (some Table API modules and a couple connectors). The CI
seemed to take ~2.5min for executing the tests; that's certainly not
negligible. On the other hand, even the few tests implemented already found
several violations ("several" is an understatement, but I manually verified
some of them, not all of them).
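
For anyone who hasn't looked at ArchUnit yet, here is a minimal sketch of the
kind of rule we are discussing (the package names are made up for illustration;
only the ArchUnit API itself is real):

import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

class ArchitectureSketchTest {

    @org.junit.jupiter.api.Test
    void productionCodeDoesNotReachIntoTestUtilities() {
        // Import once; ArchUnit caches imported classes between tests.
        JavaClasses classes = new ClassFileImporter().importPackages("org.apache.flink");
        noClasses()
                .that().resideOutsideOfPackage("..testutils..")
                .should().dependOnClassesThat().resideInAPackage("..testutils..")
                .check(classes);
    }
}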

On Mon, Sep 6, 2021 at 3:44 PM Chesnay Schepler  wrote:

> While flink-tests is currently the best choice in that it has the
> biggest classpath, it is also the module already requiring the most time
> on CI.
>
> Furthermore, given that we ideally cover all APIs (including connectors
> & formats), having that mess of dependencies in flink-tests may
> interfere with existing / future tests.
>
> As such I would prefer a separate module, as annoying as that may be.
>
> On 06/09/2021 15:26, Ingo Bürk wrote:
> > I just quickly chatted with the author/maintainer of ArchUnit, and a
> module
> > which depends on every module that should be tested seems to be the best
> > solution. How do you feel about using flink-tests for this vs. having a
> > separate module for this purpose?
> >
> >
> > Ingo
> >
> > On Mon, Sep 6, 2021 at 3:04 PM Ingo Bürk  wrote:
> >
> >> Hi Chesnay,
> >>
> >> Those are all great questions, and I want to tackle those as well. For
> the
> >> moment I went per-module, but runtime-wise that isn't ideal the more
> >> modules we'd activate this in. ArchUnit does cache classes between
> tests,
> >> but if we run them individually per module, we'd still add up quite a
> bit
> >> of execution time (a single module in my IDE is around 10s with the
> tests I
> >> currently have implemented, but I suspect the bottleneck here is the
> >> importing of classes, not the number of tests). Ideally we'd just run
> them
> >> once in a module with a big enough classpath to cover everything. If we
> >> have such a place, that would probably be our best shot. I'll also keep
> >> investigating here, of course.
> >>
> >> For now I just pushed a solution to avoid the overlap when executing it
> >> per-module by matching on the URI. It's not the prettiest solution, but
> >> does work; but that's more to not fail the tests in unrelated modules
> and
> >> doesn't help much with execution time.
> >>
> >>
> >> Ingo
> >>
> >> On Mon, Sep 6, 2021 at 1:57 PM Chesnay Schepler wrote:
> >>
> >>> Do you have an estimate for how long these tests would run?
> >>>
> >>> For project-wide tests, what are the options for setting that up?
> >>> If we let the tests run per-module then I guess they'd overlap
> >>> considerably (because other Flink modules are being put on the
> >>> classpath), which isn't ideal.
> >>>
> >>> On 06/09/2021 13:51, David Morávek wrote:
>  Hi Ingo,
> 
>  +1 for this effort. This could automate a lot of "written rules" that
> are
>  easy to forget about / not to be aware of (such as that each test
> should
>  extend the TestLogger as Till has already mentioned).
> 
>  I went through your examples and ArchUnit looks really powerful and
>  expressive while still being easy to read.
> 
>  Best,
>  D.
> 
>  On Mon, Sep 6, 2021 at 1:00 PM Ingo Bürk  wrote:
> 
> > Thanks for your input Chesnay!
> >
> > The limitations of ArchUnit probably mostly stem from the fact that
> it
> > operates on byte code and thus can't access anything not accessible
> >>> from
> > byte code, i.e. JavaDocs. But I think Checkstyle and ArchUnit are
> > complementing each other quite well here. The main reason against
> > Checkstyle for these tests is its limitation to single files only,
> > rendering many tests (including the one you mentioned) impossible.
> The
> > secondary reason is that ArchUnit has more declarative APIs and the
> >>> tests
> > become quite easy to write and maintain (some groundwork effort is
> >>> needed,
> > of course). Over time we could probably expand quite a bit more on
> >>> what is
> > tested with ArchUnit as it can test entire architectures (package
> >>> accesses
> > etc.), and it has support for "freezing" known violations to prevent
> >>> new
> > violations and removing existing ones over time.
> >
> > The @VisibleForTesting use case you mentioned is possible; I've
> pushed
> >>> a
> > version of that rule to the draft PR now as well.
> >
> >
> > Best
> > Ingo
> >
> > On Mon, Sep 6, 2021 at 12:11 PM Chesnay Schepler wrote:
> >
> >> This sounds like an interesting effort.
> >>
> >> The draft you have opened uses ArchUnit; can you explain a bit what
> >>> the
> >> capabilities/limitations of said tool are?
> >>
> >> One thing we wanted to have for a long time is that methods/classes
> >> annotated with @VisibleForTesting are not called from production
> code;
> >> would that be something that could be implemented?
> >>>

[jira] [Created] (FLINK-24194) resource-providers yarn documentation has useless Chinese characters

2021-09-07 Thread qingbo jiao (Jira)
qingbo jiao created FLINK-24194:
---

 Summary: resource-providers yarn documentation has useless Chinese 
characters
 Key: FLINK-24194
 URL: https://issues.apache.org/jira/browse/FLINK-24194
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.2
Reporter: qingbo jiao
 Attachments: error_message.png

!error_message.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Automated architectural tests

2021-09-07 Thread Chesnay Schepler

I would say that's fine time-wise.

On 07/09/2021 15:29, Ingo Bürk wrote:

Thanks, Chesnay. I updated the PR to use a separate module now, and ran it
on a few modules (some Table API modules and a couple connectors). The CI
seemed to take ~2.5min for executing the tests; that's certainly not
negligible. On the other hand, even the few tests implemented already found
several violations ("several" is an understatement, but I manually verified
some of them, not all of them).

On Mon, Sep 6, 2021 at 3:44 PM Chesnay Schepler  wrote:


While flink-tests is currently the best choice in that it has the
biggest classpath, it is also the module already requiring the most time
on CI.

Furthermore, given that we ideally cover all APIs (including connectors
& formats), having that mess of dependencies in flink-tests may
interfere with existing / future tests.

As such I would prefer a separate module, as annoying as that may be.

On 06/09/2021 15:26, Ingo Bürk wrote:

I just quickly chatted with the author/maintainer of ArchUnit, and a

module

which depends on every module that should be tested seems to be the best
solution. How do you feel about using flink-tests for this vs. having a
separate module for this purpose?


Ingo

On Mon, Sep 6, 2021 at 3:04 PM Ingo Bürk  wrote:


Hi Chesnay,

Those are all great questions, and I want to tackle those as well. For

the

moment I went per-module, but runtime-wise that isn't ideal the more
modules we'd activate this in. ArchUnit does cache classes between

tests,

but if we run them individually per module, we'd still add up quite a

bit

of execution time (a single module in my IDE is around 10s with the

tests I

currently have implemented, but I suspect the bottleneck here is the
importing of classes, not the number of tests). Ideally we'd just run

them

once in a module with a big enough classpath to cover everything. If we
have such a place, that would probably be our best shot. I'll also keep
investigating here, of course.

For now I just pushed a solution to avoid the overlap when executing it
per-module by matching on the URI. It's not the prettiest solution, but
does work; but that's more to not fail the tests in unrelated modules

and

doesn't help much with execution time.


Ingo

On Mon, Sep 6, 2021 at 1:57 PM Chesnay Schepler wrote:


Do you have an estimate for how long these tests would run?

For project-wide tests, what are the options for setting that up?
If we let the tests run per-module then I guess they'd overlap
considerably (because other Flink modules are being put on the
classpath), which isn't ideal.

On 06/09/2021 13:51, David Morávek wrote:

Hi Ingo,

+1 for this effort. This could automate a lot of "written rules" that

are

easy to forget about / not to be aware of (such as that each test

should

extend the TestLogger as Till has already mentioned).

I went through your examples and ArchUnit looks really powerful and
expressive while still being easy to read.

Best,
D.

On Mon, Sep 6, 2021 at 1:00 PM Ingo Bürk  wrote:


Thanks for your input Chesnay!

The limitations of ArchUnit probably mostly stem from the fact that

it

operates on byte code and thus can't access anything not accessible

from

byte code, i.e. JavaDocs. But I think Checkstyle and ArchUnit are
complementing each other quite well here. The main reason against
Checkstyle for these tests is its limitation to single files only,
rendering many tests (including the one you mentioned) impossible.

The

secondary reason is that ArchUnit has more declarative APIs and the

tests

become quite easy to write and maintain (some groundwork effort is

needed,

of course). Over time we could probably expand quite a bit more on

what is

tested with ArchUnit as it can test entire architectures (package

accesses

etc.), and it has support for "freezing" known violations to prevent

new

violations and removing existing ones over time.

The @VisibleForTesting use case you mentioned is possible; I've

pushed

a

version of that rule to the draft PR now as well.


Best
Ingo

On Mon, Sep 6, 2021 at 12:11 PM Chesnay Schepler wrote:
This sounds like an interesting effort.

The draft you have opened uses ArchUnit; can you explain a bit what

the

capabilities/limitations of said tool are?

One thing we wanted to have for a long time is that methods/classes
annotated with @VisibleForTesting are not called from production

code;

would that be something that could be implemented?

It's not a problem imo that tests need to run in order to catch

stuff;

so long as it is noticed on CI.

On 03/09/2021 08:48, Ingo Bürk wrote:

Hi Timo, Till,

thanks for your input already. I'm glad to hear that the idea

resonates,

also thanks for the additional ideas!

I've created a JIRA issue[1] for now just to track this idea. I'm

also

working on a bit of a proof of concept and opened it as a draft

PR[2].

I'm

happy for anyone to join that PR to look and discuss. The PR

doesn't

necessarily intend to be merged in its c

[jira] [Created] (FLINK-24195) Translate "DESCRIBE Statements" page of "SQL" into Chinese

2021-09-07 Thread wuguihu (Jira)
wuguihu created FLINK-24195:
---

 Summary: Translate "DESCRIBE Statements" page of "SQL" into Chinese
 Key: FLINK-24195
 URL: https://issues.apache.org/jira/browse/FLINK-24195
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: wuguihu


[https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/describe/]

docs/content.zh/docs/dev/table/sql/describe.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24196) Translate "EXPLAIN Statements" page of "SQL" into Chinese

2021-09-07 Thread wuguihu (Jira)
wuguihu created FLINK-24196:
---

 Summary: Translate "EXPLAIN Statements" page of "SQL" into Chinese
 Key: FLINK-24196
 URL: https://issues.apache.org/jira/browse/FLINK-24196
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: wuguihu


[https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/explain/]

 

docs/content.zh/docs/dev/table/sql/explain.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24197) Streaming File Sink end-to-end test fails with : "RestClientException: [File upload failed.]"

2021-09-07 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-24197:


 Summary: Streaming File Sink end-to-end test fails with : 
"RestClientException: [File upload failed.]"
 Key: FLINK-24197
 URL: https://issues.apache.org/jira/browse/FLINK-24197
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Tests
Affects Versions: 1.15.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23672&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=070ff179-953e-5bda-71fa-d6599415701c&l=11040

{code}
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'StreamingFileSinkProgram'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
	at FileSinkProgram.main(FileSinkProgram.java:105)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:373)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [File upload failed.]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
	... 4 more

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24198) KafkaTableITCase#testKafkaTemporalJoinChangelog fail with FlinkJobNotFoundException: Could not find Flink job

2021-09-07 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-24198:


 Summary: KafkaTableITCase#testKafkaTemporalJoinChangelog fail with 
FlinkJobNotFoundException: Could not find Flink job
 Key: FLINK-24198
 URL: https://issues.apache.org/jira/browse/FLINK-24198
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Dawid Wysakowicz






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24199) Expose StreamExecutionEnvironment#configure in Python API

2021-09-07 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-24199:


 Summary: Expose StreamExecutionEnvironment#configure in Python API
 Key: FLINK-24199
 URL: https://issues.apache.org/jira/browse/FLINK-24199
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dawid Wysakowicz


There are certain parameters that can be configured only through the underlying 
configuration of the StreamExecutionEnvironment (e.g. 
execution.checkpointing.checkpoints-after-tasks-finish.enabled).

We should be able to set those in the Python API.
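
For reference, a minimal sketch of the Java-side counterpart that the Python API 
should mirror (setting the option as a plain string avoids depending on the typed 
ConfigOption):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigureSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");

        // Applies the given options on top of the environment's existing configuration.
        env.configure(conf, ConfigureSketch.class.getClassLoader());
    }
}
{code}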



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-09-07 Thread Thomas Weise
Hi,

I wanted to check if there is active development on FLIP-182 and what the
target release for it might be? [1] still shows as under discussion.

Regarding the per-subtask vs. per-split limitation: I think it will be
important that this eventually works per split, since in some cases it
won't be practical to limit a subtask to a single split (think KafkaSource
reading from many topics with diverse volumes).

Thanks,
Thomas

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

On Wed, Jul 21, 2021 at 4:48 AM Piotr Nowojski  wrote:

> Hi,
>
> >  I would not fully advertise this before we have the second part
> implemented as well.
>
> I'm not sure, maybe we could advertise with a big warning about this
> limitation. I mean it's not as if this change would break something. At
> worst it just wouldn't fully solve the problem with multiple splits per
> single operator, but only limit the scope of that problem. At the same time
> I don't have strong feelings about this. If the consensus would be to not
> advertise it, I'm also fine with it. Only in that case we should probably
> quickly follow up with the per split solution.
>
> Anyway, thanks for voicing your support and the discussions. I'm going to
> start a voting thread for this feature soon.
>
> Best,
> Piotrek
>
> Tue, 13 Jul 2021 at 19:09 Stephan Ewen wrote:
>
> > @Eron Wright   The per-split watermarks are the
> > default in the new source interface (FLIP-27) and come for free if you
> use
> > the SplitReader.
> >
> > Based on that, it is also possible to unsubscribe individual splits to
> > solve the alignment in the case where operators have multiple splits
> > assigned.
> > Piotr and I already discussed that, but concluded that the implementation
> > of that is largely orthogonal.
> >
> > I am a bit worried, though, that if we release and advertise the
> alignment
> > without handling this case, we create a surprise for quite a few users.
> > While this is admittedly valuable for some users, I think we need to
> > position this accordingly. I would not fully advertise this before we
> have
> > the second part implemented as well.
> >
> >
> >
> > On Mon, Jul 12, 2021 at 7:18 PM Eron Wright wrote:
> >
> > > The notion of per-split watermarks seems quite interesting.  I think
> the
> > > idleness feature could benefit from a per-split approach too, because
> > > idleness is typically related to whether any splits are assigned to a
> > given
> > > operator instance.
> > >
> > >
> > > On Mon, Jul 12, 2021 at 3:06 AM 刘建刚  wrote:
> > >
> > > > +1 for the source watermark alignment.
> > > > In the previous flink version, the source connectors are different in
> > > > implementation and it is hard to make this feature. When the consumed
> > > data
> > > > is not aligned or consuming history data, it is very easy to cause
> the
> > > > unalignment. Source alignment can resolve many unstable problems.
> > > >
> > > > > Seth Wiesman wrote on Fri, 9 Jul 2021 at 11:25 PM:
> > > >
> > > > > +1
> > > > >
> > > > > In my opinion, this limitation is perfectly fine for the MVP.
> > Watermark
> > > > > alignment is a long-standing issue and this already moves the ball
> so
> > > far
> > > > > forward.
> > > > >
> > > > > I don't expect this will cause many issues in practice, as I
> > understand
> > > > it
> > > > > the FileSource always processes one split at a time, and in my
> > > > experience,
> > > > > 90% of Kafka users have a small number of partitions scale their
> > > > pipelines
> > > > > to have one reader per partition. Obviously, there are larger-scale
> > > Kafka
> > > > > topics and more sources that will be ported over in the future but
> I
> > > > think
> > > > > there is an implicit understanding that aligning sources adds
> latency
> > > to
> > > > > pipelines, and we can frame the follow-up "per-split" alignment as
> an
> > > > > optimization.
> > > > >
> > > > > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > >
> > > > > > Hey!
> > > > > >
> > > > > > A couple of weeks ago me and Arvid Heise played around with an
> idea
> > > to
> > > > > > address a long standing issue of Flink: lack of watermark/event
> > time
> > > > > > alignment between different parallel instances of sources, that
> can
> > > > lead
> > > > > to
> > > > > > ever growing state size for downstream operators like
> > WindowOperator.
> > > > > >
> > > > > > We had an impression that this is relatively low hanging fruit
> that
> > > can
> > > > > be
> > > > > > quite easily implemented - at least partially (the first part
> > > mentioned
> > > > > in
> > > > > > the FLIP document). I have written down our proposal [1] and you
> > can
> > > > also
> > > > > > check out our PoC that we have implemented [2].
> > > > > >
> > > > > > We think that this is a quite easy proposal, that has been in
> large
> > > > part
> > > > > > already implemented. There is one 

Re: [DISCUSS] FLIP-182: Watermark alignment

2021-09-07 Thread Piotr Nowojski
Hi Thomas,

Unfortunately me/Arvid didn't have enough time to finish this off for
1.14.0 as we were firefighting other efforts and we have re-focused on
other more advanced FLIPs. We want to deliver it for 1.15 though. I'm not
sure, but I remember Arvid saying something that he would like to actually
take a look at this in 1.15 cycle with per-split throttling. If not, at the
very least I would like to contribute the version without the per-split
logic, as this is almost done.

Piotrek

Tue, 7 Sep 2021 at 19:18 Thomas Weise wrote:

> Hi,
>
> I wanted to check if there is active development on FLIP-182 and what the
> target release for it might be? [1] still shows as under discussion.
>
> Regarding the per-subtask vs. per-split limitation: I think it will be
> important that this eventually works per split, since in some cases it
> won't be practical to limit a subtask to a single split (think KafkaSource
> reading from many topics with diverse volumes).
>
> Thanks,
> Thomas
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>
> On Wed, Jul 21, 2021 at 4:48 AM Piotr Nowojski wrote:
>
> > Hi,
> >
> > >  I would not fully advertise this before we have the second part
> > implemented as well.
> >
> > I'm not sure, maybe we could advertise with a big warning about this
> > limitation. I mean it's not as if this change would break something. At
> > worst it just wouldn't fully solve the problem with multiple splits per
> > single operator, but only limit the scope of that problem. At the same
> time
> > I don't have strong feelings about this. If the consensus would be to not
> > advertise it, I'm also fine with it. Only in that case we should probably
> > quickly follow up with the per split solution.
> >
> > Anyway, thanks for voicing your support and the discussions. I'm going to
> > start a voting thread for this feature soon.
> >
> > Best,
> > Piotrek
> >
> > Tue, 13 Jul 2021 at 19:09 Stephan Ewen wrote:
> >
> > > @Eron Wright   The per-split watermarks are the
> > > default in the new source interface (FLIP-27) and come for free if you
> > use
> > > the SplitReader.
> > >
> > > Based on that, it is also possible to unsubscribe individual splits to
> > > solve the alignment in the case where operators have multiple splits
> > > assigned.
> > > Piotr and I already discussed that, but concluded that the
> implementation
> > > of that is largely orthogonal.
> > >
> > > I am a bit worried, though, that if we release and advertise the
> > alignment
> > > without handling this case, we create a surprise for quite a few users.
> > > While this is admittedly valuable for some users, I think we need to
> > > position this accordingly. I would not fully advertise this before we
> > have
> > > the second part implemented as well.
> > >
> > >
> > >
> > > On Mon, Jul 12, 2021 at 7:18 PM Eron Wright wrote:
> > >
> > > > The notion of per-split watermarks seems quite interesting.  I think
> > the
> > > > idleness feature could benefit from a per-split approach too, because
> > > > idleness is typically related to whether any splits are assigned to a
> > > given
> > > > operator instance.
> > > >
> > > >
> > > > On Mon, Jul 12, 2021 at 3:06 AM 刘建刚 wrote:
> > > >
> > > > > +1 for the source watermark alignment.
> > > > > In the previous flink version, the source connectors are different
> in
> > > > > implementation and it is hard to make this feature. When the
> consumed
> > > > data
> > > > > is not aligned or consuming history data, it is very easy to cause
> > the
> > > > > unalignment. Source alignment can resolve many unstable problems.
> > > > >
> > > > > Seth Wiesman wrote on Fri, 9 Jul 2021 at 11:25 PM:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > In my opinion, this limitation is perfectly fine for the MVP.
> > > Watermark
> > > > > > alignment is a long-standing issue and this already moves the
> ball
> > so
> > > > far
> > > > > > forward.
> > > > > >
> > > > > > I don't expect this will cause many issues in practice, as I
> > > understand
> > > > > it
> > > > > > the FileSource always processes one split at a time, and in my
> > > > > experience,
> > > > > > 90% of Kafka users have a small number of partitions scale their
> > > > > pipelines
> > > > > > to have one reader per partition. Obviously, there are
> larger-scale
> > > > Kafka
> > > > > > topics and more sources that will be ported over in the future
> but
> > I
> > > > > think
> > > > > > there is an implicit understanding that aligning sources adds
> > latency
> > > > to
> > > > > > pipelines, and we can frame the follow-up "per-split" alignment
> as
> > an
> > > > > > optimization.
> > > > > >
> > > > > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey!
> > > > > > >
> > > > > > > A couple of weeks ago me and Arvid Heise played around with an
> > idea
> > > > to
> > > > > 

[jira] [Created] (FLINK-24200) CheckpointBarrierTrackerTest.testTwoLastBarriersOneByOne fails on azure

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24200:


 Summary: CheckpointBarrierTrackerTest.testTwoLastBarriersOneByOne 
fails on azure
 Key: FLINK-24200
 URL: https://issues.apache.org/jira/browse/FLINK-24200
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.5
Reporter: Xintong Song
 Fix For: 1.12.6


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23719&view=logs&j=f0ac5c25-1168-55a5-07ff-0e88223afed9&t=0dbaca5d-7c38-52e6-f4fe-2fb69ccb3ada&l=8983

{code}
[ERROR] Tests run: 14, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.893 s <<< FAILURE! - in org.apache.flink.streaming.runtime.io.CheckpointBarrierTrackerTest
[ERROR] testTwoLastBarriersOneByOne(org.apache.flink.streaming.runtime.io.CheckpointBarrierTrackerTest)  Time elapsed: 0.093 s  <<< FAILURE!
java.lang.AssertionError: 

Expected: a value less than or equal to <30L>
     but: <33L> was greater than <30L>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.junit.Assert.assertThat(Assert.java:956)
	at org.junit.Assert.assertThat(Assert.java:923)
	at org.apache.flink.streaming.runtime.io.CheckpointBarrierTrackerTest.testTwoLastBarriersOneByOne(CheckpointBarrierTrackerTest.java:616)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24201) HistoryServer didn't contain TaskManager Logs

2021-09-07 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24201:
---

 Summary: HistoryServer didn't contain TaskManager Logs
 Key: FLINK-24201
 URL: https://issues.apache.org/jira/browse/FLINK-24201
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.14.1


Currently, the HistoryServer doesn't contain TaskManager logs, so users cannot 
see the logs of the TaskManagers. We could also archive the log info on the 
TaskManager side and show it in the HistoryServer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24202) BoundedSourceITCase hangs on azure

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24202:


 Summary: BoundedSourceITCase hangs on azure
 Key: FLINK-24202
 URL: https://issues.apache.org/jira/browse/FLINK-24202
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.15.0
Reporter: Xintong Song
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23718&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=5389



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24203) AbstractServerTest fails due to port conflict

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24203:


 Summary: AbstractServerTest fails due to port conflict
 Key: FLINK-24203
 URL: https://issues.apache.org/jira/browse/FLINK-24203
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Queryable State
Affects Versions: 1.15.0
Reporter: Xintong Song
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23718&view=logs&j=c91190b6-40ae-57b2-5999-31b869b0a7c1&t=41463ccd-0694-5d4d-220d-8f771e7d098b&l=15081

{code}
Sep 08 00:18:11 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.19 s <<< FAILURE! - in org.apache.flink.queryablestate.network.AbstractServerTest
Sep 08 00:18:11 [ERROR] testServerInitializationFailure  Time elapsed: 0.026 s  <<< FAILURE!
Sep 08 00:18:11 java.lang.AssertionError: 
Sep 08 00:18:11 
Sep 08 00:18:11 Expected: (an instance of org.apache.flink.util.FlinkRuntimeException and exception with message a string containing "Unable to start Test Server 2. All ports in provided range are occupied.")
Sep 08 00:18:11      but: exception with message a string containing "Unable to start Test Server 2. All ports in provided range are occupied." message was "Unable to start Test Server 1. All ports in provided range are occupied."
Sep 08 00:18:11 Stacktrace was: org.apache.flink.util.FlinkRuntimeException: Unable to start Test Server 1. All ports in provided range are occupied.
Sep 08 00:18:11 	at org.apache.flink.queryablestate.network.AbstractServerBase.start(AbstractServerBase.java:209)
Sep 08 00:18:11 	at org.apache.flink.queryablestate.network.AbstractServerTest.testServerInitializationFailure(AbstractServerTest.java:73)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24204) Failed to insert nested types using value constructed functions

2021-09-07 Thread Jark Wu (Jira)
Jark Wu created FLINK-24204:
---

 Summary: Failed to insert nested types using value constructed 
functions 
 Key: FLINK-24204
 URL: https://issues.apache.org/jira/browse/FLINK-24204
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.2
Reporter: Jark Wu


{code}
Flink SQL> CREATE TABLE nested_type (
> row_type ROW,
> map_type MAP,
> array_type ARRAY
> ) WITH ('connector'='print');
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO nested_type VALUES ((1, 'data'), MAP['key', 1], 
ARRAY[1,2,3])
> ;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast from 
'MAP NOT NULL' to 'MAP'.
{code}

The query works if we change the sink table definition to :

{code}
CREATE TABLE nested_type (
row_type ROW,
map_type MAP,
array_type ARRAY
) WITH (
 'connector'='print'
);
{code}

Therefore, I think the root cause of this problem is that Flink SQL doesn't 
support casts for nested types during code generation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24205) AggregateReduceGroupingITCase hangs on azure

2021-09-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-24205:


 Summary: AggregateReduceGroupingITCase hangs on azure
 Key: FLINK-24205
 URL: https://issues.apache.org/jira/browse/FLINK-24205
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.2
Reporter: Xintong Song
 Fix For: 1.13.3


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23720&view=logs&j=955770d3-1fed-5a0a-3db6-0c7554c910cb&t=14447d61-56b4-5000-80c1-daa459247f6a&l=7574

{code}
Sep 08 02:08:51 "main" #1 prio=5 os_prio=0 tid=0x7f02d400b800 nid=0x1471 
waiting on condition [0x7f02dc62a000]
Sep 08 02:08:51java.lang.Thread.State: WAITING (parking)
Sep 08 02:08:51 at sun.misc.Unsafe.park(Native Method)
Sep 08 02:08:51 - parking to wait for  <0xd465c060> (a 
java.util.concurrent.CompletableFuture$Signaller)
Sep 08 02:08:51 at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Sep 08 02:08:51 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
Sep 08 02:08:51 at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
Sep 08 02:08:51 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
Sep 08 02:08:51 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
Sep 08 02:08:51 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
Sep 08 02:08:51 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
Sep 08 02:08:51 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
Sep 08 02:08:51 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
Sep 08 02:08:51 at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
Sep 08 02:08:51 at 
java.util.Iterator.forEachRemaining(Iterator.java:115)
Sep 08 02:08:51 at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
Sep 08 02:08:51 at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
Sep 08 02:08:51 at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
Sep 08 02:08:51 at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
Sep 08 02:08:51 at 
org.apache.flink.table.planner.runtime.batch.sql.agg.AggregateReduceGroupingITCase.testAggOnWindow(AggregateReduceGroupingITCase.scala:282)
{code}
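
For context, the trace shows the test's main thread parked in the collect
client: the iterator behind the TableResult blocks on a CompletableFuture until
the job delivers the next result batch. A simplified sketch of that blocking
pattern (hypothetical names, not the actual CollectResultFetcher code):

{code}
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the blocking fetch pattern visible in the trace.
class ResultFetcherSketch {
    CompletableFuture<List<String>> sendRequest() {
        // In the real fetcher this asks the coordinator for the next batch;
        // here the future is never completed, mimicking an unresponsive job.
        return new CompletableFuture<>();
    }

    List<String> nextBatch() throws Exception {
        // The caller parks here (CompletableFuture$Signaller in the trace);
        // if the job never answers, the test hangs until the CI watchdog
        // kills the build.
        return sendRequest().get();
    }
}
{code}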



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-09-07 Thread Arvid Heise
Just to clarify: I specifically asked Piotr not to pursue the FLIP if the
current state wouldn't make it into 1.14, so that someone else could take it
over and expand it towards per-split alignment. Having a minimalistic
version in 1.14 plus an amendment FLIP in 1.15 would have been fine, but now
I would rather have it done completely in one go.

I expect to work on it in October, so feel free to go ahead if you can make
it sooner.
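
For anyone catching up on the thread: FLIP-182 hangs the alignment settings
off the WatermarkStrategy. A rough usage sketch, with method and parameter
names as proposed in the FLIP (so subject to change before release):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class AlignmentSketch {
    // Rough sketch of the proposed FLIP-182 alignment API.
    public static WatermarkStrategy<String> alignedStrategy() {
        return WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Sources sharing the group "alignment-group-1" pause reading
                // once their watermark drifts more than 20 seconds ahead of
                // the group's minimum; drift is re-checked every second.
                .withWatermarkAlignment(
                        "alignment-group-1",
                        Duration.ofSeconds(20),
                        Duration.ofSeconds(1));
    }
}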

Best,

Arvid
On Tue, Sep 7, 2021 at 8:41 PM Piotr Nowojski wrote:

> Hi Thomas,
>
> Unfortunately, Arvid and I didn't have enough time to finish this off for
> 1.14.0, as we were firefighting other efforts and have refocused on other,
> more advanced FLIPs. We want to deliver it for 1.15, though. I'm not sure,
> but I remember Arvid saying that he would like to take a look at this in
> the 1.15 cycle together with per-split throttling. If not, at the very
> least I would like to contribute the version without the per-split logic,
> as it is almost done.
>
> Piotrek
>
> On Tue, Sep 7, 2021 at 19:18, Thomas Weise wrote:
>
> > Hi,
> >
> > I wanted to check if there is active development on FLIP-182 and what the
> > target release for it might be? [1] still shows as under discussion.
> >
> > Regarding the per-subtask vs. per-split limitation: I think it will be
> > important that this eventually works per split, since in some cases it
> > won't be practical to limit a subtask to a single split (think KafkaSource
> > reading from many topics with diverse volumes).
> >
> > Thanks,
> > Thomas
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >
> > On Wed, Jul 21, 2021 at 4:48 AM Piotr Nowojski wrote:
> >
> > > Hi,
> > >
> > > >  I would not fully advertise this before we have the second part
> > > > implemented as well.
> > >
> > > I'm not sure, maybe we could advertise with a big warning about this
> > > limitation. I mean it's not as if this change would break something. At
> > > worst it just wouldn't fully solve the problem with multiple splits per
> > > single operator, but only limit the scope of that problem. At the same
> > > time I don't have strong feelings about this. If the consensus is not to
> > > advertise it, I'm also fine with it. Only in that case we should probably
> > > quickly follow up with the per-split solution.
> > >
> > > Anyway, thanks for voicing your support and the discussions. I'm going to
> > > start a voting thread for this feature soon.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Tue, Jul 13, 2021 at 19:09, Stephan Ewen wrote:
> > >
> > > > @Eron Wright  The per-split watermarks are the default in the new
> > > > source interface (FLIP-27) and come for free if you use the
> > > > SplitReader.
> > > >
> > > > Based on that, it is also possible to unsubscribe individual splits to
> > > > solve the alignment in the case where operators have multiple splits
> > > > assigned.
> > > > Piotr and I already discussed that, but concluded that the
> > > > implementation of that is largely orthogonal.
> > > >
> > > > I am a bit worried, though, that if we release and advertise the
> > > > alignment without handling this case, we create a surprise for quite a
> > > > few users. While this is admittedly valuable for some users, I think we
> > > > need to position this accordingly. I would not fully advertise this
> > > > before we have the second part implemented as well.
> > > >
> > > >
> > > >
> > > > On Mon, Jul 12, 2021 at 7:18 PM Eron Wright wrote:
> > > >
> > > > > The notion of per-split watermarks seems quite interesting. I think
> > > > > the idleness feature could benefit from a per-split approach too,
> > > > > because idleness is typically related to whether any splits are
> > > > > assigned to a given operator instance.
> > > > >
> > > > >
> > > > > On Mon, Jul 12, 2021 at 3:06 AM 刘建刚 wrote:
> > > > >
> > > > > > +1 for the source watermark alignment.
> > > > > > In the previous flink version, the source connectors are
> different
> > in
> > > > > > implementation and it is hard to make this feature. When the
> > consumed
> > > > > data
> > > > > > is not aligned or consuming history data, it is very easy to
> cause
> > > the
> > > > > > unalignment. Source alignment can resolve many unstable problems.
> > > > > >
> > > > > > On Fri, Jul 9, 2021 at 11:25 PM, Seth Wiesman wrote:
> > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > In my opinion, this limitation is perfectly fine for the MVP.
> > > > > > > Watermark alignment is a long-standing issue and this already
> > > > > > > moves the ball so far forward.
> > > > > > >
> > > > > > > I don't expect this will cause many issues in practice, as I
> > > > > > > understand it the FileSource always processes one split at a
> > > > > > > time, and in my experience, 90% of K

[jira] [Created] (FLINK-24206) PulsarSourceITCase fails with "Consumer not found"

2021-09-07 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-24206:


 Summary: PulsarSourceITCase fails with "Consumer not found"
 Key: FLINK-24206
 URL: https://issues.apache.org/jira/browse/FLINK-24206
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23732&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=24650

{code}
Sep 08 05:08:35 Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy
Sep 08 05:08:35 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
Sep 08 05:08:35 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
Sep 08 05:08:35 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
Sep 08 05:08:35 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
Sep 08 05:08:35 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
Sep 08 05:08:35 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
Sep 08 05:08:35 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
Sep 08 05:08:35 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
Sep 08 05:08:35 at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown 
Source)
Sep 08 05:08:35 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Sep 08 05:08:35 at java.lang.reflect.Method.invoke(Method.java:498)
Sep 08 05:08:35 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
Sep 08 05:08:35 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
Sep 08 05:08:35 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
Sep 08 05:08:35 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
Sep 08 05:08:35 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
Sep 08 05:08:35 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
Sep 08 05:08:35 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
Sep 08 05:08:35 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
Sep 08 05:08:35 at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
Sep 08 05:08:35 at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
Sep 08 05:08:35 at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
Sep 08 05:08:35 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
Sep 08 05:08:35 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
Sep 08 05:08:35 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
Sep 08 05:08:35 at akka.actor.Actor.aroundReceive(Actor.scala:537)
Sep 08 05:08:35 at akka.actor.Actor.aroundReceive$(Actor.scala:535)
Sep 08 05:08:35 at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
Sep 08 05:08:35 at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
Sep 08 05:08:35 at akka.actor.ActorCell.invoke(ActorCell.scala:548)
Sep 08 05:08:35 at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
Sep 08 05:08:35 at akka.dispatch.Mailbox.run(Mailbox.scala:231)
Sep 08 05:08:35 at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
Sep 08 05:08:35 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Sep 08 05:08:35 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Sep 08 05:08:35 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Sep 08 05:08:35 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Sep 08 05:08:35 Caused by: 
org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: 
Consumer not found
Sep 08 05:08:35 at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:987)
Sep 08 05:08:35 at 
org.apache.pulsar.client.impl.PulsarClientImpl.close(PulsarClientImpl.java:658)
Sep 08 05:08:35 at 
org.apache.flink.connec