[jira] [Created] (FLINK-17567) Create a dedicated Python directory in release directory to place Python-related source and binary packages

2020-05-08 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-17567:


 Summary: Create a dedicated Python directory in release directory 
to place Python-related source and binary packages
 Key: FLINK-17567
 URL: https://issues.apache.org/jira/browse/FLINK-17567
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Release System
Reporter: Huang Xingbo
 Fix For: 1.11.0


We introduced cross-platform wheel packages in 1.11.
It is confusing that all wheel packages and the corresponding signature 
verification files are placed in the root directory of the release.
So we plan to put the Python-related files in a dedicated subdirectory of the 
release directory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17568) Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint

2020-05-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17568:
---

 Summary: Task may consume data after checkpoint barrier before 
performing checkpoint for unaligned checkpoint
 Key: FLINK-17568
 URL: https://issues.apache.org/jira/browse/FLINK-17568
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


For unaligned checkpoints, a task may consume data that arrives after the 
checkpoint barrier before performing the checkpoint, which leads to consumption 
of duplicated data and corruption of the data stream.

More specifically, when the Netty thread notifies the checkpoint barrier for 
the first time and enqueues a checkpointing task in the mailbox, the task thread 
may still be in the data consumption loop. If it reads a new checkpoint barrier 
from another channel, it will not return to the mailbox; instead it will 
continue to read data until all data is consumed or a full record is available. 
Meanwhile, the data after the checkpoint barrier may be read and consumed, which 
leads to inconsistency.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17569) to support ViewFileSystem when wait lease revoke of hadoop filesystem

2020-05-08 Thread konwu (Jira)
konwu created FLINK-17569:
-

 Summary: to support ViewFileSystem when wait lease revoke of 
hadoop filesystem
 Key: FLINK-17569
 URL: https://issues.apache.org/jira/browse/FLINK-17569
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Reporter: konwu


Currently the waitUntilLeaseIsRevoked method does not support ViewFileSystem, 
which will cause the same issue as [fail to recover after taskmanager 
failure|https://issues.apache.org/jira/browse/FLINK-11419].

I tried to resolve the real path and FileSystem of the ViewFileSystem, and it works:
{code:java}
if (fs instanceof ViewFileSystem) {
   ViewFileSystem vfs = (ViewFileSystem) fs;
   Path resolvePath = vfs.resolvePath(path);
   DistributedFileSystem dfs = (DistributedFileSystem) resolvePath.getFileSystem(fs.getConf());
   return waitUntilLeaseIsRevoked(dfs, resolvePath);
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17570) BatchTableEnvironment#fromValues(Object... values) causes StackOverflowError

2020-05-08 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-17570:
-

 Summary: BatchTableEnvironment#fromValues(Object... values) causes 
StackOverflowError 
 Key: FLINK-17570
 URL: https://issues.apache.org/jira/browse/FLINK-17570
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Wei Zhong


The error can be reproduced by the following code:
{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.fromValues(1L, 2L, 3L);
{code}
The error is as follows; note that the stack trace shows TableEnvironment#fromValues and TableEnvImpl#fromValues calling each other recursively:
{code:java}
Exception in thread "main" java.lang.StackOverflowErrorException in thread 
"main" java.lang.StackOverflowError at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) at 
java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) 
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
org.apache.flink.table.expressions.ApiExpressionUtils.convertArray(ApiExpressionUtils.java:142)
 at 
org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:100)
 at 
org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$2.apply(TableEnvImpl.scala:1030)
 at 
org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$2.apply(TableEnvImpl.scala:1030)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1030)
 at 
org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at 
org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at 
org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at 
org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at 
org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at 
org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at 
org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at 
org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at 
org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
...{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17571) A better way to show the files used in currently checkpoints

2020-05-08 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-17571:
-

 Summary: A better way to show the files used in currently 
checkpoints
 Key: FLINK-17571
 URL: https://issues.apache.org/jira/browse/FLINK-17571
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing
Reporter: Congxian Qiu(klion26)


Inspired by the [user 
mail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html].

Currently, there are three types of directories for a checkpoint. The files in 
the TASKOWNED and EXCLUSIVE directories can be deleted safely, but users can't 
safely delete the files in the SHARED directory (the files may have been created 
a long time ago).

I think it's better to give users a way to know which files are currently in 
use (so that all other files are known to be unused).

Maybe a command-line command such as the one below would be enough to support 
such a feature:

{{./bin/flink checkpoint list $checkpointDir  # list all the files used in the 
checkpoint}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17572) Remove checkpoint alignment buffered metric from webui

2020-05-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17572:
---

 Summary: Remove checkpoint alignment buffered metric from webui
 Key: FLINK-17572
 URL: https://issues.apache.org/jira/browse/FLINK-17572
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.11.0
Reporter: Yingjie Cao


After FLINK-16404, buffers are no longer cached during checkpoint barrier 
alignment, so the checkpoint alignment buffered metric will always be 0 and 
should simply be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17573) There is duplicate source data in ProcessWindowFunction

2020-05-08 Thread Tammy zhang (Jira)
Tammy zhang created FLINK-17573:
---

 Summary: There is duplicate source data in ProcessWindowFunction
 Key: FLINK-17573
 URL: https://issues.apache.org/jira/browse/FLINK-17573
 Project: Flink
  Issue Type: Bug
Reporter: Tammy zhang


I consume data from a Kafka topic, key the stream with keyBy, and then apply a 
ProcessWindowFunction on the keyed stream. A strange phenomenon appears: the 
process function's source data becomes duplicated, like:

Input Data iterator:[H2update 623.0 2020-05-08 15:19:25.14, H2update 297.0 
2020-05-08 15:19:28.501, H2update 832.0 2020-05-08 15:19:29.415]
 data iterator end--
Input Data iterator:[H1400 59.0 2020-05-08 15:19:07.087, H1400 83.0 2020-05-08 
15:19:09.521]
 data iterator end--
Input Data iterator:[H2insert 455.0 2020-05-08 15:19:23.066, H2insert 910.0 
2020-05-08 15:19:23.955, H2insert 614.0 2020-05-08 15:19:24.397, H2insert 556.0 
2020-05-08 15:19:27.389, H2insert 922.0 2020-05-08 15:19:27.761, H2insert 165.0 
2020-05-08 15:19:28.26]
 data iterator end--
Input Data iterator:[H1400 59.0 2020-05-08 15:19:07.087, H1400 83.0 2020-05-08 
15:19:09.521]
 data iterator end--
Input Data iterator:[H2update 623.0 2020-05-08 15:19:25.14, H2update 297.0 
2020-05-08 15:19:28.501, H2update 832.0 2020-05-08 15:19:29.415]
 data iterator end--
Input Data iterator:[H2insert 455.0 2020-05-08 15:19:23.066, H2insert 910.0 
2020-05-08 15:19:23.955, H2insert 614.0 2020-05-08 15:19:24.397, H2insert 556.0 
2020-05-08 15:19:27.389, H2insert 922.0 2020-05-08 15:19:27.761, H2insert 165.0 
2020-05-08 15:19:28.26]
 data iterator end--

I can ensure that there is no duplication in the Kafka data. Could you help me 
point out where the problem is? Thanks a lot.
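
For reference, a minimal sketch of the pipeline described above (the window type, window size and record type below are assumptions, since the original job is not shown):
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class WindowDuplicationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-group");

        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        source
                // key by the first token of each record (e.g. "H2update", "H1400")
                .keyBy(value -> value.split(" ")[0])
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<String> elements,
                                        Collector<String> out) {
                        // The report shows the elements of this iterable appearing twice.
                        StringBuilder sb = new StringBuilder("Input Data iterator:[");
                        for (String e : elements) {
                            sb.append(e).append(", ");
                        }
                        out.collect(sb.append(']').toString());
                    }
                })
                .print();

        env.execute("ProcessWindowFunction duplication test");
    }
}
{code}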



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17574) Deduplicate the process execution logic in Flink

2020-05-08 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-17574:
--

 Summary: Deduplicate the process execution logic in Flink
 Key: FLINK-17574
 URL: https://issues.apache.org/jira/browse/FLINK-17574
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Yangze Guo


Currently, there is a lot of process execution logic in both the production and 
testing components of Flink. It would be good to provide something like 
{{ProcessExecutionUtils}} in {{flink-core}} to achieve code deduplication.
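
A rough sketch of what such a utility could look like (the class name and method below are hypothetical and only illustrate the kind of deduplication proposed here; they are not existing Flink API):
{code:java}
// Hypothetical utility; name and API are illustrative only.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

public final class ProcessExecutionUtils {

    private ProcessExecutionUtils() {
    }

    /** Runs the given command, waits for it to finish and returns its exit code. */
    public static int runAndWait(List<String> command, long timeoutMillis)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true)
                .start();

        // Drain stdout/stderr so the child process cannot block on a full pipe.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }

        if (!process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
            process.destroyForcibly();
            throw new IOException("Process timed out: " + command);
        }
        return process.exitValue();
    }
}
{code}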



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17575) KafkaException: Received exception when fetching the next record from XXX-14. If needed, please seek past the record to continue consumption.

2020-05-08 Thread Maxim Malykhin (Jira)
Maxim Malykhin created FLINK-17575:
--

 Summary: KafkaException: Received exception when fetching the next 
record from XXX-14. If needed, please seek past the record to continue 
consumption.
 Key: FLINK-17575
 URL: https://issues.apache.org/jira/browse/FLINK-17575
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: Maxim Malykhin


The stream doesn't catch the exception raised for a bad (corrupt) record in 
Kafka; the exception terminates the consumer thread.
{code:java}
org.apache.kafka.common.KafkaException: Received exception when fetching the 
next record from prod_netscout_ug_s1mme-14. If needed, please seek past the 
record to continue consumption.org.apache.kafka.common.KafkaException: Received 
exception when fetching the next record from prod_netscout_ug_s1mme-14. If 
needed, please seek past the record to continue consumption. at 
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1522)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)Caused
 by: org.apache.kafka.common.KafkaException: Record batch for partition 
prod_netscout_ug_s1mme-14 at offset 22405513471 is invalid, cause: Record is 
corrupt (stored crc = 995400728, computed crc = 4153305836) at 
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1435)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1479)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
 ... 7 more
{code}
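
For context, the "seek past the record" suggested by the error message corresponds to something like the sketch below on a plain KafkaConsumer (an illustration only; the Flink consumer thread currently performs no such handling, which is what this report is about):
{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecord {

    // Polls once; if the fetch fails with a KafkaException (e.g. a corrupt record),
    // skips one record past the current fetch position of the given partition.
    static void pollSkippingCorruptRecord(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            // ... hand the records over to downstream processing ...
        } catch (KafkaException e) {
            // The fetch position typically still points at the corrupt record/batch.
            consumer.seek(tp, consumer.position(tp) + 1);
        }
    }
}
{code}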



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17576) HiveTableSinkTest and TableEnvHiveConnectorTest are instable

2020-05-08 Thread Dian Fu (Jira)
Dian Fu created FLINK-17576:
---

 Summary: HiveTableSinkTest and TableEnvHiveConnectorTest are 
instable
 Key: FLINK-17576
 URL: https://issues.apache.org/jira/browse/FLINK-17576
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Tests
Affects Versions: 1.11.0
Reporter: Dian Fu


HiveTableSinkTest and TableEnvHiveConnectorTest failed with the following 
exception:
{code:java}
2020-05-08T09:38:44.5916441Z [ERROR] 
testWriteComplexType(org.apache.flink.connectors.hive.HiveTableSinkTest)  Time 
elapsed: 1.362 s  <<< ERROR!
2020-05-08T09:38:44.5932270Z java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (e27d50c5a780264a576aa8a21a6dd6c6)
2020-05-08T09:38:44.5938598Zat 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-05-08T09:38:44.5939435Zat 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2020-05-08T09:38:44.5939970Zat 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1663)
2020-05-08T09:38:44.5940551Zat 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
2020-05-08T09:38:44.5941188Zat 
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
2020-05-08T09:38:44.5941834Zat 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:916)
2020-05-08T09:38:44.5945405Zat 
org.apache.flink.connectors.hive.HiveTableSinkTest.testWriteComplexType(HiveTableSinkTest.java:143)
2020-05-08T09:38:44.5946105Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-05-08T09:38:44.5946628Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-05-08T09:38:44.5947106Zat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-05-08T09:38:44.5947770Zat 
java.lang.reflect.Method.invoke(Method.java:498)
2020-05-08T09:38:44.5948393Zat 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-05-08T09:38:44.5949102Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-05-08T09:38:44.5949853Zat 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-05-08T09:38:44.5950587Zat 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-05-08T09:38:44.5951763Zat 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169)
2020-05-08T09:38:44.5952660Zat 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154)
2020-05-08T09:38:44.5953829Zat 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92)
2020-05-08T09:38:44.5966233Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-05-08T09:38:44.5967051Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-05-08T09:38:44.5968062Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-05-08T09:38:44.5968949Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-05-08T09:38:44.5969824Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-05-08T09:38:44.5970751Zat 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-05-08T09:38:44.5971584Zat 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2020-05-08T09:38:44.5972386Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-05-08T09:38:44.5973469Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-05-08T09:38:44.5974147Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-05-08T09:38:44.5974684Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2020-05-08T09:38:44.5975227Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-05-08T09:38:44.5975827Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
2020-05-08T09:38:44.5976533Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
2020-05-08T09:38:44.5977627Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
2020-05-08T09:38:44.5978552Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
2020-05-08T09:38:44.5979527Zat 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
2020-05-08T09:38:44.5980556Zat 
org.apache.maven.surefire.booter.ForkedBoot

[jira] [Created] (FLINK-17577) SinkFormat#createSinkFormat should use DynamicTableSink.Context as the first parameter

2020-05-08 Thread Danny Chen (Jira)
Danny Chen created FLINK-17577:
--

 Summary: SinkFormat#createSinkFormat should use 
DynamicTableSink.Context as the first parameter
 Key: FLINK-17577
 URL: https://issues.apache.org/jira/browse/FLINK-17577
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Danny Chen
 Fix For: 1.11.0


This interface was introduced in FLINK-16997; it uses ScanTableSource.Context 
as the first parameter, which is not correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17578) Union of 2 SideOutputs behaviour incorrect

2020-05-08 Thread Tom Wells (Jira)
Tom Wells created FLINK-17578:
-

 Summary: Union of 2 SideOutputs behaviour incorrect
 Key: FLINK-17578
 URL: https://issues.apache.org/jira/browse/FLINK-17578
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Tom Wells


Strange behaviour when using union() to merge outputs of 2 DataStreams, where 
both are sourced from SideOutputs.

See example code with comments demonstrating the issue:
{code:java}
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(1, 2, 3, 4)

    val oddTag = OutputTag[Int]("odds")
    val evenTag = OutputTag[Int]("even")

    val all =
      input.process {
        (value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]) => {
          if (value % 2 != 0)
            ctx.output(oddTag, value)
          else
            ctx.output(evenTag, value)
        }
      }

    val odds = all.getSideOutput(oddTag)
    val evens = all.getSideOutput(evenTag)

    // These print correctly
    //
    odds.print  // -> 1, 3
    evens.print // -> 2, 4

    // This prints incorrectly - BUG?
    //
    odds.union(evens).print   // -> 2, 2, 4, 4
    evens.union(odds).print   // -> 1, 1, 3, 3

    // Another test to understand normal behaviour of .union, using normal inputs
    //
    val odds1 = env.fromElements(1, 3)
    val evens1 = env.fromElements(2, 4)

    // Union of 2 normal inputs
    //
    odds1.union(evens1).print   // -> 1, 2, 3, 4

    // Union of a normal input plus an input from a sideoutput
    //
    odds.union(evens1).print    // -> 1, 2, 3, 4
    evens1.union(odds).print    // -> 1, 2, 3, 4

    //
    // So it seems that when both inputs are from sideoutputs that it behaves incorrectly... BUG?

    env.execute("Test job")
  }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: What is the RocksDB local directory in flink checkpointing?

2020-05-08 Thread Yun Tang
Hi LakeShen

You could refer to [1] and [2] to learn about the temporary directory in YARN. The 
related log message could be
"Setting directories for temporary files to: " or "Overriding Flink's temporary 
file directories with those specified in the Flink config: "


[1] 
https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java#L103
[2] 
https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java#L478-L489

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, May 6, 2020 17:35
To: LakeShen 
Cc: dev ; user ; user-zh 

Subject: Re: What is the RocksDB local directory in flink checkpointing?

Hi LakeShen,

`state.backend.rocksdb.localdir` defines the directory in which RocksDB
will store its local files. Local files are RocksDB's SST and metadata
files for example. This directory does not need to be persisted. If the
config option is not configured, then it will use the node's temporary file
directory.
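
For reference, a minimal sketch of setting this directory explicitly (paths are
placeholders and this assumes the flink-statebackend-rocksdb dependency is on the
classpath):
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints (the durable part) go to a persistent file system.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // Local SST/metadata files go to a fast local disk; this is the programmatic
        // equivalent of the 'state.backend.rocksdb.localdir' config option.
        backend.setDbStoragePath("/data/flink/rocksdb");

        env.setStateBackend(backend);
        // ... build and execute the job ...
    }
}
{code}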

Cheers,
Till

On Wed, May 6, 2020 at 6:07 AM LakeShen  wrote:

> Hi community,
>
> Now I have a question about the Flink checkpoint local directory. Our Flink
> version is 1.6 and the job mode is
>
> Flink on YARN per job. I read the Flink source code and found that the Flink
> checkpoint local directory is
>
> /tmp when "state.backend.rocksdb.localdir" is not configured. But when I go
> into the /tmp dir, I
>
> couldn't find the Flink checkpoint state local directory.
>
> What is the RocksDB local directory in flink checkpointing?  I am looking
> forward to your reply.
>
> Best,
> LakeShen
>


[jira] [Created] (FLINK-17579) Set the resource id of taskexecutor from environment variable if possible in standalone mode

2020-05-08 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-17579:
--

 Summary: Set the resource id of taskexecutor from environment 
variable if possible in standalone mode
 Key: FLINK-17579
 URL: https://issues.apache.org/jira/browse/FLINK-17579
 Project: Flink
  Issue Type: Sub-task
Reporter: Yangze Guo






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Kurt Young
+dev 

Best,
Kurt


On Fri, May 8, 2020 at 3:35 PM Caizhi Weng  wrote:

> Hi Jeff,
>
> Thanks for the response. However I'm using executeAsync so that I can run
> the job asynchronously and get a JobClient to monitor the job. JobListener
> only works for the synchronous execute method. Is there another way to achieve
> this?
>
> Jeff Zhang  于2020年5月8日周五 下午3:29写道:
>
>> I use JobListener#onJobExecuted to be notified that the flink job is
>> done.
>> It is pretty reliable for me, the only exception is the client process is
>> down.
>>
>> BTW, the reason you see the ApplicationNotFound exception is that the yarn app
>> is terminated, which means the flink cluster is shut down. For
>> standalone mode, on the other hand, the flink cluster is always up.
>>
>>
>> Caizhi Weng  于2020年5月8日周五 下午2:47写道:
>>
>>> Hi dear Flink community,
>>>
>>> I would like to determine whether a job has finished (no matter
>>> successfully or exceptionally) in my code.
>>>
>>> I used to think that JobClient#getJobStatus is a good idea, but I found
>>> that it behaves quite differently under different executing environments.
>>> For example, under a standalone session cluster it will return the FINISHED
>>> status for a finished job, while under a yarn per job cluster it will throw
>>> an ApplicationNotFound exception. I'm afraid that there might be other
>>> behaviors for other environments.
>>>
>>> So what's the best practice to determine whether a job has finished or
>>> not? Note that I'm not waiting for the job to finish. If the job hasn't
>>> finished I would like to know it and do something else.
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
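
For reference, a minimal sketch of the non-blocking check discussed above, using only the JobClient API (whether the returned future behaves consistently across deployment targets is exactly the open question of this thread):
{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobStatusCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ... build the job ...

        // Submit without blocking and keep a handle to the running job.
        JobClient client = env.executeAsync("my-job");

        CompletableFuture<JobExecutionResult> result =
                client.getJobExecutionResult(Thread.currentThread().getContextClassLoader());

        if (result.isDone()) {
            // The job has finished, successfully or exceptionally;
            // result.get() would rethrow any failure.
        } else {
            // The job is still running; do something else and check again later.
        }
    }
}
{code}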


[jira] [Created] (FLINK-17580) NPE in unaligned checkpoint for EndOfPartition events

2020-05-08 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-17580:
---

 Summary: NPE in unaligned checkpoint for EndOfPartition events
 Key: FLINK-17580
 URL: https://issues.apache.org/jira/browse/FLINK-17580
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 1.11.0


Current master does not account for {{StreamTaskNetworkInput#recordDeserializers}} 
being nulled after EOP events.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17581) Update translation of S3 documentation

2020-05-08 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-17581:
--

 Summary: Update translation of S3 documentation
 Key: FLINK-17581
 URL: https://issues.apache.org/jira/browse/FLINK-17581
 Project: Flink
  Issue Type: Task
  Components: chinese-translation, Documentation
Reporter: Robert Metzger


The change in 
https://github.com/apache/flink/commit/7c5ac3584e42a0e7ebc5e78c532887bf4d383d9d 
needs to be added to the Chinese variant of the documentation page.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.1, release candidate #3

2020-05-08 Thread Robert Metzger
Thanks a lot for creating another RC!

+1 (binding)

- checked diff to last RC:
https://github.com/apache/flink/compare/release-1.10.1-rc2...release-1.10.1-rc3
  - kinesis dependency change is properly documented
- started Flink locally (on Java11 :) )
   - seems to be built off the specified commit
   - ran example
   - checked logs
- staging repo looks ok



On Thu, May 7, 2020 at 2:21 PM Yu Li  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #3 for version 1.10.1, as
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.10.1-rc3" [5],
> * website pull request listing the new release and adding announcement blog
> post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Yu
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1364/
> [5]
>
> https://github.com/apache/flink/commit/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37
> [6] https://github.com/apache/flink-web/pull/330
>


[jira] [Created] (FLINK-17582) Update quickstarts to use universal Kafka connector

2020-05-08 Thread Seth Wiesman (Jira)
Seth Wiesman created FLINK-17582:


 Summary: Update quickstarts to use universal Kafka connector
 Key: FLINK-17582
 URL: https://issues.apache.org/jira/browse/FLINK-17582
 Project: Flink
  Issue Type: Improvement
  Components: Quickstarts
Reporter: Seth Wiesman
Assignee: Seth Wiesman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17583) Allow option to store a savepoint's _metadata file separate from its data files

2020-05-08 Thread Steve Bairos (Jira)
Steve Bairos created FLINK-17583:


 Summary: Allow option to store a savepoint's _metadata file 
separate from its data files
 Key: FLINK-17583
 URL: https://issues.apache.org/jira/browse/FLINK-17583
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Steve Bairos


(In the description I mainly talk about savepoints, but the plan )

We have a deployment framework that often needs to be able to return a list of 
valid savepoints in S3 with a certain prefix. Our assertion is that if an S3 
object ends with '_metadata', then it is a valid savepoint. In order to 
generate the list of valid savepoints, we need to locate all of the _metadata 
files that start with a given prefix.

For example, if our S3 bucket's paths look like this:

 
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed
... thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/41297fd5-40df-4683-bfb6-534bfddae92a
... thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/2d2f5551-56a7-4fea-b25b-b0156660c650
 thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/c8c410df-5fb0-46a0-84c5-43e1575e8dc5
... dozens of other savepoint dirs


{code}
 

In order to get a list of all savepoints that my-job1 could possibly start 
from, we would want to get all the savepoints that start with the prefix:
{code:java}
s3://bucket/savepoints/my-job1 {code}
Ideally, we would want to have the ability to get a list like this from S3:
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata{code}
Unfortunately there is no easy way to get this list because S3's API only 
allows you to search based on prefix and not suffix. Listing all objects with 
the prefix 's3://bucket/savepoints/my-job1' and then filtering the list to only 
include the files that contain _metadata will also not work, because there are 
thousands of savepoint data files that have the same prefix, such as:
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6
etc.{code}
 

I propose that we add a configuration in a similar vein to the S3 entropy 
injector which allows us to store the _metadata file in a separate path from 
the savepoint's data files. For example, with this hypothetical configuration:
{code:java}
state.checkpoints.split.key: _datasplit_
state.checkpoints.split.metadata.dir: metadata
state.checkpoints.split.data.dir: data{code}
When a user triggers a savepoint with the path
{code:java}
s3://bucket/savepoints/_datasplit_/my-job1/2020-05-07/ {code}
The resulting savepoint that is created looks like:
{code:java}
s3://bucket/savepoints/metadata/my-job1/2020-05-07/savepoint-654321-abcdef9876/_metadata
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/a50fc483-3581-4b55-a37e-b7c61b3ee47f
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/b0c6b7c0-6b94-43ae-8678-2f7640af1523
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/c1855b35-c0b7-4347-9352-88423998e5ec{code}
Notice that the metadata's prefix is 
{code:java}
 s3://bucket/savepoints/metadata/my-job1/2020-05-07/{code}
and the data files' prefix is
{code:java}
 s3://bucket/savepoints/data/my-job1/2020-05-07/{code}
That way if I want to list all the savepoints for my-job1, I can just list all 
the objects with the prefix 
{code:java}
 aws s3 ls --recursive s3://bucket/savepoints/metadata/my-job1/{code}
And I can get a clean list of just the _metadata files easily.

 

One alternative that we've thought about is using the entropy injection. It 
technically does separate the _metadata file from the rest of the data as well, 
but it kind of makes a mess of entropy dirs in S3, so it's not our ideal choice. 

 

I'm happy to take a shot at i

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-08 Thread Fabian Hueske
I think we need the TEMPORAL TABLE syntax because they are conceptually
more than just regular tables.
In addition to being a table that always holds the latest values (and can
thereby serve as input to a continuous query), the system also needs to
track the history of such a table to be able to serve different versions of
the table (as requested by FOR SYSTEM_TIME AS OF).

Of course we could define that Flink implicitly tracks the (recent, i.e.,
within watermark bounds) history of all dynamic tables.
However, there's one more thing the system needs to know to be able to
correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to
use as version of the temporal table.
IMO it would be good to make this explicit, especially if there is a plan
to eventually support multiple event-time attributes / watermarks
on a table.
Just using the only event time attribute would be a bit too much convention
magic for my taste (others might of course have a different opinion on this
subject).

So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE
statement if we agree on a few implicit conventions (implicit history table
+ implicit versioning attribute).
I'm not a big fan of such conventions and think it's better to make such
things explicit.

For temporal joins with processing time semantics, we can use regular
dynamic tables without declaring them as TEMPORAL since we don't need a
history table to derive the current version.
AFAIK, these are already the semantics we use for LookupTableSource.

Regarding the question of append-only tables and temporal tables, I'd like
to share some more thoughts.
As I said above, a temporal table consists of a regular dynamic table A
that holds the latest version and a table H that holds the history of A.
1) When defining a temporal table based on a regular dynamic table (with a
primary key), we provide A and Flink automatically maintains H (bounded
by watermarks)
2) When defining a temporal table based on an append-only table, Flink
ingests H and we use the temporal table function to turn it into a dynamic
table with a primary key, i.e., into A. This conversion could also be done
during ingestion by treating the append-only stream as an upsert changelog
and converting it into a dynamic table with PK and as Table A (just in case
1).

As Jark said "converting append-only table into changelog table" was moved
to future work.
Until then, we could only define TEMPORAL TABLE on a table that is derived
from a proper changelog stream with a specific encoding.
The TEMPORAL VIEW would be a shortcut which would allow us to perform the
conversion in Flink SQL (and not within the connector) and to define the
temporal properties on the result of the view.

Cheers,
Fabian



Am Fr., 8. Mai 2020 um 08:29 Uhr schrieb Kurt Young :

> I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?
>
> According to Fabian's first mail:
>
> > Hence, the requirements for a temporal table are:
> > * The temporal table has a primary key / unique attribute
> > * The temporal table has a time-attribute that defines the start of the
> > validity interval of a row (processing time or event time)
> > * The system knows that the history of the table is tracked and can infer
> > how to look up a version.
>
> I think primary key plus proper event time attribute is already
> sufficient. So a join query looks like:
>
> "Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id =
> Dim.id"
>
> would mean: for every record belonging to Fact, use Fact.some_event_time as
> Dim's version (which
> will only keep the records from the Dim table with event time less than or equal
> to Fact.some_event_time, and
> keep only one record for each primary key).
>
> The temporal behavior is actually triggered by the join syntax "FOR
> SYSTEM_TIME AS OF Fact.some_event_time"
> but not the DDL description.
>
> Best,
> Kurt
>
>
> On Fri, May 8, 2020 at 10:51 AM Jark Wu  wrote:
>
>> Hi,
>>
>> I agree what Fabian said above.
>> Besides, IMO, (3) is of lower priority and will involve many more
>> things.
>> It makes sense to me to do it in two phases.
>>
>> Regarding (3), the key point of converting an append-only table into a
>> changelog table is that the framework should know the operation type,
>> so we introduced a special CREATE VIEW syntax to do it in the
>> documentation
>> [1]. Here is an example:
>>
>> -- my_binlog table is registered as an append-only table
>> CREATE TABLE my_binlog (
>>   before ROW<...>,
>>   after ROW<...>,
>>   op STRING,
>>   op_ms TIMESTAMP(3)
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   ...
>> );
>>
>> -- interpret my_binlog as a changelog on the op_type and id key
>> CREATE VIEW my_table AS
>>   SELECT
>> after.*
>>   FROM my_binlog
>>   CHANGELOG OPERATION BY op
>>   UPDATE KEY BY (id);
>>
>> -- my_table will materialize the insert/delete/update changes
>> -- if we have 4 records in dbz that
>> -- a create for 1004
>> -- an update for 1004
>> -- a c

[jira] [Created] (FLINK-17584) disableAutoTypeRegistration option does not work with Streaming API, only with Batch

2020-05-08 Thread Yaron Shani (Jira)
Yaron Shani created FLINK-17584:
---

 Summary: disableAutoTypeRegistration option does not work with 
Streaming API, only with Batch
 Key: FLINK-17584
 URL: https://issues.apache.org/jira/browse/FLINK-17584
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Yaron Shani


Hey,

There is a feature called disableAutoTypeRegistration which, from my 
understanding, should disable the automatic registration of classes with Kryo. 
It seems to work with the Batch API, but I don't see any reference to it in the 
DataStream code, and it does not work there. Is this by design? If so, I think 
it's better if that were stated clearly. If not, can I suggest a fix? Something 
like this:

 
{code:java}
@Override
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
   if (config.hasGenericTypesDisabled()) {
      throw new UnsupportedOperationException(
         "Generic types have been disabled in the ExecutionConfig and type " +
         this.typeClass.getName() + " is treated as a generic type.");
   }
   if (config.isAutoTypeRegistrationDisabled()) {
      if (!config.getRegisteredKryoTypes().contains(this.typeClass)) {
         throw new UnsupportedOperationException(
            "Auto type registration (disableAutoTypeRegistration) has been enabled " +
            "in the ExecutionConfig and type " + this.typeClass.getName() +
            " is treated as an auto type.");
      }
   }

   return new KryoSerializer<>(this.typeClass, config);
}
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-08 Thread Kurt Young
All tables described by Flink's DDL are dynamic tables. But a dynamic
table is more of a logical concept, not a physical thing.
Physically, a dynamic table has two different forms: one is a materialized
table which changes over time (e.g. a database table, an HBase table),
another form is a stream which represents change logs, typically
stored in a message queue (e.g., Kafka). For the latter one, I think
the records already represent the history of the dynamic table based on
stream-table duality.

So regarding to:
> Of course we could define that Flink implicitly tracks the (recent, i.e.,
within watermark bounds) history of all dynamic tables.
I don't think this is Flink implicitly tracking the history of the dynamic
table; the physical data of the table is already the history itself.
What Flink does is read the history out and organize it for further
operations.

I agree there is another implicit convention I took though, which treats the
event time as the version of the dynamic table. Strictly speaking,
we should use another syntax "PERIOD FOR SYSTEM_TIME" [1] to indicate the
version of the table. I've been thinking about this for quite a bit,
it turns out that this semantic is too similar to Flink's event time. It
will cause more trouble for users to understand what this means if
we treat event time and this "PERIOD FOR SYSTEM_TIME" differently. And I'm
also afraid that we will introduce lots of bugs because not all
developers will understand this easily.
[1]
https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15

Best,
Kurt


On Sat, May 9, 2020 at 5:32 AM Fabian Hueske  wrote:

> I think we need the TEMPORAL TABLE syntax because they are conceptually
> more than just regular tables.
> In addition to being a table that always holds the latest values (and
> can thereby serve as input to a continuous query), the system also needs to
> track the history of such a table to be able to serve different versions of
> the table (as requested by FOR SYSTEM_TIME AS OF).
>
> Of course we could define that Flink implicitly tracks the (recent, i.e.,
> within watermark bounds) history of all dynamic tables.
> However, there's one more thing the system needs to know to be able to
> correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to
> use as version of the temporal table.
> IMO it would be good to make this explicit, especially if there is a plan
> to eventually support multiple event-time attributes / watermarks
> on a table.
> Just using the only event time attribute would be a bit too much
> convention magic for my taste (others might of course have a different
> opinion on this subject).
>
> So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE
> statement if we agree on a few implicit conventions (implicit history table
> + implicit versioning attribute).
> I'm not a big fan of such conventions and think it's better to make such
> things explicit.
>
> For temporal joins with processing time semantics, we can use regular
> dynamic tables without declaring them as TEMPORAL since we don't need a
> history table to derive the current version.
> AFAIK, these are already the semantics we use for LookupTableSource.
>
> Regarding the question of append-only tables and temporal tables, I'd like
> to share some more thoughts.
> As I said above, a temporal table consists of a regular dynamic table A
> that holds the latest version and a table H that holds the history of A.
> 1) When defining a temporal table based on a regular dynamic table (with a
> primary key), we provide A and Flink automatically maintains H (bounded
> by watermarks)
> 2) When defining a temporal table based on an append-only table, Flink
> ingests H and we use the temporal table function to turn it into a dynamic
> table with a primary key, i.e., into A. This conversion could also be done
> during ingestion by treating the append-only stream as an upsert changelog
> and converting it into a dynamic table with PK and as Table A (just in case
> 1).
>
> As Jark said "converting append-only table into changelog table" was moved
> to future work.
> Until then, we could only define TEMPORAL TABLE on a table that is derived
> from a proper changelog stream with a specific encoding.
> The TEMPORAL VIEW would be a shortcut which would allow us to perform the
> conversion in Flink SQL (and not within the connector) and to define the
> temporal properties on the result of the view.
>
> Cheers,
> Fabian
>
>
>
> Am Fr., 8. Mai 2020 um 08:29 Uhr schrieb Kurt Young :
>
>> I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?
>>
>> According to Fabian's first mail:
>>
>> > Hence, the requirements for a temporal table are:
>> > * The temporal table has a primary key / unique attribute
>> > * The temporal table has a time-attribute that defines the start of the
>> > validity interval of a row (processing time or ev

Would you please give me the permission as a contributor

2020-05-08 Thread 天地任我游 (Jira contributor request)
Hi Guys, I want to contribute to Apache Flink. Would you please give me the 
permission as a contributor? My JIRA ID is limaowei66.

Re: Would you please give me the permission as a contributor

2020-05-08 Thread Yangze Guo
Hi Maowei,

Welcome to the community! Currently, there are no special permissions
required to contribute to Flink. Just ping a committer if you want to
work on some JIRA ticket and someone will assign the ticket to you.

Here is some information about how to contribute to Flink [1].

[1] https://flink.apache.org/contributing/contribute-code.html

Best,
Yangze Guo

On Sat, May 9, 2020 at 10:16 AM 天地任我游 <770236...@qq.com> wrote:
>
> Hi Guys, I want to contribute to Apache Flink. Would you please give me the 
> permission as a contributor? My JIRA ID is limaowei66.


[jira] [Created] (FLINK-17585) "PythonProgramOptions" changes the entry point class when user submit a Java sql job which contains Python UDF

2020-05-08 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-17585:
-

 Summary: "PythonProgramOptions" changes the entry point class when 
user submit a Java sql job which contains Python UDF 
 Key: FLINK-17585
 URL: https://issues.apache.org/jira/browse/FLINK-17585
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Wei Zhong


If we run a command such as:
{code:java}
flink run -pyfs xxx.py xxx.jar
{code}
The main class will be changed to "PythonDriver". This is because the 
"PythonProgramOptions" class changes the entry point class when Python 
command line options are detected. We should consider the situation where a user 
submits a Java SQL job which contains a Python UDF. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17586) Due to the new behavior of the table planner the Python dependency management command line options does not work

2020-05-08 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-17586:
-

 Summary: Due to the new behavior of the table planner the Python 
dependency management command line options does not work 
 Key: FLINK-17586
 URL: https://issues.apache.org/jira/browse/FLINK-17586
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Wei Zhong






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17587) Filesystem streaming sink support partition commit (success file)

2020-05-08 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-17587:


 Summary: Filesystem streaming sink support partition commit 
(success file)
 Key: FLINK-17587
 URL: https://issues.apache.org/jira/browse/FLINK-17587
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17588) Throw exception when hadoop jar is missing in flink scala's yarn mode.

2020-05-08 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17588:
--

 Summary: Throw exception when hadoop jar is missing in flink 
scala's yarn mode.
 Key: FLINK-17588
 URL: https://issues.apache.org/jira/browse/FLINK-17588
 Project: Flink
  Issue Type: Improvement
  Components: Scala Shell
Affects Versions: 1.10.0
Reporter: Jeff Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.1, release candidate #3

2020-05-08 Thread Thomas Weise
Thanks for the RC!

+1 (binding)

- repeated benchmark runs


On Fri, May 8, 2020 at 10:52 AM Robert Metzger  wrote:

> Thanks a lot for creating another RC!
>
> +1 (binding)
>
> - checked diff to last RC:
>
> https://github.com/apache/flink/compare/release-1.10.1-rc2...release-1.10.1-rc3
>   - kinesis dependency change is properly documented
> - started Flink locally (on Java11 :) )
>- seems to be built off the specified commit
>- ran example
>- checked logs
> - staging repo looks ok
>
>
>
> On Thu, May 7, 2020 at 2:21 PM Yu Li  wrote:
>
> > Hi everyone,
> >
> > Please review and vote on the release candidate #3 for version 1.10.1, as
> > follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [2], which are signed with the key with
> > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag "release-1.10.1-rc3" [5],
> > * website pull request listing the new release and adding announcement
> blog
> > post [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Yu
> >
> > [1]
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1364/
> > [5]
> >
> >
> https://github.com/apache/flink/commit/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37
> > [6] https://github.com/apache/flink-web/pull/330
> >
>


[jira] [Created] (FLINK-17589) Extend StreamingFileSink to Support Streaming Hive Connector

2020-05-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-17589:
---

 Summary: Extend StreamingFileSink to Support Streaming Hive 
Connector
 Key: FLINK-17589
 URL: https://issues.apache.org/jira/browse/FLINK-17589
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / FileSystem
Reporter: Yun Gao
 Fix For: 1.11.0


According to the [Discussion on 
FLIP-115|https://lists.apache.org/thread.html/rb1795428e481dbeaa1af2dcffed87b4804ed81d2af60256cd50032d7%40%3Cdev.flink.apache.org%3E],
 we will use StreamingFileSink to support the streaming Hive connector. This 
requires some extensions to the current StreamingFileSink:
 # Support a path-based writer that writes to the specified Hadoop path.
 # Provide listeners to collect the bucket state so that the Hive sink can 
decide when a bucket is terminated and then write meta-info to Hive.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17590) Add Bucket lifecycle listener to support acquiring bucket state

2020-05-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-17590:
---

 Summary: Add Bucket lifecycle listener to support acquiring bucket 
state
 Key: FLINK-17590
 URL: https://issues.apache.org/jira/browse/FLINK-17590
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / FileSystem
Reporter: Yun Gao
 Fix For: 1.11.0


The Hive sink will reuse the Buckets class of StreamingFileSink, which encapsulates 
most of the logic of StreamingFileSink. The Hive sink requires writing a piece 
of meta-info into the Hive metastore after a partition (namely a Bucket in 
StreamingFileSink) has been terminated. Currently the termination is judged by 
event time / processing time 
([FLIP-115|https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table]).

To support this requirement of the Hive sink, we would add a listener for the 
events of a bucket being created and becoming inactive. A bucket becomes inactive 
once all of its previously written records have been committed. The Hive sink 
could then safely write meta-info once the time has exceeded the bucket's 
boundary and the bucket has become inactive.
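
A rough sketch of what such a listener could look like (the interface and method names below are hypothetical, purely to illustrate the proposal):
{code:java}
// Hypothetical listener interface; names are illustrative only, not the actual API.
public interface BucketLifecycleListener<BucketID> {

    /** Called when a new bucket is created for the given bucket id. */
    void bucketCreated(BucketID bucketId);

    /**
     * Called when a bucket becomes inactive, i.e. all records previously
     * written to it have been committed. The Hive sink could write the
     * partition's meta-info to the metastore at this point.
     */
    void bucketInactive(BucketID bucketId);
}
{code}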



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17591) TableEnvironmentITCase.testExecuteSqlAndToDataStream failed

2020-05-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-17591:
---

 Summary: TableEnvironmentITCase.testExecuteSqlAndToDataStream 
failed
 Key: FLINK-17591
 URL: https://issues.apache.org/jira/browse/FLINK-17591
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Reporter: Jark Wu


Here is the instance: 
https://dev.azure.com/imjark/Flink/_build/results?buildId=61&view=logs&j=69332ead-8935-5abf-5b3d-e4280fb1ff4c&t=6855dd6e-a7b0-5fd1-158e-29fc186b16c8


{code:java}

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   TableEnvironmentITCase.testExecuteSqlAndToDataStream:343
[INFO] 
[ERROR] Tests run: 791, Failures: 1, Errors: 0, Skipped: 13

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17592) flink-table-planner doesn't compile on Scala 2.12

2020-05-08 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-17592:
--

 Summary: flink-table-planner doesn't compile on Scala 2.12
 Key: FLINK-17592
 URL: https://issues.apache.org/jira/browse/FLINK-17592
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: Robert Metzger


CI: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=844&view=logs&j=ed6509f5-1153-558c-557a-5ee0afbcdf24&t=241b1e5e-1a8e-5e6a-469a-a9b8cad87065

{code}
[WARNING]  ^
[ERROR] 
/__w/1/s/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSinks.scala:73:
 error: overriding method getTableSchema in trait TableSink of type 
()org.apache.flink.table.api.TableSchema;
[ERROR]  method getTableSchema needs `override' modifier
[ERROR]   def getTableSchema: TableSchema = {
[ERROR]   ^
[WARNING] 8 warnings found
[ERROR] one error found
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)