Fwd: Flink CLI properties with HA

2018-07-16 Thread Sampath Bhat
-- Forwarded message --
From: Sampath Bhat 
Date: Fri, Jul 13, 2018 at 3:18 PM
Subject: Flink CLI properties with HA
To: user 


Hello

When HA is enabled in the Flink cluster and I have to submit a job via the Flink
CLI, then the flink-conf.yaml of the Flink CLI should contain these properties:
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir: 
high-availability.zookeeper.quorum: 

What is the need of high-availability.storageDir for the Flink CLI? Does this
mean that even the Flink client should be able to access the mentioned path, or
is some check being done on the property name?

Without these properties, the Flink CLI is not able to submit a job to the
Flink cluster when HA is enabled.


[jira] [Created] (FLINK-9855) KeyedStream.IntervalJoined#process does not work for lambdas

2018-07-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9855:
---

 Summary: KeyedStream.IntervalJoined#process does not work for 
lambdas
 Key: FLINK-9855
 URL: https://issues.apache.org/jira/browse/FLINK-9855
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Timo Walther


KeyedStream.IntervalJoined is not calling type extraction functions correctly. 
It should have an index.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Flink Query Optimizer

2018-07-16 Thread Piotr Nowojski
I’m not sure. As far as I know, the APIs are already in place for statistics
support and the only missing part is an actual statistics provider. In this
respect, batch and streaming might be almost completely independent of one
another, so it shouldn’t be a matter of “first this, then that” but of whatever
is higher on someone’s priority list and/or easier to do.

Probably a low-effort, crude but generic solution would be to allow the user to
statically configure table sizes via an environment file / the Table API / SET
SESSION VALUE in the SQL client. Better solutions would require custom logic per
connector.
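
For anyone who wants to experiment with custom rules (as Xingcan suggests
below), a bare-bones Calcite rule skeleton looks roughly like this. Purely
illustrative: the rule name and matching logic are placeholders, not something
that exists in FlinkRuleSets.

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalJoin;

// Placeholder rule: matches any LogicalJoin.
public class MyJoinReorderRule extends RelOptRule {
    public static final MyJoinReorderRule INSTANCE = new MyJoinReorderRule();

    private MyJoinReorderRule() {
        super(operand(LogicalJoin.class, any()), "MyJoinReorderRule");
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalJoin join = call.rel(0);
        // Inspect statistics/costs of join.getLeft() / join.getRight() here;
        // if a cheaper but equivalent plan is found, hand it to the planner:
        // call.transformTo(reorderedJoin);
    }
}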

Piotrek  

> On 15 Jul 2018, at 17:28, Rong Rong  wrote:
> 
> +1. Having table statistics is one of the main blockers for more advanced
> optimization rules. I would love to contribute to this effort!
> 
> However, I think @Albert's case is more on the DataSet side. Was there any
> plan to integrate DataSet table statistics first and then extend to the
> DataStream domain?
> 
> --
> Rong
> 
> On Sun, Jul 15, 2018 at 7:21 AM Piotr Nowojski 
> wrote:
> 
>> Hi,
>> 
>> Currently the biggest limitation that prevents better query optimisation
>> is the lack of table statistics (which are not trivial to provide in
>> streaming), so join/aggregation reordering doesn’t work. We have some ideas
>> on how to tackle this issue and we will definitely improve this at some
>> point.
>> 
>> Piotrek
>> 
>>> On 14 Jul 2018, at 06:48, Xingcan Cui  wrote:
>>> 
>>> Hi Albert,
>>> 
>>> Calcite provides a rule-based optimizer (as a framework), which means
>> users can customize it by adding rules. That’s exactly what Flink did. From
>> the logical plan to the physical plan, the translations are triggered by
>> different sets of rules, according to which the relational expressions are
>> replaced, reordered or optimized.
>>> 
>>> However, IMO, the current optimization rules in the Flink Table API are
>> quite primitive. Some SQL statements (e.g., multiple joins) are just
>> translated to feasible execution plans, instead of optimized ones, since
>> it’s much more difficult to conduct query optimization on large datasets or
>> dynamic streams. You could first start from the Calcite query optimizer,
>> and then try to make your own rules.
>>> 
>>> Best,
>>> Xingcan
>>> 
 On Jul 14, 2018, at 11:55 AM, vino yang  wrote:
 
 Hi Albert,
 
 First, I guess the query optimizer you mentioned is the one for Flink Table &
 SQL (for the batch API there is another optimizer implemented by Flink
 itself).
 
 Yes, for Table & SQL, Flink uses Apache Calcite's query optimizer: queries
 are translated into a Calcite plan, which is then optimized according to
 Calcite's optimization rules.
 
 The following rules are applied so far:
 
 https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
 Given that Flink depends on Calcite to do the optimization, I think enhancing
 Flink and Calcite would be the right direction.
 
 Hope you can provide more ideas and details. The Flink community welcomes
 your ideas and contributions.
 
 Thanks.
 Vino.
 
 
 2018-07-13 23:39 GMT+08:00 Albert Jonathan :
 
> Hello,
> 
> I am just wondering, does Flink use Apache Calcite's query optimizer to
> generate an optimal logical plan, or does it have its own query
> optimizer?
> From what I observed so far, Flink's query optimizer only groups
> operators together without changing the order of aggregation operators
> (e.g., join). Did I miss anything?
> 
> I am thinking of extending Flink to apply query optimization as in the
> RDBMS by either integrating it with Calcite or implementing it as a new
> module.
> Any feedback or guidelines will be highly appreciated.
> 
> Thank you,
> Albert
> 
>>> 
>> 
>> 



[jira] [Created] (FLINK-9856) ExecutionGraphCoLocationRestartTest#testConstraintsAfterRestart failed on travis

2018-07-16 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9856:
---

 Summary: 
ExecutionGraphCoLocationRestartTest#testConstraintsAfterRestart failed on travis
 Key: FLINK-9856
 URL: https://issues.apache.org/jira/browse/FLINK-9856
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, Tests
Affects Versions: 1.6.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/404321779

{code}
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.288 sec <<< 
FAILURE! - in 
org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest
testConstraintsAfterRestart[Scheduler type = 
SLOT_POOL](org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest)
  Time elapsed: 6.407 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Not all executions fulfilled the 
predicate in time.
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate(ExecutionGraphTestUtils.java:189)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest.testConstraintsAfterRestart(ExecutionGraphCoLocationRestartTest.java:108)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Flink CLI properties with HA

2018-07-16 Thread vino yang
Hi Sampath,

The Flink CLI needs to retrieve the JobManager leader address, so it needs
access to the HA-specific configuration. When HA is implemented on top of
ZooKeeper, the leader address information is fetched from ZooKeeper.

The main use of the config item *high-availability.storageDir* is storage (job
graph, checkpoints and so on). The real data used for recovery is stored under
this path; ZooKeeper just stores a lightweight state handle pointing to it.
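
For example, a minimal client-side flink-conf.yaml could look like this (all
values below are placeholders for illustration, not real endpoints):

high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
# placeholder: any path reachable by the cluster, e.g. on HDFS
high-availability.storageDir: hdfs:///flink/ha/
# placeholder: your ZooKeeper ensemble
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181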

---
Thanks.
vino.


2018-07-16 15:28 GMT+08:00 Sampath Bhat :

>
> -- Forwarded message --
> From: Sampath Bhat 
> Date: Fri, Jul 13, 2018 at 3:18 PM
> Subject: Flink CLI properties with HA
> To: user 
>
>
> Hello
>
> When HA is enabled in the Flink cluster and I have to submit a job via the
> Flink CLI, then the flink-conf.yaml of the Flink CLI should contain these
> properties:
> high-availability: zookeeper
> high-availability.cluster-id: flink
> high-availability.zookeeper.path.root: flink
> high-availability.storageDir: 
> high-availability.zookeeper.quorum: 
>
> What is the need of high-availability.storageDir for the Flink CLI? Does this
> mean that even the Flink client should be able to access the mentioned path, or
> is some check being done on the property name?
>
> Without these properties, the Flink CLI is not able to submit a job to the
> Flink cluster when HA is enabled.
>
>


[jira] [Created] (FLINK-9858) State TTL End-to-End Test

2018-07-16 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-9858:
--

 Summary: State TTL End-to-End Test
 Key: FLINK-9858
 URL: https://issues.apache.org/jira/browse/FLINK-9858
 Project: Flink
  Issue Type: Sub-task
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9857) Processing-time timers fire too early

2018-07-16 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-9857:
---

 Summary: Processing-time timers fire too early
 Key: FLINK-9857
 URL: https://issues.apache.org/jira/browse/FLINK-9857
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.5.1, 1.4.2, 1.3.4, 1.6.0
Reporter: Aljoscha Krettek
 Fix For: 1.5.2, 1.6.0


The firing of processing-time timers is off by one. This leads to problems in 
edge cases, as discovered [here (mailing 
list)|https://lists.apache.org/thread.html/e49748fa5fa1c9217b9dfb65eea7a37af1f2895c769528e77a1a93fa@%3Cuser.flink.apache.org%3E]
 when elements arrive at the timestamp that is the end of the window.

The problem is [here 
(github)|https://github.com/apache/flink/blob/79b38f8f9a79b917d525842cf46087c5b8c40f3d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java#L231].
 For event time, we fire timers when the watermark is >= the timestamp. This is
correct because a watermark T says that we will not see any more elements with a
timestamp smaller than or equal to T. For processing time, a time of T does not
say that we won't see an element with timestamp T, which makes processing-time
timers fire one ms too early.

I think we can fix it by turning that {{<=}} into a {{<}}.
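
A self-contained sketch of the condition in question (simplified and with
made-up names; not the actual {{HeapInternalTimerService}} code):

{code}
public class TimerFiringSketch {

    // Buggy variant: a timer for timestamp T fires as soon as the current
    // processing time reaches T, although an element stamped exactly T can
    // still arrive at time T.
    static boolean firesBuggy(long timerTimestamp, long currentTime) {
        return timerTimestamp <= currentTime;
    }

    // Proposed fix: fire only once processing time has passed T.
    static boolean firesFixed(long timerTimestamp, long currentTime) {
        return timerTimestamp < currentTime;
    }

    public static void main(String[] args) {
        long t = 1_000L;
        System.out.println(firesBuggy(t, t)); // true: fires one ms too early
        System.out.println(firesFixed(t, t)); // false: waits until t + 1
    }
}
{code}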



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9859) Distinguish TM akka config with JM config

2018-07-16 Thread JIRA
陈梓立 created FLINK-9859:
--

 Summary: Distinguish TM akka config with JM config
 Key: FLINK-9859
 URL: https://issues.apache.org/jira/browse/FLINK-9859
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.2


... increase the number of Akka threads on the JM to improve its performance;
decrease the number of Akka threads on the TM to save resources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9860:


 Summary: Netty resource leak on receiver side
 Key: FLINK-9860
 URL: https://issues.apache.org/jira/browse/FLINK-9860
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.6.0
Reporter: Till Rohrmann
Assignee: Nico Kruber
 Fix For: 1.6.0


The Hadoop-free Wordcount end-to-end test fails with the following exception:
{code}
ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:

org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)

org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)

org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)

org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)

org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
{code}

We might have a resource leak on the receiving side of our network stack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Flink WindowedStream - Need assistance

2018-07-16 Thread Titus Rakkesh
Dear All,

We have two independent streams which receive elements at different
frequencies:

DataStream<Tuple2<String, Double>> splittedActivationTuple;

DataStream<Tuple2<String, Double>> unionReloadsStream;

We have a requirement to keep the "splittedActivationTuple" stream elements in
a window with an eviction period of 24 hours. So I created a "WindowedStream"
like below:

WindowedStream<Tuple2<String, Double>, Tuple, GlobalWindow> keyedWindowedActStream =
    splittedActivationTuple
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<Tuple2<String, Double>>())
        .keyBy(0)
        .window(GlobalWindows.create())
        .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)));

Our requirements are the following:

1. When "unionReloadsStream" receives data, we need to check whether its
   "String" field matches the "String" field of an element in the
   WindowedStream, and accumulate the WindowedStream's Double with the
   "unionReloadsStream" Double. Is this possible with Flink? I checked
   CoGroup and CoMap, but I couldn't figure out how to do it since I am new.

2. Can CEP functionality create a new stream from the WindowedStream if the
   Double value is > 100? I went through several of Flink's CEP tutorials,
   but couldn't figure out how to do this with a "WindowedStream".

I am very new to Flink. Any assistance would be highly appreciated.

Thanks,

Titus


[jira] [Created] (FLINK-9861) Add end-to-end test for reworked BucketingSink

2018-07-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9861:


 Summary: Add end-to-end test for reworked BucketingSink
 Key: FLINK-9861
 URL: https://issues.apache.org/jira/browse/FLINK-9861
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Affects Versions: 1.6.0
Reporter: Till Rohrmann
 Fix For: 1.6.0


We should add an end-to-end test for the reworked BucketingSink to verify that 
the sink works with different {{FileSystems}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9862) Update end-to-end test to use RocksDB backed timers

2018-07-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9862:


 Summary: Update end-to-end test to use RocksDB backed timers
 Key: FLINK-9862
 URL: https://issues.apache.org/jira/browse/FLINK-9862
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.6.0
Reporter: Till Rohrmann
 Fix For: 1.6.0


We should add or modify an end-to-end test to use RocksDB backed timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9863) Add a built-in ingestion time TS extractor

2018-07-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9863:
---

 Summary: Add a built-in ingestion time TS extractor
 Key: FLINK-9863
 URL: https://issues.apache.org/jira/browse/FLINK-9863
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


There are cases where ingestion time is also useful in the Table & SQL API. As 
an example see FLINK-9857 and the linked mailing list discussion there. We 
should provide an ingestion time timestamps extractor in 
{{org.apache.flink.table.sources.tsextractors}}.

The following classes should be updated as well:
- org.apache.flink.table.descriptors.Rowtime
- org.apache.flink.table.descriptors.RowtimeValidator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9864) Make timestamp extraction more flexible in SQL Client

2018-07-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9864:
---

 Summary: Make timestamp extraction more flexible in SQL Client
 Key: FLINK-9864
 URL: https://issues.apache.org/jira/browse/FLINK-9864
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Currently, a timestamp must be at the top level of a possibly nested row and 
must have a certain format. We should think about making this more flexible to 
cover most of the use cases.

A first solution could be to allow a DOT operator syntax: 
{{myfield.nested.timestamp}}
Other cases might be:
- The time could also be split into several fields
- Or it needs to be parsed using a [date format 
syntax|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#date-format-specifier].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9865) flink-hadoop-compatibility should assume Hadoop as provided

2018-07-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9865:
---

 Summary: flink-hadoop-compatibility should assume Hadoop as 
provided
 Key: FLINK-9865
 URL: https://issues.apache.org/jira/browse/FLINK-9865
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.5.1, 1.5.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.6.0


The {{flink-hadoop-compatibility}} project has a *compile*-scope dependency on 
Hadoop ({{flink-hadoop-shaded}}). Because of that, the Hadoop dependencies are 
pulled into the user application.

Like in other Hadoop-dependent modules, we should assume that Hadoop is 
provided in the framework classpath already.
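
A sketch of the intended change in the {{flink-hadoop-compatibility}} pom 
(illustrative only; the exact artifact id and wiring may differ):

{code}
<!-- Hadoop is expected on the framework classpath, so do not bundle it: -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-shaded</artifactId>
    <version>${project.version}</version>
    <scope>provided</scope>
</dependency>
{code}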



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9866:
---

 Summary: Allow passing program arguments to StandaloneJobCluster
 Key: FLINK-9866
 URL: https://issues.apache.org/jira/browse/FLINK-9866
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


Right now, an empty array is always passed as the program arguments to 
{{StandaloneJobClusterEntryPoint}}. We should pass the parsed arguments instead. 
We should also extend the run and Docker scripts to allow passing arguments.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9867) Extend release notes for Flink 1.6

2018-07-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9867:


 Summary: Extend release notes for Flink 1.6
 Key: FLINK-9867
 URL: https://issues.apache.org/jira/browse/FLINK-9867
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Till Rohrmann
 Fix For: 1.6.0


We should extend the release notes under {{/docs/release-notes/flink-1.6.md}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9868) Expose channel id to ProcessFunction

2018-07-16 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-9868:
--

 Summary: Expose channel id to ProcessFunction
 Key: FLINK-9868
 URL: https://issues.apache.org/jira/browse/FLINK-9868
 Project: Flink
  Issue Type: New Feature
  Components: Local Runtime
Reporter: Hequn Cheng


Currently, the channel id is not exposed from {{StreamInputProcessor}} to the 
{{ProcessOperator}} and {{ProcessFunction}}. There are some cases where users 
want the channel id, as discussed [here (mailing 
list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9869) Send PartitionInfo in batch to improve performance

2018-07-16 Thread JIRA
陈梓立 created FLINK-9869:
--

 Summary: Send PartitionInfo in batch to improve performance
 Key: FLINK-9869
 URL: https://issues.apache.org/jira/browse/FLINK-9869
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.2


... currently we send partition info as soon as one arrives. We could 
`cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
performance.
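
A minimal, self-contained sketch of the proposed cache-then-send pattern (the 
two method names mirror this ticket; everything else, including the types, is 
illustrative):

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PartitionInfoBatcher {
    private final List<String> pending = new ArrayList<>(); // stand-in for PartitionInfo
    private final Consumer<List<String>> sendRpc;           // stand-in for the TM gateway

    PartitionInfoBatcher(Consumer<List<String>> sendRpc) {
        this.sendRpc = sendRpc;
    }

    // Buffer the descriptor instead of sending it right away.
    void cachePartitionInfo(String partitionInfo) {
        pending.add(partitionInfo);
    }

    // Ship everything cached so far in one call instead of one call per partition.
    void sendPartitionInfoAsync() {
        if (!pending.isEmpty()) {
            sendRpc.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
{code}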



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)