Fwd: Flink CLI properties with HA
-- Forwarded message --
From: Sampath Bhat
Date: Fri, Jul 13, 2018 at 3:18 PM
Subject: Flink CLI properties with HA
To: user

Hello

When HA is enabled in the Flink cluster and I have to submit a job via the Flink CLI, then the flink-conf.yaml used by the Flink CLI should contain these properties:

high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir:
high-availability.zookeeper.quorum:

What is the need of high-availability.storageDir for the Flink CLI? Does this mean that even the Flink client should be able to access the mentioned path, or is some check being done on the property name?

Without these properties the Flink CLI will not be able to submit a job to the Flink cluster when HA is enabled.
[jira] [Created] (FLINK-9855) KeyedStream.IntervalJoined#process does not work for lambdas
Timo Walther created FLINK-9855:
------------------------------------
Summary: KeyedStream.IntervalJoined#process does not work for lambdas
Key: FLINK-9855
URL: https://issues.apache.org/jira/browse/FLINK-9855
Project: Flink
Issue Type: Bug
Components: DataStream API
Reporter: Timo Walther

KeyedStream.IntervalJoined is not calling the type extraction functions correctly; the extraction should be called with an index.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Re: Flink Query Optimizer
I'm not sure. As far as I know, the APIs are already in place for statistics support and the only missing part is an actual statistics provider. In this respect, batch and streaming might be almost completely independent of one another, so it shouldn't be a matter of "first this, then that" but rather whatever ends up higher on someone's priority list and/or is easier to do.

Probably a low-effort, crude but generic solution would be to allow users to statically configure table sizes via an environment file / the Table API / SET SESSION VALUE in the SQL Client. Better solutions would require custom logic per connector.

Piotrek

> On 15 Jul 2018, at 17:28, Rong Rong wrote:
>
> +1. Having table statistics is one of the main blockers for more advanced
> optimization rules. I would love to contribute to this effort!
>
> However, I think @Albert's case is more on the DataSet side. Was there any
> plan to integrate table statistics for the DataSet side first and then extend
> them to the DataStream domain?
>
> --
> Rong
>
> On Sun, Jul 15, 2018 at 7:21 AM Piotr Nowojski wrote:
>
>> Hi,
>>
>> Currently the biggest limitation that prevents better query optimisation
>> is the lack of table statistics (which are not trivial to provide in
>> streaming), so join/aggregation reordering doesn't work. We have some
>> ideas on how to tackle this issue and will definitely improve it at some
>> point.
>>
>> Piotrek
>>
>>> On 14 Jul 2018, at 06:48, Xingcan Cui wrote:
>>>
>>> Hi Albert,
>>>
>>> Calcite provides a rule-based optimizer (as a framework), which means
>>> users can customize it by adding rules. That's exactly what Flink does. From
>>> the logical plan to the physical plan, the translations are triggered by
>>> different sets of rules, according to which the relational expressions are
>>> replaced, reordered or optimized.
>>>
>>> However, IMO, the current optimization rules in the Flink Table API are
>>> quite primitive. Some SQL statements (e.g., multiple joins) are just
>>> translated to feasible execution plans, instead of optimized ones, since
>>> it's much more difficult to conduct query optimization on large datasets or
>>> dynamic streams. You could first start from the Calcite query optimizer,
>>> and then try to write your own rules.
>>>
>>> Best,
>>> Xingcan
>>>
>>>> On Jul 14, 2018, at 11:55 AM, vino yang wrote:
>>>>
>>>> Hi Albert,
>>>>
>>>> First, I guess the query optimizer you mentioned is about the Flink Table &
>>>> SQL API (for the batch API there is another optimizer implemented by Flink
>>>> itself).
>>>>
>>>> Yes, for Table & SQL, Flink uses Apache Calcite's query optimizer: queries
>>>> are translated into a Calcite plan, which is then optimized according to
>>>> Calcite's optimization rules. The following rules are applied so far:
>>>> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
>>>>
>>>> Given that Flink depends on Calcite for this optimization, I think
>>>> enhancing Flink and Calcite would be the right direction. We hope you can
>>>> provide more ideas and details; the Flink community welcomes your ideas and
>>>> contributions.
>>>>
>>>> Thanks.
>>>> Vino.
>>>>
>>>> 2018-07-13 23:39 GMT+08:00 Albert Jonathan :
>>>>
>>>>> Hello,
>>>>>
>>>>> I am just wondering, does Flink use Apache Calcite's query optimizer to
>>>>> generate an optimal logical plan, or does it have its own query optimizer?
>>>>> From what I observed so far, Flink's query optimizer only groups
>>>>> operators together without changing the order of aggregation operators
>>>>> (e.g., joins). Did I miss anything?
>>>>>
>>>>> I am thinking of extending Flink to apply query optimization as in an
>>>>> RDBMS, by either integrating it with Calcite or implementing it as a new
>>>>> module.
>>>>>
>>>>> Any feedback or guidelines will be highly appreciated.
>>>>>
>>>>> Thank you,
>>>>> Albert
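To make Xingcan's point about "customize it by adding rules" concrete, here is a minimal, hypothetical sketch of a custom Calcite rule of the kind that could sit in a rule set such as the FlinkRuleSets class linked above. Everything except the Calcite base classes (RelOptRule, RelOptRuleCall, LogicalJoin) is an assumption for illustration; this is not an existing Flink or Calcite rule.

{code}
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalJoin;

/**
 * Illustrative skeleton only: a Calcite rule that matches a logical join.
 * A real reordering rule would consult table statistics / cost estimates
 * and emit a rewritten plan via call.transformTo(...).
 */
public class IllustrativeJoinReorderRule extends RelOptRule {

    public static final IllustrativeJoinReorderRule INSTANCE = new IllustrativeJoinReorderRule();

    private IllustrativeJoinReorderRule() {
        // Match any LogicalJoin node in the plan.
        super(operand(LogicalJoin.class, any()), "IllustrativeJoinReorderRule");
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalJoin join = call.rel(0);
        // Decide (e.g. based on statistics) whether swapping the join inputs or
        // re-associating a chain of joins yields a cheaper plan, and if so:
        // call.transformTo(rewrittenJoin);
    }
}
{code}

Such a rule only becomes active once it is added to one of the rule sets the planner applies, which is exactly where the missing statistics provider mentioned above would feed in.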
[jira] [Created] (FLINK-9856) ExecutionGraphCoLocationRestartTest#testConstraintsAfterRestart failed on travis
Chesnay Schepler created FLINK-9856:
------------------------------------
Summary: ExecutionGraphCoLocationRestartTest#testConstraintsAfterRestart failed on travis
Key: FLINK-9856
URL: https://issues.apache.org/jira/browse/FLINK-9856
Project: Flink
Issue Type: Bug
Components: Distributed Coordination, Tests
Affects Versions: 1.6.0
Reporter: Chesnay Schepler

https://travis-ci.org/apache/flink/jobs/404321779

{code}
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.288 sec <<< FAILURE! - in org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest
testConstraintsAfterRestart[Scheduler type = SLOT_POOL](org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest)  Time elapsed: 6.407 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Not all executions fulfilled the predicate in time.
	at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate(ExecutionGraphTestUtils.java:189)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest.testConstraintsAfterRestart(ExecutionGraphCoLocationRestartTest.java:108)
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Re: Flink CLI properties with HA
Hi Sampath,

The Flink CLI needs to retrieve the JobManager leader address, so it needs access to the HA-specific configuration: when HA is implemented on top of ZooKeeper, the leader address is fetched from ZooKeeper.

The main use of the config item *high-availability.storageDir* is storage of job metadata (job graph, checkpoints and so on). The real data used for recovery is stored under this path; ZooKeeper only stores a state handle pointing to it.

---
Thanks.
vino.

2018-07-16 15:28 GMT+08:00 Sampath Bhat :

> -- Forwarded message --
> From: Sampath Bhat
> Date: Fri, Jul 13, 2018 at 3:18 PM
> Subject: Flink CLI properties with HA
> To: user
>
> Hello
>
> When HA is enabled in the Flink cluster and I have to submit a job via the
> Flink CLI, then the flink-conf.yaml used by the Flink CLI should contain
> these properties:
> high-availability: zookeeper
> high-availability.cluster-id: flink
> high-availability.zookeeper.path.root: flink
> high-availability.storageDir:
> high-availability.zookeeper.quorum:
>
> What is the need of high-availability.storageDir for the Flink CLI? Does
> this mean that even the Flink client should be able to access the mentioned
> path, or is some check being done on the property name?
>
> Without these properties the Flink CLI will not be able to submit a job to
> the Flink cluster when HA is enabled.
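For illustration only, a client-side flink-conf.yaml with all of the properties discussed above filled in might look like the following. The storageDir path and the ZooKeeper quorum are hypothetical placeholder values, not taken from this thread; the real values depend entirely on the deployment.

{code}
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
# hypothetical example value; a file system path used for job recovery data
high-availability.storageDir: hdfs:///flink/ha/
# hypothetical example value; comma-separated list of ZooKeeper servers
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
{code}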
[jira] [Created] (FLINK-9858) State TTL End-to-End Test
Andrey Zagrebin created FLINK-9858:
------------------------------------
Summary: State TTL End-to-End Test
Key: FLINK-9858
URL: https://issues.apache.org/jira/browse/FLINK-9858
Project: Flink
Issue Type: Sub-task
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9857) Processing-time timers fire too early
Aljoscha Krettek created FLINK-9857:
------------------------------------
Summary: Processing-time timers fire too early
Key: FLINK-9857
URL: https://issues.apache.org/jira/browse/FLINK-9857
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.5.1, 1.4.2, 1.3.4, 1.6.0
Reporter: Aljoscha Krettek
Fix For: 1.5.2, 1.6.0

The firing of processing-time timers is off by one. This leads to problems in edge cases, as discovered [here (mailing list)|https://lists.apache.org/thread.html/e49748fa5fa1c9217b9dfb65eea7a37af1f2895c769528e77a1a93fa@%3Cuser.flink.apache.org%3E] when elements arrive at the timestamp that is the end of the window.

The problem is [here (github)|https://github.com/apache/flink/blob/79b38f8f9a79b917d525842cf46087c5b8c40f3d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java#L231].

For event time, we fire timers when the watermark is >= the timestamp. This is correct because a watermark T says that we will not see any more elements with a timestamp smaller than or equal to T. For processing time, a time of T does not say that we won't see an element with timestamp T, which makes processing-time timers fire one ms too early. I think we can fix it by turning that {{<=}} into a {{<}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
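A hedged sketch of the comparison being discussed, for readers skimming the digest. This is not the linked HeapInternalTimerService code; the class, method and variable names are hypothetical.

{code}
import java.util.PriorityQueue;

// Illustrative only: the event-time vs. processing-time trigger checks.
public class TimerFiringSketch {

    // Event time: firing when timestamp <= watermark (i.e. watermark >= timestamp)
    // is correct, because a watermark T guarantees that no further elements with
    // timestamp <= T will arrive.
    static void advanceWatermark(PriorityQueue<Long> eventTimeTimers, long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fire(eventTimeTimers.poll());
        }
    }

    // Processing time: a wall-clock reading of T does not rule out further elements
    // with timestamp T, so the check must be strict ('<'); using '<=' here is the
    // one-millisecond-too-early firing the issue describes.
    static void advanceProcessingTime(PriorityQueue<Long> procTimeTimers, long currentTime) {
        while (!procTimeTimers.isEmpty() && procTimeTimers.peek() < currentTime) {
            fire(procTimeTimers.poll());
        }
    }

    static void fire(long timestamp) {
        System.out.println("firing timer @ " + timestamp);
    }
}
{code}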
[jira] [Created] (FLINK-9859) Distinguish TM akka config with JM config
陈梓立 created FLINK-9859:
------------------------------------
Summary: Distinguish TM akka config with JM config
Key: FLINK-9859
URL: https://issues.apache.org/jira/browse/FLINK-9859
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
Fix For: 1.5.2

... increase the number of akka threads on the JM to improve its performance; decrease the number of akka threads on the TM to save resources.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9860) Netty resource leak on receiver side
Till Rohrmann created FLINK-9860:
------------------------------------
Summary: Netty resource leak on receiver side
Key: FLINK-9860
URL: https://issues.apache.org/jira/browse/FLINK-9860
Project: Flink
Issue Type: Bug
Components: Network
Affects Versions: 1.6.0
Reporter: Till Rohrmann
Assignee: Nico Kruber
Fix For: 1.6.0

The Hadoop-free Wordcount end-to-end test fails with the following exception:

{code}
ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
	org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
	org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
	org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
	org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
	org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
	org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
	org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
{code}

We might have a resource leak on the receiving side of our network stack.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Flink WindowedStream - Need assistance
Dear All,

We have 2 independent streams which will receive elements at different frequencies:

DataStream<...> splittedActivationTuple;
DataStream<...> unionReloadsStream;

We have a requirement to keep the "splittedActivationTuple" stream elements in a window with an eviction time period of 24 hours, so I created a "WindowedStream" like below:

WindowedStream<..., Tuple, GlobalWindow> keyedWindowedActStream = splittedActivationTuple
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor())
        .keyBy(0)
        .window(GlobalWindows.create())
        .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)));

Our requirements are the following:

1. When "unionReloadsStream" receives data, we need to check whether the corresponding "String" field matches the "String" field in the WindowedStream, and accumulate the WindowedStream's Double with the "unionReloadsStream" Double. Is this possible with Flink? I checked CoGroup and CoMap, but I couldn't figure out how to do it since I am new. (See the sketch after this message.)

2. Can CEP functionality be used to create a new stream from the WindowedStream if the Double value is > 100? I went through several of Flink's CEP tutorials, but couldn't figure out how to do this with a "WindowedStream".

I am very new to Flink. Any assistance would be highly appreciated.

Thanks,
Titus
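For what it's worth, requirement (1) can also be expressed without a window by connecting the two streams keyed on the String field and keeping the running Double in keyed state inside a CoProcessFunction. The sketch below is illustrative only: the Tuple2<String, Double> element type, field positions and all names are assumptions, not taken from the original job. The 24-hour retention could be added with ctx.timerService().registerProcessingTimeTimer(...) plus state clean-up in onTimer(), which is omitted here.

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ActivationReloadJoin {

    public static DataStream<Tuple2<String, Double>> join(
            DataStream<Tuple2<String, Double>> activations,
            DataStream<Tuple2<String, Double>> reloads) {

        return activations
                .connect(reloads)
                .keyBy(0, 0) // key both streams by the String field
                .process(new CoProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {

                    private transient ValueState<Double> total;

                    @Override
                    public void open(Configuration parameters) {
                        total = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("total", Double.class));
                    }

                    @Override
                    public void processElement1(Tuple2<String, Double> activation,
                                                Context ctx,
                                                Collector<Tuple2<String, Double>> out) throws Exception {
                        // Remember the activation amount for this key.
                        total.update(activation.f1);
                    }

                    @Override
                    public void processElement2(Tuple2<String, Double> reload,
                                                Context ctx,
                                                Collector<Tuple2<String, Double>> out) throws Exception {
                        Double current = total.value();
                        if (current != null) {
                            // Accumulate the reload amount onto the stored activation amount.
                            double updated = current + reload.f1;
                            total.update(updated);
                            out.collect(Tuple2.of(reload.f0, updated));
                        }
                    }
                });
    }
}
{code}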
[jira] [Created] (FLINK-9861) Add end-to-end test for reworked BucketingSink
Till Rohrmann created FLINK-9861:
------------------------------------
Summary: Add end-to-end test for reworked BucketingSink
Key: FLINK-9861
URL: https://issues.apache.org/jira/browse/FLINK-9861
Project: Flink
Issue Type: Sub-task
Components: Streaming Connectors
Affects Versions: 1.6.0
Reporter: Till Rohrmann
Fix For: 1.6.0

We should add an end-to-end test for the reworked BucketingSink to verify that the sink works with different {{FileSystems}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9862) Update end-to-end test to use RocksDB backed timers
Till Rohrmann created FLINK-9862:
------------------------------------
Summary: Update end-to-end test to use RocksDB backed timers
Key: FLINK-9862
URL: https://issues.apache.org/jira/browse/FLINK-9862
Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.6.0
Reporter: Till Rohrmann
Fix For: 1.6.0

We should add or modify an end-to-end test to use RocksDB backed timers.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9863) Add a built-in ingestion time TS extractor
Timo Walther created FLINK-9863:
------------------------------------
Summary: Add a built-in ingestion time TS extractor
Key: FLINK-9863
URL: https://issues.apache.org/jira/browse/FLINK-9863
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

There are cases where ingestion time is also useful in the Table & SQL API. As an example, see FLINK-9857 and the linked mailing list discussion there. We should provide an ingestion time timestamp extractor in {{org.apache.flink.table.sources.tsextractors}}.

The following classes should be updated as well:
- org.apache.flink.table.descriptors.Rowtime
- org.apache.flink.table.descriptors.RowtimeValidator

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9864) Make timestamp extraction more flexible in SQL Client
Timo Walther created FLINK-9864:
------------------------------------
Summary: Make timestamp extraction more flexible in SQL Client
Key: FLINK-9864
URL: https://issues.apache.org/jira/browse/FLINK-9864
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

Currently, a timestamp must be at the top level of a possibly nested row and must have a certain format. We should think about making this more flexible to cover most of the use cases. A first solution could be to allow a DOT operator syntax: {{myfield.nested.timestamp}}

Other cases might be:
- The time could also be split into several fields
- Or it needs to be parsed using a [date format syntax|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#date-format-specifier].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9865) flink-hadoop-compatibility should assume Hadoop as provided
Stephan Ewen created FLINK-9865:
------------------------------------
Summary: flink-hadoop-compatibility should assume Hadoop as provided
Key: FLINK-9865
URL: https://issues.apache.org/jira/browse/FLINK-9865
Project: Flink
Issue Type: Bug
Components: Build System
Affects Versions: 1.5.1, 1.5.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.6.0

The {{flink-hadoop-compatibility}} project has a *compile* scope dependency on Hadoop ({{flink-hadoop-shaded}}). Because of that, the Hadoop dependencies are pulled into the user application. Like in other Hadoop-dependent modules, we should assume that Hadoop is provided in the framework classpath already.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster
Dawid Wysakowicz created FLINK-9866:
------------------------------------
Summary: Allow passing program arguments to StandaloneJobCluster
Key: FLINK-9866
URL: https://issues.apache.org/jira/browse/FLINK-9866
Project: Flink
Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz

Right now an empty array is always passed as the program arguments to {{StandaloneJobClusterEntryPoint}}; the parsed arguments should be passed instead. We should also extend the run and docker scripts to allow passing arguments.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9867) Extend release notes for Flink 1.6
Till Rohrmann created FLINK-9867:
------------------------------------
Summary: Extend release notes for Flink 1.6
Key: FLINK-9867
URL: https://issues.apache.org/jira/browse/FLINK-9867
Project: Flink
Issue Type: Improvement
Components: Documentation
Affects Versions: 1.6.0
Reporter: Till Rohrmann
Fix For: 1.6.0

We should extend the release notes under {{/docs/release-notes/flink-1.6.md}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9868) Expose channel id to ProcessFunction
Hequn Cheng created FLINK-9868:
------------------------------------
Summary: Expose channel id to ProcessFunction
Key: FLINK-9868
URL: https://issues.apache.org/jira/browse/FLINK-9868
Project: Flink
Issue Type: New Feature
Components: Local Runtime
Reporter: Hequn Cheng

Currently, the channel id is not exposed from {{StreamInputProcessor}} to the {{ProcessOperator}} and {{ProcessFunction}}. There are some cases where users want the channel id, as discovered [here (mailing list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9869) Send PartitionInfo in batch to improve performance
陈梓立 created FLINK-9869:
------------------------------------
Summary: Send PartitionInfo in batch to improve performance
Key: FLINK-9869
URL: https://issues.apache.org/jira/browse/FLINK-9869
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
Fix For: 1.5.2

... currently we send partition info as soon as one arrives. We could `cachePartitionInfo` and then `sendPartitionInfoAsync`, which would improve performance.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)