[jira] [Created] (FLINK-17567) Create a dedicated Python directory in release directory to place Python-related source and binary packages
Huang Xingbo created FLINK-17567: Summary: Create a dedicated Python directory in release directory to place Python-related source and binary packages Key: FLINK-17567 URL: https://issues.apache.org/jira/browse/FLINK-17567 Project: Flink Issue Type: Improvement Components: API / Python, Release System Reporter: Huang Xingbo Fix For: 1.11.0 We introduced cross-platform wheel packages in 1.11. It is confusing that we put all wheel packages and the corresponding signature verification files in the root directory of the release. So we plan to put Python-related files in a dedicated subdirectory of the release directory. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17568) Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint
Yingjie Cao created FLINK-17568: --- Summary: Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint Key: FLINK-17568 URL: https://issues.apache.org/jira/browse/FLINK-17568 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 For unaligned checkpoints, a task may consume data after the checkpoint barrier before performing the checkpoint, which leads to consumption of duplicated data and corruption of the data stream. More specifically, when the Netty thread notifies the checkpoint barrier for the first time and enqueues a checkpointing task in the mailbox, the task thread may still be in the data consumption loop. If it reads a new checkpoint barrier from another channel, it will not return to the mailbox; instead it will continue to read data until all data is consumed or a full record is available. Meanwhile, data after the checkpoint barrier may be read and consumed, which leads to inconsistency. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17569) Support ViewFileSystem when waiting for lease revocation in the Hadoop filesystem
konwu created FLINK-17569: - Summary: Support ViewFileSystem when waiting for lease revocation in the Hadoop filesystem Key: FLINK-17569 URL: https://issues.apache.org/jira/browse/FLINK-17569 Project: Flink Issue Type: Bug Components: FileSystems Reporter: konwu Currently the waitUntilLeaseIsRevoked method does not support ViewFileSystem, and it will cause the same issue as [fail to recover after taskmanager failure|https://issues.apache.org/jira/browse/FLINK-11419]. I tried resolving the real path and FileSystem behind the ViewFileSystem, and it works:
{code:java}
if (fs instanceof ViewFileSystem) {
    ViewFileSystem vfs = (ViewFileSystem) fs;
    // Resolve the mount point to the path on the underlying (target) filesystem.
    Path resolvePath = vfs.resolvePath(path);
    DistributedFileSystem dfs = (DistributedFileSystem) resolvePath.getFileSystem(fs.getConf());
    // Retry the lease check against the real DistributedFileSystem.
    return waitUntilLeaseIsRevoked(dfs, resolvePath);
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17570) BatchTableEnvironment#fromValues(Object... values) causes StackOverflowError
Wei Zhong created FLINK-17570: - Summary: BatchTableEnvironment#fromValues(Object... values) causes StackOverflowError Key: FLINK-17570 URL: https://issues.apache.org/jira/browse/FLINK-17570 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Wei Zhong The error can be reproduced by the following code:
{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.fromValues(1L, 2L, 3L);
{code}
The error is as follows:
{code:java}
Exception in thread "main" java.lang.StackOverflowError
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
 at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
 at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.flink.table.expressions.ApiExpressionUtils.convertArray(ApiExpressionUtils.java:142)
 at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:100)
 at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$2.apply(TableEnvImpl.scala:1030)
 at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$2.apply(TableEnvImpl.scala:1030)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1030)
 at org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 at org.apache.flink.table.api.internal.TableEnvImpl.fromValues(TableEnvImpl.scala:1032)
 at org.apache.flink.table.api.TableEnvironment.fromValues(TableEnvironment.java:163)
 ...
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
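As an aside, the stack trace above shows the interface entry point and the implementation's overload calling each other in turn. Below is a minimal, self-contained illustration of that kind of mutual recursion with hypothetical names only (it is not Flink code); running it throws the same StackOverflowError.
{code:java}
// Illustration only: a varargs entry point and an implementation overload that
// end up delegating to each other, mirroring the alternating
// TableEnvironment.fromValues / TableEnvImpl.fromValues frames above.
public class OverloadRecursionDemo {

    interface Env {
        // Varargs convenience method delegating to the "real" implementation.
        default void fromValues(Object... values) {
            fromValuesInternal(values);
        }

        void fromValuesInternal(Object[] values);
    }

    static class EnvImpl implements Env {
        @Override
        public void fromValuesInternal(Object[] values) {
            // Bug: delegates back to the varargs entry point instead of doing
            // the actual work, so the two methods recurse forever.
            fromValues(values);
        }
    }

    public static void main(String[] args) {
        new EnvImpl().fromValues(1L, 2L, 3L); // throws StackOverflowError
    }
}
{code}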
[jira] [Created] (FLINK-17571) A better way to show the files currently used in checkpoints
Congxian Qiu(klion26) created FLINK-17571: - Summary: A better way to show the files currently used in checkpoints Key: FLINK-17571 URL: https://issues.apache.org/jira/browse/FLINK-17571 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Inspired by the [user mail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html]. Currently, there are three types of directories for a checkpoint. The files in the TASKOWNED and EXCLUSIVE directories can be deleted safely, but users can't safely delete the files in the SHARED directory (the files may have been created a long time ago). I think it's better to give users a way to know which files are currently in use (so that all other files are not). A command-line command such as the one below might be enough to support such a feature. {{./bin/flink checkpoint list $checkpointDir # list all the files used in checkpoint}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
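To make the FLINK-17571 proposal above more concrete, here is a purely illustrative sketch that only inventories the files under a checkpoint directory by the three directory types mentioned in the issue. The directory names ({{chk-*}}, {{shared}}, {{taskowned}}) follow the default filesystem checkpoint layout; everything else is an assumption, and a plain listing like this still cannot tell which SHARED files are actually referenced — that is exactly the gap the proposed command would close.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
public class ListCheckpointFiles {

    public static void main(String[] args) throws IOException {
        Path checkpointDir = Paths.get(args[0]); // e.g. /checkpoints/<job-id>

        try (Stream<Path> files = Files.walk(checkpointDir)) {
            files.filter(Files::isRegularFile).forEach(file -> {
                // Classify the file by its top-level directory under the checkpoint dir.
                String top = checkpointDir.relativize(file).getName(0).toString();
                String type = top.startsWith("chk-") ? "EXCLUSIVE"
                        : top.equals("shared") ? "SHARED"
                        : top.equals("taskowned") ? "TASKOWNED"
                        : "OTHER";
                System.out.println(type + "\t" + file);
            });
        }
    }
}
{code}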
[jira] [Created] (FLINK-17572) Remove checkpoint alignment buffered metric from webui
Yingjie Cao created FLINK-17572: --- Summary: Remove checkpoint alignment buffered metric from webui Key: FLINK-17572 URL: https://issues.apache.org/jira/browse/FLINK-17572 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Affects Versions: 1.11.0 Reporter: Yingjie Cao After FLINK-16404, we never cache buffers during checkpoint barrier alignment, so the checkpoint alignment buffered metric will always be 0; we should remove it directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17573) There is duplicate source data in ProcessWindowFunction
Tammy zhang created FLINK-17573: --- Summary: There is duplicate source data in ProcessWindowFunction Key: FLINK-17573 URL: https://issues.apache.org/jira/browse/FLINK-17573 Project: Flink Issue Type: Bug Reporter: Tammy zhang I consume data from a Kafka topic, key the stream with keyBy, and then use a ProcessWindowFunction on this keyed stream. A strange phenomenon appeared: the process function's source data becomes duplicated, like:
Input Data iterator:[H2update 623.0 2020-05-08 15:19:25.14, H2update 297.0 2020-05-08 15:19:28.501, H2update 832.0 2020-05-08 15:19:29.415] data iterator end--
Input Data iterator:[H1400 59.0 2020-05-08 15:19:07.087, H1400 83.0 2020-05-08 15:19:09.521] data iterator end--
Input Data iterator:[H2insert 455.0 2020-05-08 15:19:23.066, H2insert 910.0 2020-05-08 15:19:23.955, H2insert 614.0 2020-05-08 15:19:24.397, H2insert 556.0 2020-05-08 15:19:27.389, H2insert 922.0 2020-05-08 15:19:27.761, H2insert 165.0 2020-05-08 15:19:28.26] data iterator end--
Input Data iterator:[H1400 59.0 2020-05-08 15:19:07.087, H1400 83.0 2020-05-08 15:19:09.521] data iterator end--
Input Data iterator:[H2update 623.0 2020-05-08 15:19:25.14, H2update 297.0 2020-05-08 15:19:28.501, H2update 832.0 2020-05-08 15:19:29.415] data iterator end--
Input Data iterator:[H2insert 455.0 2020-05-08 15:19:23.066, H2insert 910.0 2020-05-08 15:19:23.955, H2insert 614.0 2020-05-08 15:19:24.397, H2insert 556.0 2020-05-08 15:19:27.389, H2insert 922.0 2020-05-08 15:19:27.761, H2insert 165.0 2020-05-08 15:19:28.26] data iterator end--
I can ensure that there is no duplication in the Kafka data. Could you help me point out where the problem is? Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
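For reference, here is a hedged reconstruction of the setup described in FLINK-17573 above (Kafka source, keyBy, processing-time window, ProcessWindowFunction that logs the window contents). All names, the record layout and the window size are assumptions for illustration, not taken from the reporter's job.
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class WindowContentsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumption
        props.setProperty("group.id", "window-debug");         // assumption

        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        source
                // Key by the first token of the record, e.g. "H2update", "H1400", ...
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value.split(" ")[0];
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<String> elements, Collector<String> out) {
                        // The issue reports this iterable repeating the same records.
                        System.out.println("Input Data iterator:" + elements);
                        out.collect(key);
                    }
                })
                .print();

        env.execute("window-contents-debug");
    }
}
{code}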
[jira] [Created] (FLINK-17574) Deduplicate the process execution logic in Flink
Yangze Guo created FLINK-17574: -- Summary: Deduplicate the process execution logic in Flink Key: FLINK-17574 URL: https://issues.apache.org/jira/browse/FLINK-17574 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Yangze Guo Currently, there is a lot of process execution logic in both the production and testing components of Flink. It would be good to provide something like {{ProcessExecutionUtils}} in {{flink-core}} to achieve code deduplication. -- This message was sent by Atlassian Jira (v8.3.4#803005)
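Not a concrete proposal from FLINK-17574 above, but a rough sketch of what such a shared helper could look like; the class name and method are assumptions, and a real utility would presumably need to cover the various logging and timeout variants used across the code base.
{code:java}
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical shape of the proposed ProcessExecutionUtils, for illustration only.
public final class ProcessExecutionUtils {

    private ProcessExecutionUtils() {}

    /** Starts the given command, inherits stdout/stderr, and waits for it to finish. */
    public static int runAndWait(List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IOException("Process timed out: " + String.join(" ", command));
        }
        return process.exitValue();
    }
}
{code}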
[jira] [Created] (FLINK-17575) KafkaException: Received exception when fetching the next record from XXX-14. If needed, please seek past the record to continue consumption.
Maxim Malykhin created FLINK-17575: -- Summary: KafkaException: Received exception when fetching the next record from XXX-14. If needed, please seek past the record to continue consumption. Key: FLINK-17575 URL: https://issues.apache.org/jira/browse/FLINK-17575 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: Maxim Malykhin The stream doesn't catch the exception for a bad packet in Kafka; the exception terminates the thread.
{code:java}
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from prod_netscout_ug_s1mme-14. If needed, please seek past the record to continue consumption.
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1522)
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition prod_netscout_ug_s1mme-14 at offset 22405513471 is invalid, cause: Record is corrupt (stored crc = 995400728, computed crc = 4153305836)
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1435)
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1479)
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
 ... 7 more
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
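Independently of a Flink-side fix, the exception message itself points at a manual workaround: seeking the consumer group past the corrupt offset with a plain Kafka consumer. The sketch below uses the topic, partition and offset from the stack trace; the broker address and group id are assumptions, and note that a Flink job restoring from a checkpoint takes its offsets from the checkpoint state, not from the group's committed offsets.
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hedged sketch of the "seek past the record" workaround, run outside Flink.
public class SkipCorruptRecord {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // assumption
        props.put("group.id", "my-consumer-group");    // assumption
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition partition = new TopicPartition("prod_netscout_ug_s1mme", 14);
        long corruptOffset = 22405513471L; // from the exception above

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(partition));
            // Position the group one record past the corrupt offset and commit it.
            consumer.seek(partition, corruptOffset + 1);
            consumer.commitSync();
        }
    }
}
{code}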
[jira] [Created] (FLINK-17576) HiveTableSinkTest and TableEnvHiveConnectorTest are instable
Dian Fu created FLINK-17576: --- Summary: HiveTableSinkTest and TableEnvHiveConnectorTest are instable Key: FLINK-17576 URL: https://issues.apache.org/jira/browse/FLINK-17576 Project: Flink Issue Type: Bug Components: Connectors / Hive, Tests Affects Versions: 1.11.0 Reporter: Dian Fu HiveTableSinkTest and TableEnvHiveConnectorTest failed with the following exception: {code:java} 2020-05-08T09:38:44.5916441Z [ERROR] testWriteComplexType(org.apache.flink.connectors.hive.HiveTableSinkTest) Time elapsed: 1.362 s <<< ERROR! 2020-05-08T09:38:44.5932270Z java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (e27d50c5a780264a576aa8a21a6dd6c6) 2020-05-08T09:38:44.5938598Zat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 2020-05-08T09:38:44.5939435Zat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 2020-05-08T09:38:44.5939970Zat org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1663) 2020-05-08T09:38:44.5940551Zat org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) 2020-05-08T09:38:44.5941188Zat org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) 2020-05-08T09:38:44.5941834Zat org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:916) 2020-05-08T09:38:44.5945405Zat org.apache.flink.connectors.hive.HiveTableSinkTest.testWriteComplexType(HiveTableSinkTest.java:143) 2020-05-08T09:38:44.5946105Zat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2020-05-08T09:38:44.5946628Zat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2020-05-08T09:38:44.5947106Zat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2020-05-08T09:38:44.5947770Zat java.lang.reflect.Method.invoke(Method.java:498) 2020-05-08T09:38:44.5948393Zat org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 2020-05-08T09:38:44.5949102Zat org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 2020-05-08T09:38:44.5949853Zat org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 2020-05-08T09:38:44.5950587Zat org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 2020-05-08T09:38:44.5951763Zat org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169) 2020-05-08T09:38:44.5952660Zat org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154) 2020-05-08T09:38:44.5953829Zat org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92) 2020-05-08T09:38:44.5966233Zat org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 2020-05-08T09:38:44.5967051Zat org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 2020-05-08T09:38:44.5968062Zat org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 2020-05-08T09:38:44.5968949Zat org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 2020-05-08T09:38:44.5969824Zat org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 2020-05-08T09:38:44.5970751Zat org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 2020-05-08T09:38:44.5971584Zat org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
2020-05-08T09:38:44.5972386Zat org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 2020-05-08T09:38:44.5973469Zat org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 2020-05-08T09:38:44.5974147Zat org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 2020-05-08T09:38:44.5974684Zat org.junit.rules.RunRules.evaluate(RunRules.java:20) 2020-05-08T09:38:44.5975227Zat org.junit.runners.ParentRunner.run(ParentRunner.java:363) 2020-05-08T09:38:44.5975827Zat org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) 2020-05-08T09:38:44.5976533Zat org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) 2020-05-08T09:38:44.5977627Zat org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) 2020-05-08T09:38:44.5978552Zat org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) 2020-05-08T09:38:44.5979527Zat org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) 2020-05-08T09:38:44.5980556Zat org.apache.maven.surefire.booter.ForkedBoot
[jira] [Created] (FLINK-17577) SinkFormat#createSinkFormat should use DynamicTableSink.Context as the first parameter
Danny Chen created FLINK-17577: -- Summary: SinkFormat#createSinkFormat should use DynamicTableSink.Context as the first parameter Key: FLINK-17577 URL: https://issues.apache.org/jira/browse/FLINK-17577 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.0 Reporter: Danny Chen Fix For: 1.11.0 This interface was introduced in FLINK-16997; it uses ScanTableSource.Context as the first parameter, which is not correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17578) Union of 2 SideOutputs behaviour incorrect
Tom Wells created FLINK-17578: - Summary: Union of 2 SideOutputs behaviour incorrect Key: FLINK-17578 URL: https://issues.apache.org/jira/browse/FLINK-17578 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: Tom Wells Strange behaviour when using union() to merge outputs of 2 DataStreams, where both are sourced from SideOutputs. See example code with comments demonstrating the issue:
{code:java}
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val input = env.fromElements(1, 2, 3, 4)

  val oddTag = OutputTag[Int]("odds")
  val evenTag = OutputTag[Int]("even")

  val all = input.process {
    (value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]) => {
      if (value % 2 != 0) ctx.output(oddTag, value) else ctx.output(evenTag, value)
    }
  }

  val odds = all.getSideOutput(oddTag)
  val evens = all.getSideOutput(evenTag)

  // These print correctly //
  odds.print // -> 1, 3
  evens.print // -> 2, 4

  // This prints incorrectly - BUG? //
  odds.union(evens).print // -> 2, 2, 4, 4
  evens.union(odds).print // -> 1, 1, 3, 3

  // Another test to understand normal behaviour of .union, using normal inputs //
  val odds1 = env.fromElements(1, 3)
  val evens1 = env.fromElements(2, 4)

  // Union of 2 normal inputs //
  odds1.union(evens1).print // -> 1, 2, 3, 4

  // Union of a normal input plus an input from a sideoutput //
  odds.union(evens1).print // -> 1, 2, 3, 4
  evens1.union(odds).print // -> 1, 2, 3, 4

  // So it seems that when both inputs are from sideoutputs that it behaves incorrectly... BUG?
  env.execute("Test job")
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: What is the RocksDB local directory in flink checkpointing?
Hi LakeShen You could refer to [1] and [2] to know the temporary directory in YARN, the related log could be "Setting directories for temporary files to: " or "Overriding Fink's temporary file directories with those specified in the Flink config: " [1] https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java#L103 [2] https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java#L478-L489 Best Yun Tang From: Till Rohrmann Sent: Wednesday, May 6, 2020 17:35 To: LakeShen Cc: dev ; user ; user-zh Subject: Re: What is the RocksDB local directory in flink checkpointing? Hi LakeShen, `state.backend.rocksdb.localdir` defines the directory in which RocksDB will store its local files. Local files are RocksDB's SST and metadata files for example. This directory does not need to be persisted. If the config option is not configured, then it will use the nodes temporary file directory. Cheers, Till On Wed, May 6, 2020 at 6:07 AM LakeShen wrote: > Hi community, > > Now I have a question about flink checkpoint local directory , our flink > version is 1.6, job mode is > > flink on yarn per job . I saw the flink source code , and I find the flink > checkpoint local directory is > > /tmp when you didn't config the "state.backend.rocksdb.localdir". But I go > into the /tmp dir ,I > > couldn't find the flink checkpoint state local directory. > > What is the RocksDB local directory in flink checkpointing? I am looking > forward to your reply. > > Best, > LakeShen >
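For completeness, here is a minimal sketch of setting the RocksDB local directory programmatically, the counterpart of the `state.backend.rocksdb.localdir` option discussed above; the paths are placeholders and the job itself is omitted.
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data (the durable part) goes to a persistent filesystem.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // Local working files (RocksDB's SST and metadata files) go here;
        // this directory does not need to survive TaskManager failures.
        backend.setDbStoragePath("/data/flink/rocksdb");

        env.setStateBackend(backend);

        // ... build and execute the job
    }
}
{code}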
[jira] [Created] (FLINK-17579) Set the resource id of taskexecutor from environment variable if possible in standalone mode
Yangze Guo created FLINK-17579: -- Summary: Set the resource id of taskexecutor from environment variable if possible in standalone mode Key: FLINK-17579 URL: https://issues.apache.org/jira/browse/FLINK-17579 Project: Flink Issue Type: Sub-task Reporter: Yangze Guo -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: What's the best practice to determine whether a job has finished or not?
+dev Best, Kurt On Fri, May 8, 2020 at 3:35 PM Caizhi Weng wrote: > Hi Jeff, > > Thanks for the response. However I'm using executeAsync so that I can run > the job asynchronously and get a JobClient to monitor the job. JobListener > only works for synchronous execute method. Is there other way to achieve > this? > > Jeff Zhang 于2020年5月8日周五 下午3:29写道: > >> I use JobListener#onJobExecuted to be notified that the flink job is >> done. >> It is pretty reliable for me, the only exception is the client process is >> down. >> >> BTW, the reason you see ApplicationNotFound exception is that yarn app >> is terminated which means the flink cluster is shutdown. While for >> standalone mode, the flink cluster is always up. >> >> >> Caizhi Weng 于2020年5月8日周五 下午2:47写道: >> >>> Hi dear Flink community, >>> >>> I would like to determine whether a job has finished (no matter >>> successfully or exceptionally) in my code. >>> >>> I used to think that JobClient#getJobStatus is a good idea, but I found >>> that it behaves quite differently under different executing environments. >>> For example, under a standalone session cluster it will return the FINISHED >>> status for a finished job, while under a yarn per job cluster it will throw >>> a ApplicationNotFound exception. I'm afraid that there might be other >>> behaviors for other environments. >>> >>> So what's the best practice to determine whether a job has finished or >>> not? Note that I'm not waiting for the job to finish. If the job hasn't >>> finished I would like to know it and do something else. >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> >
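Here is a hedged sketch of the executeAsync()/JobClient pattern discussed in this thread; the method names follow the Flink 1.11 interfaces, exact signatures differ slightly between versions, and — as the thread points out — what getJobStatus() does for an already-finished job currently depends on the deployment mode.
{code:java}
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobStatusPolling {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // Submit without blocking and keep a handle to the running job.
        JobClient client = env.executeAsync("status-polling-example");

        // Non-blocking check: a globally terminal state covers FINISHED,
        // FAILED and CANCELED.
        JobStatus status = client.getJobStatus().get();
        if (status.isGloballyTerminalState()) {
            System.out.println("Job has finished with status " + status);
        } else {
            System.out.println("Job is still running: " + status);
        }
    }
}
{code}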
[jira] [Created] (FLINK-17580) NPE in unaligned checkpoint for EndOfPartition events
Arvid Heise created FLINK-17580: --- Summary: NPE in unaligned checkpoint for EndOfPartition events Key: FLINK-17580 URL: https://issues.apache.org/jira/browse/FLINK-17580 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Arvid Heise Assignee: Arvid Heise Fix For: 1.11.0 Current master does not account for {{StreamTaskNetworkInput#recordDeserializers}} being nulled after EOP events. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17581) Update translation of S3 documentation
Robert Metzger created FLINK-17581: -- Summary: Update translation of S3 documentation Key: FLINK-17581 URL: https://issues.apache.org/jira/browse/FLINK-17581 Project: Flink Issue Type: Task Components: chinese-translation, Documentation Reporter: Robert Metzger The change in https://github.com/apache/flink/commit/7c5ac3584e42a0e7ebc5e78c532887bf4d383d9d needs to be added to the Chinese variant of the documentation page. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] Release 1.10.1, release candidate #3
Thanks a lot for creating another RC! +1 (binding) - checked diff to last RC: https://github.com/apache/flink/compare/release-1.10.1-rc2...release-1.10.1-rc3 - kinesis dependency change is properly documented - started Flink locally (on Java11 :) ) - seems to be build off the specified commit - ran example - checked logs - staging repo looks ok On Thu, May 7, 2020 at 2:21 PM Yu Li wrote: > Hi everyone, > > Please review and vote on the release candidate #3 for version 1.10.1, as > follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.10.1-rc3" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. > > Thanks, > Yu > > [1] > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891 > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/ > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > [4] > https://repository.apache.org/content/repositories/orgapacheflink-1364/ > [5] > > https://github.com/apache/flink/commit/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37 > [6] https://github.com/apache/flink-web/pull/330 >
[jira] [Created] (FLINK-17582) Update quickstarts to use universal Kafka connector
Seth Wiesman created FLINK-17582: Summary: Update quickstarts to use universal Kafka connector Key: FLINK-17582 URL: https://issues.apache.org/jira/browse/FLINK-17582 Project: Flink Issue Type: Improvement Components: Quickstarts Reporter: Seth Wiesman Assignee: Seth Wiesman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17583) Allow option to store a savepoint's _metadata file separate from its data files
Steve Bairos created FLINK-17583: Summary: Allow option to store a savepoint's _metadata file separate from its data files Key: FLINK-17583 URL: https://issues.apache.org/jira/browse/FLINK-17583 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.9.1 Reporter: Steve Bairos (In the description I mainly talk about savepoints, but the plan ) We have a deployment framework that often needs to be able to return a list of valid savepoints in S3 with a certain prefix. Our assertion is that if an S3 object ends with '_metadata', then it is a valid savepoint. In order to generate the list of valid savepoints, we need to locate all of the _metadata files that start with a given prefix. For example, if our S3 bucket's paths look like this: {code:java} s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed ... thousands of other savepoint data files s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702 s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/41297fd5-40df-4683-bfb6-534bfddae92a ... thousands of other savepoint data files s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6 s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/2d2f5551-56a7-4fea-b25b-b0156660c650 thousands of other savepoint data files s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/c8c410df-5fb0-46a0-84c5-43e1575e8dc5 ... dozens of other savepoint dirs {code} In order to get a list of all savepoints that my-job1 could possibly start with, we would want to get all the savepoints that start with the prefix: {code:java} s3://bucket/savepoints/my-job1 {code} Ideally, we would want to have the ability to get a list like this from S3: {code:java} s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata{code} Unfortunately there is no easy way to get this value because S3's API only allows you to search based on prefix and not postfix. Listing all objects with the prefix 's3://bucket/savepoints/my-job1' and then filtering the list to only include the files that contain _metadata will also not work because there are thousands of savepoint data files that have the same prefix such as: {code:java} s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702 s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6 etc.{code} I propose that we add a configuration in a similar vein to the S3 entropy injector which allows us to store the _metadata file in a separate path from the savepoint's data files. 
For example, with this hypothetical configuration: {code:java} state.checkpoints.split.key: _datasplit_ state.checkpoints.split.metadata.dir: metadata state.checkpoints.split.data.dir: data{code} When a user triggers a savepoint with the path {code:java} s3://bucket/savepoints/_datasplit_/my-job1/2020-05-07/ {code} The resulting savepoint that is created looks like: {code:java} s3://bucket/savepoints/metadata/my-job1/2020-05-07/savepoint-654321-abcdef9876/_metadata s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/a50fc483-3581-4b55-a37e-b7c61b3ee47f s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/b0c6b7c0-6b94-43ae-8678-2f7640af1523 s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/c1855b35-c0b7-4347-9352-88423998e5ec{code} Notice that the metadata's prefix is {code:java} s3://bucket/savepoints/metadata/my-job1/2020-05-07/{code} and the data files' prefix is {code:java} s3://bucket/savepoints/data/my-job1/2020-05-07/{code} That way if I want to list all the savepoints for my-job1, I can just list all the objects with the prefix {code:java} aws s3 ls --recursive s3://bucket/savepoints/metadata/my-job1/{code} And I can get a clean list of just the _metadata files easily. One alternative that we've thought about is using is the entropy injection. It technically does separate the _metadata file from the rest of the data as well but it kind of makes a mess of entropy dirs in S3 so it's not our ideal choice. I'm happy to take a shot at i
Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL
I think we need the TEMPORAL TABLE syntax because temporal tables are conceptually more than just regular tables. In addition to being a table that always holds the latest values (and can thereby serve as input to a continuous query), the system also needs to track the history of such a table to be able to serve different versions of the table (as requested by FOR SYSTEM_TIME AS OF). Of course we could define that Flink implicitly tracks the (recent, i.e., within watermark bounds) history of all dynamic tables. However, there's one more thing the system needs to know to be able to correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to use as the version of the temporal table. IMO it would be good to make this explicit, especially if there is a plan to eventually support multiple event-time attributes / watermarks on a table. Just using the only event time attribute would be a bit too much convention magic for my taste (others might of course have a different opinion on this subject). So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE statement if we agree on a few implicit conventions (implicit history table + implicit versioning attribute). I'm not a big fan of such conventions and think it's better to make such things explicit. For temporal joins with processing time semantics, we can use regular dynamic tables without declaring them as TEMPORAL since we don't need a history table to derive the current version. AFAIK, these are already the semantics we use for LookupTableSource. Regarding the question of append-only tables and temporal tables, I'd like to share some more thoughts. As I said above, a temporal table consists of a regular dynamic table A that holds the latest version and a table H that holds the history of A. 1) When defining a temporal table based on a regular dynamic table (with a primary key), we provide A and Flink automatically maintains H (bounded by watermarks) 2) When defining a temporal table based on an append-only table, Flink ingests H and we use the temporal table function to turn it into a dynamic table with a primary key, i.e., into A. This conversion could also be done during ingestion by treating the append-only stream as an upsert changelog and converting it into a dynamic table with PK and as Table A (just as in case 1). As Jark said, "converting append-only table into changelog table" was moved to future work. Until then, we could only define TEMPORAL TABLE on a table that is derived from a proper changelog stream with a specific encoding. The TEMPORAL VIEW would be a shortcut which would allow us to perform the conversion in Flink SQL (and not within the connector) and to define the temporal properties on the result of the view. Cheers, Fabian Am Fr., 8. Mai 2020 um 08:29 Uhr schrieb Kurt Young : > I might missed something but why we need a new "TEMPORAL TABLE" syntax? > > According to Fabian's first mail: > > > Hence, the requirements for a temporal table are: > > * The temporal table has a primary key / unique attribute > > * The temporal table has a time-attribute that defines the start of the > > validity interval of a row (processing time or event time) > > * The system knows that the history of the table is tracked and can infer > > how to look up a version. > > I think primary key plus proper event time attribute is already > sufficient. 
So a join query looks like: > > "Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id = > Dim.id" > > would means for every record belong to Fact, use Fact.some_event_time as > Dim's version (which > will only keep all records from Dim table with event time less or equal > to Fact.some_event_time, and > keep only one record for each primary key). > > The temporal behavior is actually triggered by the join syntax "FOR > SYSTEM_TIME AS OF Fact.some_event_time" > but not the DDL description. > > Best, > Kurt > > > On Fri, May 8, 2020 at 10:51 AM Jark Wu wrote: > >> Hi, >> >> I agree what Fabian said above. >> Besides, IMO, (3) is in a lower priority and will involve much more >> things. >> It makes sense to me to do it in two-phase. >> >> Regarding to (3), the key point to convert an append-only table into >> changelog table is that the framework should know the operation type, >> so we introduced a special CREATE VIEW syntax to do it in the >> documentation >> [1]. Here is an example: >> >> -- my_binlog table is registered as an append-only table >> CREATE TABLE my_binlog ( >> before ROW<...>, >> after ROW<...>, >> op STRING, >> op_ms TIMESTAMP(3) >> ) WITH ( >> 'connector.type' = 'kafka', >> ... >> ); >> >> -- interpret my_binlog as a changelog on the op_type and id key >> CREATE VIEW my_table AS >> SELECT >> after.* >> FROM my_binlog >> CHANGELOG OPERATION BY op >> UPDATE KEY BY (id); >> >> -- my_table will materialize the insert/delete/update changes >> -- if we have 4 records in dbz that >> -- a create for 1004 >> -- an update for 1004 >> -- a c
[jira] [Created] (FLINK-17584) disableAutoTypeRegistration option does not work with Streaming API, only with Batch
Yaron Shani created FLINK-17584: --- Summary: disableAutoTypeRegistration option does not work with Streaming API, only with Batch Key: FLINK-17584 URL: https://issues.apache.org/jira/browse/FLINK-17584 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: Yaron Shani Hey, there is a feature called disableAutoTypeRegistration which, from my understanding, should disable the automatic registration of classes with Kryo. It seems to work in the Batch API, but I don't see any reference to it in the DataStream code, and it does not work there. Is this by design? If so, I think it would be better to state that clearly. If not, can I suggest a fix? Something like this:
{code:java}
@Override
@PublicEvolving
public TypeSerializer createSerializer(ExecutionConfig config) {
    if (config.hasGenericTypesDisabled()) {
        throw new UnsupportedOperationException(
            "Generic types have been disabled in the ExecutionConfig and type " +
            this.typeClass.getName() + " is treated as a generic type.");
    }
    if (config.isAutoTypeRegistrationDisabled()) {
        if (!config.getRegisteredKryoTypes().contains(this.typeClass)) {
            throw new UnsupportedOperationException(
                "Auto type registration (disableAutoTypeRegistration) has been enabled in the ExecutionConfig and type " +
                this.typeClass.getName() + " is treated as an auto type.");
        }
    }
    return new KryoSerializer<>(this.typeClass, config);
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
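For context, this is the configuration the report above relies on, sketched with the existing ExecutionConfig methods; the event class is hypothetical and only there to make the snippet self-contained.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitKryoRegistration {

    /** Hypothetical event type, used only for illustration. */
    public static class MyEvent {
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Only explicitly registered types should be handed to Kryo.
        env.getConfig().disableAutoTypeRegistration();
        env.getConfig().registerKryoType(MyEvent.class);

        env.fromElements(new MyEvent()).print();
        env.execute("explicit-kryo-registration");
    }
}
{code}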
Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL
All tables being described by Flink's DDL are dynamic tables. But dynamic table is more like a logical concept, but not physical things. Physically, dynamic table has two different forms, one is a materialized table which changes over time (e.g. Database table, HBase table), another form is stream which represents change logs, and they are typically stored in message queue (e.g, Kafka). For the later one, I think the records already representing the history of the dynamic table based on stream-table duality. So regarding to: > Of course we could define that Flink implicitly tracks the (recent, i.e., within watermark bounds) history of all dynamic tables. I don't think this is Flink implicitly tracking the history of the dynamic table, but the physical data of the table is already the history itself. What Flink did is read the history out, and organize them to be prepared for further operations. I agree with another implicit convention I took though, which treats the event time as the version of the dynamic table. Strictly speaking, we should use another syntax "PERIOD FOR SYSTEM_TIME" [1] to indicate the version of the table. I've been thinking about this for quite a bit, it turns out that this semantic is too similar with Flink's event time. It will cause more trouble for users to understand what does this mean if we treat event time and this "PERIOD FOR SYSTEM_TIME" differently. And I'm also afraid that we will introduce lots of bugs because not all the developers will understand this easily. [1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15 Best, Kurt On Sat, May 9, 2020 at 5:32 AM Fabian Hueske wrote: > I think we need the TEMPORAL TABLE syntax because they are conceptually > more than just regular tables. > In a addition to being a table that always holds the latest values (and > can thereby serve as input to a continuous query), the system also needs to > track the history of such a table to be able to serve different versions of > the table (as requested by FOR SYSTEM_TIME AS OF). > > Of course we could define that Flink implicitly tracks the (recent, i.e., > within watermark bounds) history of all dynamic tables. > However, there's one more thing the system needs to know to be able to > correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to > use as version of the temporal table. > IMO it would be good to make this explicit, especially if there is a plan > to eventually support support multiple event-time attributes / watermarks > on a table. > Just using the only event time attribute would be a bit too much > convention magic for my taste (others might of course have a different > opinion on this subject). > > So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE > statement if we agree on a few implicit conventions (implicit history table > + implicit versioning attribute). > I'm not a big fan of such conventions and think it's better to make such > things explicit. > > For temporal joins with processing time semantics, we can use regular > dynamic tables without declaring them as TEMPORAL since we don't need a > history table to derive the current version. > AFAIK, these are already the semantics we use for LookupTableSource. > > Regarding the question of append-only tables and temporal tables, I'd like > to share some more thoughts. > As I said above, a temporal table consists of a regular dynamic table A > that holds the latest version and a table H that holds the history of A. 
> 1) When defining a temporal table based on a regular dynamic table (with a > primary key), we provide A and the Flink automatically maintains H (bounded > by watermarks) > 2) When defining a temporal table based on an append-only table, Flink > ingests H and we use the temporal table function to turn it into a dynamic > table with a primary key, i.e., into A. This conversion could also be done > during ingestion by treating the append-only stream as an upsert changelog > and converting it into a dynamic table with PK and as Table A (just in case > 1). > > As Jark said "converting append-only table into changelog table" was moved > to future work. > Until then, we could only define TEMPORAL TABLE on a table that is derived > from a proper changelog stream with a specific encoding. > The TEMPORAL VIEW would be a shortcut which would allow us to perform the > conversion in Flink SQL (and not within the connector) and defining the > temporal properties on the result of the view. > > Cheers, > Fabian > > > > Am Fr., 8. Mai 2020 um 08:29 Uhr schrieb Kurt Young : > >> I might missed something but why we need a new "TEMPORAL TABLE" syntax? >> >> According to Fabian's first mail: >> >> > Hence, the requirements for a temporal table are: >> > * The temporal table has a primary key / unique attribute >> > * The temporal table has a time-attribute that defines the start of the >> > validity interval of a row (processing time or ev
Would you please give me the permission as a contributor
Hi Guys, I want to contribute to Apache Flink. Would you please give me the permission as a contributor? My JIRA ID is limaowei66.
Re: Would you please give me the permission as a contributor
Hi Maowei, Welcome to the community! Currently, there are no special permissions required to contribute to Flink. Just ping a committer if you want to work on some JIRA ticket and someone will assign the ticket to you. Here is some information about how to contribute to Flink [1]. [1] https://flink.apache.org/contributing/contribute-code.html Best, Yangze Guo On Sat, May 9, 2020 at 10:16 AM 天地任我游 <770236...@qq.com> wrote: > > Hi Guys, I want to contribute to Apache Flink. Would you please give me the > permission as a contributor? My JIRA ID is limaowei66.
[jira] [Created] (FLINK-17585) "PythonProgramOptions" changes the entry point class when a user submits a Java SQL job that contains a Python UDF
Wei Zhong created FLINK-17585: - Summary: "PythonProgramOptions" changes the entry point class when a user submits a Java SQL job that contains a Python UDF Key: FLINK-17585 URL: https://issues.apache.org/jira/browse/FLINK-17585 Project: Flink Issue Type: Bug Components: API / Python Reporter: Wei Zhong If we run a command such as:
{code:java}
flink run -pyfs xxx.py xxx.jar
{code}
the main class will be changed to "PythonDriver". This is because the "PythonProgramOptions" class changes the entry point class when Python command line options are detected. We should consider the situation where a user submits a Java SQL job that contains a Python UDF. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17586) Due to the new behavior of the table planner, the Python dependency management command line options do not work
Wei Zhong created FLINK-17586: - Summary: Due to the new behavior of the table planner, the Python dependency management command line options do not work Key: FLINK-17586 URL: https://issues.apache.org/jira/browse/FLINK-17586 Project: Flink Issue Type: Bug Components: API / Python Reporter: Wei Zhong -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17587) Filesystem streaming sink support partition commit (success file)
Jingsong Lee created FLINK-17587: Summary: Filesystem streaming sink support partition commit (success file) Key: FLINK-17587 URL: https://issues.apache.org/jira/browse/FLINK-17587 Project: Flink Issue Type: Sub-task Reporter: Jingsong Lee -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17588) Throw an exception when the Hadoop jar is missing in the Flink Scala shell's YARN mode
Jeff Zhang created FLINK-17588: -- Summary: Throw an exception when the Hadoop jar is missing in the Flink Scala shell's YARN mode Key: FLINK-17588 URL: https://issues.apache.org/jira/browse/FLINK-17588 Project: Flink Issue Type: Improvement Components: Scala Shell Affects Versions: 1.10.0 Reporter: Jeff Zhang -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] Release 1.10.1, release candidate #3
Thanks for the RC! +1 (binding) - repeated benchmark runs On Fri, May 8, 2020 at 10:52 AM Robert Metzger wrote: > Thanks a lot for creating another RC! > > +1 (binding) > > - checked diff to last RC: > > https://github.com/apache/flink/compare/release-1.10.1-rc2...release-1.10.1-rc3 > - kinesis dependency change is properly documented > - started Flink locally (on Java11 :) ) >- seems to be build off the specified commit >- ran example >- checked logs > - staging repo looks ok > > > > On Thu, May 7, 2020 at 2:21 PM Yu Li wrote: > > > Hi everyone, > > > > Please review and vote on the release candidate #3 for version 1.10.1, as > > follows: > > [ ] +1, Approve the release > > [ ] -1, Do not approve the release (please provide specific comments) > > > > > > The complete staging area is available for your review, which includes: > > * JIRA release notes [1], > > * the official Apache source release and binary convenience releases to > be > > deployed to dist.apache.org [2], which are signed with the key with > > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3], > > * all artifacts to be deployed to the Maven Central Repository [4], > > * source code tag "release-1.10.1-rc3" [5], > > * website pull request listing the new release and adding announcement > blog > > post [6]. > > > > The vote will be open for at least 72 hours. It is adopted by majority > > approval, with at least 3 PMC affirmative votes. > > > > Thanks, > > Yu > > > > [1] > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891 > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/ > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > [4] > > https://repository.apache.org/content/repositories/orgapacheflink-1364/ > > [5] > > > > > https://github.com/apache/flink/commit/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37 > > [6] https://github.com/apache/flink-web/pull/330 > > >
[jira] [Created] (FLINK-17589) Extend StreamingFileSink to Support Streaming Hive Connector
Yun Gao created FLINK-17589: --- Summary: Extend StreamingFileSink to Support Streaming Hive Connector Key: FLINK-17589 URL: https://issues.apache.org/jira/browse/FLINK-17589 Project: Flink Issue Type: New Feature Components: Connectors / FileSystem Reporter: Yun Gao Fix For: 1.11.0 According to the [Discussion on FLIP-115|https://lists.apache.org/thread.html/rb1795428e481dbeaa1af2dcffed87b4804ed81d2af60256cd50032d7%40%3Cdev.flink.apache.org%3E], we will use StreamingFileSink to support the streaming Hive connector. This requires making some extensions to the current StreamingFileSink: # Support a path-based writer that writes to the specified Hadoop path. # Provide listeners to collect the bucket state so that the Hive sink can decide when a bucket is terminated and then write meta-info to Hive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17590) Add Bucket lifecycle listener to support acquiring bucket state
Yun Gao created FLINK-17590: --- Summary: Add Bucket lifecycle listener to support acquiring bucket state Key: FLINK-17590 URL: https://issues.apache.org/jira/browse/FLINK-17590 Project: Flink Issue Type: New Feature Components: Connectors / FileSystem Reporter: Yun Gao Fix For: 1.11.0 The Hive sink will reuse the Buckets class of StreamingFileSink, which encapsulates most of the logic of StreamingFileSink. The Hive sink requires writing a piece of meta-info into the Hive metastore after a partition (namely a Bucket in StreamingFileSink) has been terminated. Currently the termination is judged by event time/processing time ([FLIP-115|https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table]). To support the requirement of the Hive sink, we would add a listener that is notified when a bucket is created and when it becomes inactive. A bucket becomes inactive once all the previously written records have been committed. Then the Hive sink can safely write meta-info once the time has exceeded the bucket's boundary and the bucket has become inactive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
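A purely hypothetical sketch of what the listener described in FLINK-17590 could look like; the interface name and callbacks are assumptions derived from the issue text, not an existing StreamingFileSink API.
{code:java}
// Hypothetical listener interface, for illustration only.
public interface BucketLifeCycleListener<BucketID> {

    /** Called when a new bucket is created for the given bucket id. */
    void bucketCreated(BucketID bucketId);

    /**
     * Called once all records previously written to the bucket have been committed,
     * i.e. the bucket has become inactive. A Hive sink could react to this by writing
     * the partition's meta-info to the Hive metastore, provided the bucket's time
     * boundary has also been exceeded.
     */
    void bucketInactive(BucketID bucketId);
}
{code}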
[jira] [Created] (FLINK-17591) TableEnvironmentITCase.testExecuteSqlAndToDataStream failed
Jark Wu created FLINK-17591: --- Summary: TableEnvironmentITCase.testExecuteSqlAndToDataStream failed Key: FLINK-17591 URL: https://issues.apache.org/jira/browse/FLINK-17591 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner Reporter: Jark Wu Here is the instance: https://dev.azure.com/imjark/Flink/_build/results?buildId=61&view=logs&j=69332ead-8935-5abf-5b3d-e4280fb1ff4c&t=6855dd6e-a7b0-5fd1-158e-29fc186b16c8 {code:java} at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) [INFO] [INFO] Results: [INFO] [ERROR] Failures: [ERROR] TableEnvironmentITCase.testExecuteSqlAndToDataStream:343 [INFO] [ERROR] Tests run: 791, Failures: 1, Errors: 0, Skipped: 13 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17592) flink-table-planner doesn't compile on Scala 2.12
Robert Metzger created FLINK-17592: -- Summary: flink-table-planner doesn't compile on Scala 2.12 Key: FLINK-17592 URL: https://issues.apache.org/jira/browse/FLINK-17592 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.11.0 Reporter: Robert Metzger CI: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=844&view=logs&j=ed6509f5-1153-558c-557a-5ee0afbcdf24&t=241b1e5e-1a8e-5e6a-469a-a9b8cad87065 {code} [WARNING] ^ [ERROR] /__w/1/s/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSinks.scala:73: error: overriding method getTableSchema in trait TableSink of type ()org.apache.flink.table.api.TableSchema; [ERROR] method getTableSchema needs `override' modifier [ERROR] def getTableSchema: TableSchema = { [ERROR] ^ [WARNING] 8 warnings found [ERROR] one error found {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)