[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator

2017-03-10 Thread DmytroShkvyra
Github user DmytroShkvyra closed the pull request at:

https://github.com/apache/flink/pull/3502


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator

2017-03-10 Thread DmytroShkvyra
GitHub user DmytroShkvyra reopened a pull request:

https://github.com/apache/flink/pull/3502

[FLINK-4565] Support for SQL IN operator

[FLINK-4565] Support for SQL IN operator
This PR is part of the work on the SQL IN operator in the Table API and implements 
IN for literals.
Two cases are covered: fewer than and more than 20 literals.

I also have some questions:
- Is converting all numeric types to BigDecimal OK? I decided to do so to 
simplify the use of a HashSet.
- The validation isn't very good: it forces the operator to be used with literals 
of the same type. Should I rework it, or just add more cases?
expressionDsl.scala:
entry point for the IN operator in the Scala API
ScalarOperators.scala:
1) All numeric types are upcast to BigDecimal in castNumeric for use in the 
HashSet; other types are left unchanged.
2) valuesInitialization handles two cases: more than 20 operands (we use a 
HashSet, initialized in the constructor, described below) and fewer than 20 
operands (we initialize the operands in the method body and use them in a 
conjunction).
3) comparison also covers the cases described above. In the first case we use a 
callback to declare and initialize a HashSet with all operands; otherwise we 
simply combine all operands in a conjunction.
4) The final code is assembled from these snippets (see the illustrative sketch 
below).
CodeGenerator.scala:
passes the arguments and the callback to declare and initialize the HashSet
FunctionCatalog.scala:
registers "in" as a method
InITCase:
some use cases
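
For illustration, a rough, hand-written sketch of the two code shapes the 
generator produces (names and literals are made up; the real generated code 
differs):

{code}
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hand-written illustration (NOT actual generated code) of the two shapes
// described above for "x IN (...)" over numeric literals upcast to BigDecimal.
public class InCodegenSketch {

    // More than 20 literals: the set is built once (in the generated operator's
    // constructor, via the callback mentioned above) and probed per record.
    private static final Set<BigDecimal> IN_SET = new HashSet<>(Arrays.asList(
            new BigDecimal(1), new BigDecimal(2), new BigDecimal(42)));

    static boolean manyLiterals(BigDecimal x) {
        return IN_SET.contains(x);
    }

    // Fewer literals: the operands are initialized in the method body and
    // combined into a chain of comparisons.
    static boolean fewLiterals(BigDecimal x) {
        BigDecimal l1 = new BigDecimal(1);
        BigDecimal l2 = new BigDecimal(2);
        return x.compareTo(l1) == 0 || x.compareTo(l2) == 0;
    }
}
{code}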

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3502


commit f7960d66a0f885a5b032345427c5380f268cc60e
Author: DmytroShkvyra 
Date:   2017-03-09T19:37:46Z

[FLINK-4565] Support for SQL IN operator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed

2017-03-10 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6019:
-

 Summary: Some log4j messages do not have a loglevel field set, so 
they can't be suppressed
 Key: FLINK-6019
 URL: https://issues.apache.org/jira/browse/FLINK-6019
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
 Environment: Linux
Reporter: Luke Hutchison


Some of the log messages do not appear to have a loglevel value set, so they 
can't be suppressed by setting the log4j level to WARN. There's this line at 
the beginning which doesn't even have a timestamp:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]

And then there are numerous lines like this, missing an "INFO" field:

03/10/2017 00:01:14 Job execution switched to status RUNNING.
03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) 
(org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED 
03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED 
03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING 
03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING 
03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED 
03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) 
(org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED 
03/10/2017 00:01:17 Job execution switched to status FINISHED.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6000) Can not start HA cluster with start-cluster.sh

2017-03-10 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned FLINK-6000:
---

Assignee: Dawid Wysakowicz

> Can not start HA cluster with start-cluster.sh
> --
>
> Key: FLINK-6000
> URL: https://issues.apache.org/jira/browse/FLINK-6000
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> Right now it is impossible to start a cluster in ZooKeeper HA mode as 
> described in the documentation by setting:
> in conf/flink-conf.yaml:
> {code}
> high-availability: zookeeper
> ...
> {code}
> in conf/masters:
> {code}
> localhost:8081
> localhost:8082
> {code}
> The problem is with the {{bin/config.sh}} file. If the value "zookeeper" is read 
> from the config file, the variable {{HIGH_AVAILABILITY}} will be reset to "none" 
> in the else branch. See the code below:
> {code}
> if [ -z "${HIGH_AVAILABILITY}" ]; then
>  HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" 
> "${YAML_CONF}")
>  if [ -z "${HIGH_AVAILABILITY}" ]; then
> # Try deprecated value
> DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
> if [ -z "${DEPRECATED_HA}" ]; then
> HIGH_AVAILABILITY="none"
> elif [ ${DEPRECATED_HA} == "standalone" ]; then
> # Standalone is now 'none'
> HIGH_AVAILABILITY="none"
> else
> HIGH_AVAILABILITY=${DEPRECATED_HA}
> fi
>  else
>  HIGH_AVAILABILITY="none" <-- it exits here
>  fi
> fi
> {code}
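
A possible shape of the fix (a sketch only; the actual change in the linked PR may 
differ): keep the value read from the configuration instead of unconditionally 
resetting it in the else branch.

{code}
if [ -z "${HIGH_AVAILABILITY}" ]; then
    HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
    if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ] || [ "${DEPRECATED_HA}" == "standalone" ]; then
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
    elif [ "${HIGH_AVAILABILITY}" == "standalone" ]; then
        # Standalone is now 'none'
        HIGH_AVAILABILITY="none"
    fi
    # any other configured value (e.g. "zookeeper") is kept as-is
fi
{code}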



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed

2017-03-10 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904680#comment-15904680
 ] 

Luke Hutchison commented on FLINK-6019:
---

Also, these messages do not respond to setting the logger to send to stderr 
rather than stdout, e.g. using

log4j.appender.stdout.Target=System.err

(Other log messages do respond to this change and go to stderr, and they display 
the correct loglevel, e.g. "INFO"/"WARN".)

That is, it seems there are two logging systems running: one that writes 
through log4j and respects its settings, and whatever is writing the log entries 
above.

> Some log4j messages do not have a loglevel field set, so they can't be 
> suppressed
> -
>
> Key: FLINK-6019
> URL: https://issues.apache.org/jira/browse/FLINK-6019
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
> Environment: Linux
>Reporter: Luke Hutchison
>
> Some of the log messages do not appear to have a loglevel value set, so they 
> can't be suppressed by setting the log4j level to WARN. There's this line at 
> the beginning which doesn't even have a timestamp:
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]
> And then there are numerous lines like this, missing an "INFO" field:
> 03/10/2017 00:01:14   Job execution switched to status RUNNING.
> 03/10/2017 00:01:14   DataSource (at readTable(DBTableReader.java:165) 
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED 
> 03/10/2017 00:01:14   DataSink (count())(1/8) switched to SCHEDULED 
> 03/10/2017 00:01:14   DataSink (count())(3/8) switched to DEPLOYING 
> 03/10/2017 00:01:15   DataSink (count())(3/8) switched to RUNNING 
> 03/10/2017 00:01:17   DataSink (count())(6/8) switched to FINISHED 
> 03/10/2017 00:01:17   DataSource (at readTable(DBTableReader.java:165) 
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED 
> 03/10/2017 00:01:17   Job execution switched to status FINISHED.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6000) Can not start HA cluster with start-cluster.sh

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904686#comment-15904686
 ] 

ASF GitHub Bot commented on FLINK-6000:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/3506

[FLINK-6000] Fix starting HA cluster with start-cluster.sh



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink flink6000

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3506


commit 1b94be9d77a784012aab852bc4e51bff1c793009
Author: Dawid Wysakowicz 
Date:   2017-03-10T08:15:06Z

[FLINK-6000] Fix starting HA cluster with start-cluster.sh




> Can not start HA cluster with start-cluster.sh
> --
>
> Key: FLINK-6000
> URL: https://issues.apache.org/jira/browse/FLINK-6000
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> Right now it is impossible to start a cluster in ZooKeeper HA mode as 
> described in the documentation by setting:
> in conf/flink-conf.yaml:
> {code}
> high-availability: zookeeper
> ...
> {code}
> in conf/masters:
> {code}
> localhost:8081
> localhost:8082
> {code}
> The problem is with the {{bin/config.sh}} file. If the value "zookeeper" is read 
> from the config file, the variable {{HIGH_AVAILABILITY}} will be reset to "none" 
> in the else branch. See the code below:
> {code}
> if [ -z "${HIGH_AVAILABILITY}" ]; then
>  HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" 
> "${YAML_CONF}")
>  if [ -z "${HIGH_AVAILABILITY}" ]; then
> # Try deprecated value
> DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
> if [ -z "${DEPRECATED_HA}" ]; then
> HIGH_AVAILABILITY="none"
> elif [ ${DEPRECATED_HA} == "standalone" ]; then
> # Standalone is now 'none'
> HIGH_AVAILABILITY="none"
> else
> HIGH_AVAILABILITY=${DEPRECATED_HA}
> fi
>  else
>  HIGH_AVAILABILITY="none" <-- it exits here
>  fi
> fi
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3506: [FLINK-6000] Fix starting HA cluster with start-cl...

2017-03-10 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/3506

[FLINK-6000] Fix starting HA cluster with start-cluster.sh



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink flink6000

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3506


commit 1b94be9d77a784012aab852bc4e51bff1c793009
Author: Dawid Wysakowicz 
Date:   2017-03-10T08:15:06Z

[FLINK-6000] Fix starting HA cluster with start-cluster.sh




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...

2017-03-10 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3461
  
Thanks for the PR @haohui.
Looks good to me. +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904707#comment-15904707
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3461
  
Thanks for the PR @haohui.
Looks good to me. +1 to merge


> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, and 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this JIRA is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to windows using an alias. 
> Therefore this JIRA proposes to assign a unique name to each window to 
> enable the transformation.
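
For illustration, a query of the kind this issue targets, using the auxiliary 
window functions (table and column names are made up):

{code}
SELECT
  user_id,
  TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start,
  TUMBLE_END(rowtime, INTERVAL '1' HOUR)   AS window_end,
  COUNT(*) AS cnt
FROM clicks
GROUP BY
  user_id,
  TUMBLE(rowtime, INTERVAL '1' HOUR)
{code}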



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) in parallel

2017-03-10 Thread Tao Wang (JIRA)
Tao Wang created FLINK-6020:
---

 Summary: Blob Server cannot handle multiple job submits (with same 
content) in parallel
 Key: FLINK-6020
 URL: https://issues.apache.org/jira/browse/FLINK-6020
 Project: Flink
  Issue Type: Bug
Reporter: Tao Wang
Priority: Critical


In yarn-cluster mode, if we submit the same job multiple times in parallel, the 
tasks will run into class-loading problems and lease contention.

The blob server stores user jars under names derived from the generated SHA-1 sums 
of their contents: it first writes a temp file and then moves it to the final name. 
For recovery it also puts them into HDFS under the same file name.

At the same time, when multiple clients submit the same job with the same jar, the 
local jar files in the blob server and the corresponding files on HDFS are handled 
by multiple threads (BlobServerConnection) and interfere with each other.

It would be good to have a way to handle this; two ideas come to mind:
1. lock the write operation, or
2. use a unique identifier as the file name instead of (or in addition to) the 
SHA-1 sum of the file contents.
A rough sketch of the first idea follows below.
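
A very rough sketch of the first idea (one lock per content key around the 
finalize/move step). Class and method names are made up for illustration and are 
not the actual BlobServer code:

{code}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: one lock object per content key so that concurrent
// submissions of the same jar do not race on the temp-file -> final-file move.
class BlobStoreSketch {
    private final ConcurrentMap<String, Object> locksPerKey = new ConcurrentHashMap<>();

    void finalizeBlob(String sha1Key, File tempFile, File targetFile) throws IOException {
        Object lock = locksPerKey.computeIfAbsent(sha1Key, k -> new Object());
        synchronized (lock) {
            if (targetFile.exists()) {
                // Identical content was already stored by a concurrent submission.
                Files.deleteIfExists(tempFile.toPath());
            } else {
                Files.move(tempFile.toPath(), targetFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
            }
        }
    }
}
{code}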



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3507: [FLINK-6006] [kafka] Always use complete restored ...

2017-03-10 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3507

[FLINK-6006] [kafka] Always use complete restored state in 
FlinkKafkaConsumer

(This PR is the fix of FLINK-6006 for Flink 1.1)

Previously, the Kafka consumer performed partition list querying on
restore and then used the result to filter out restored state for partitions
that are not in the list.

If the returned partition list is incomplete (i.e. missing
partitions that existed before, perhaps due to temporary ZK / broker
downtime), then the state of the missing partitions is dropped and
cannot be recovered anymore.

This PR fixes the problem by always restoring the complete state, without
any filtering. We simply let the consumer fail if the partitions assigned
to the consuming threads / Kafka clients are unreachable when
the consumer starts running.
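
To illustrate the behavioural change with simplified stand-in types (this is not 
the actual FlinkKafkaConsumer code):

{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of the restore logic before and after this change.
class RestoreSketch {
    static Map<String, Long> offsetsToSubscribe(
            Map<String, Long> restoredOffsets,   // state from the checkpoint
            List<String> queriedPartitions,      // possibly incomplete!
            boolean filterAgainstQueriedList) {

        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            // Old behaviour: silently drop state for partitions missing from the
            // queried list -- this is how state could be lost.
            if (filterAgainstQueriedList && !queriedPartitions.contains(entry.getKey())) {
                continue;
            }
            // New behaviour: always keep the complete restored state.
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
{code}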


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6006-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3507


commit a57524bfb0158363be9a5bd4a6f18e053d96a030
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-10T06:47:57Z

[FLINK-6006] [kafka] Always use complete restored state in 
FlinkKafkaConsumer

Previously, the Kafka consumer performed partition list querying on
restore and then used the result to filter out restored state for partitions
that are not in the list.

If the returned partition list is incomplete (i.e. missing
partitions that existed before, perhaps due to temporary ZK / broker
downtime), then the state of the missing partitions is dropped and
cannot be recovered anymore.

This commit fixes the problem by always restoring the complete state, without
any filtering. We simply let the consumer fail if the partitions assigned
to the consuming threads / Kafka clients are unreachable when
the consumer starts running.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904712#comment-15904712
 ] 

ASF GitHub Bot commented on FLINK-6006:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3507

[FLINK-6006] [kafka] Always use complete restored state in 
FlinkKafkaConsumer

(This PR is the fix of FLINK-6006 for Flink 1.1)

Previously, the Kafka consumer performed partition list querying on
restore and then used the result to filter out restored state for partitions
that are not in the list.

If the returned partition list is incomplete (i.e. missing
partitions that existed before, perhaps due to temporary ZK / broker
downtime), then the state of the missing partitions is dropped and
cannot be recovered anymore.

This PR fixes the problem by always restoring the complete state, without
any filtering. We simply let the consumer fail if the partitions assigned
to the consuming threads / Kafka clients are unreachable when
the consumer starts running.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6006-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3507


commit a57524bfb0158363be9a5bd4a6f18e053d96a030
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-10T06:47:57Z

[FLINK-6006] [kafka] Always use complete restored state in 
FlinkKafkaConsumer

Previously, the Kafka consumer performed partition list querying on
restore and then used the result to filter out restored state for partitions
that are not in the list.

If the returned partition list is incomplete (i.e. missing
partitions that existed before, perhaps due to temporary ZK / broker
downtime), then the state of the missing partitions is dropped and
cannot be recovered anymore.

This commit fixes the problem by always restoring the complete state, without
any filtering. We simply let the consumer fail if the partitions assigned
to the consuming threads / Kafka clients are unreachable when
the consumer starts running.




> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by the changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This also requires some checking of how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-10 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3406
  
Hi @KurtYoung, you are right. Only `requiredProperties()` would be required 
to verify properties. 

I thought that the other two methods would be a good way to define the 
parameters of the converter. They could be used to print a usage message or 
details when the properties are not matched. We can also leave those out if you 
think that the implementation overhead does not correspond to the gains.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-03-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904724#comment-15904724
 ] 

Fabian Hueske commented on FLINK-5859:
--

Yes, the first changes should be attributed to [~tonycox]. 
Just put your changes on top. 
Opening a new PR sounds good to me. Thanks, Fabian

> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-10 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904779#comment-15904779
 ] 

Ufuk Celebi commented on FLINK-5985:


Thanks for taking the time to create the example, Gyula.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems that Flink treats every Task as stateful, so changing the topology 
> is not possible without setting a uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky).
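
For reference, a minimal sketch of assigning an explicit operator ID with the 
DataStream API's uid() method, which is what currently has to be done on every 
operator to keep savepoints matching across topology changes (assumes an existing 
DataStream<String> named input):

{code}
DataStream<String> upperCased = input
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        })
        .uid("to-upper-map"); // explicit, stable operator ID used for state matching
{code}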



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105362292
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interact with external catalog.
+  * Its main responsibilities including:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
--- End diff --

Good advice, I will change the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904797#comment-15904797
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402
  
@greghogan you are right, I overlooked the adjusted tests!

This looks like a good fix, please merge!


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}
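
A sketch of the swap described above, mirroring the quoted method (illustrative 
only; the fix that actually gets merged may differ in its details):

{code}
@Override
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
    K key = arg0.f0;
    M result = this.sumFunction.sum(arg0.f1, arg1.f1);

    // If the summed value aliases arg1.f1, swap the payloads so that the object
    // reused by ReduceDriver no longer holds the result (cf. how ReduceDriver
    // handles returned results).
    if (result == arg1.f1) {
        M tmp = arg1.f1;
        arg1.f1 = arg0.f1;
        arg0.f1 = tmp;
    }
    return new Tuple2<>(key, result);
}
{code}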



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904793#comment-15904793
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, 

most Maven modules in Flink are implemented in either Java or Scala. We 
have a few modules (like flink-runtime) which are mixed, but we didn't have a good 
experience with this design. Also, in these modules there is a clear separation of 
which components are implemented in Java and which in Scala. I think implementing 
the flink-table module in a mix of Java and Scala is not a good idea because it 
makes maintenance more difficult than sticking to a single language. 

Regarding the squashing: I think the best approach is to create a new 
branch based on the current master and to manually copy over the files that 
you would like to keep. Most of the code should be in new files, so this should 
not be too much effort. By using the first OVER window as a blueprint I meant 
that you should try to follow a similar approach by using the same utils and a 
similar design. IMO, it makes sense if similar features share as much code as 
possible and follow the same design.

Best, Fabian


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904799#comment-15904799
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3443
  
Hi, I commented on your other PR (#3459) about mixing Java and Scala and 
how to squash the commits.

Thanks, Fabian


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3504: [FLINK-6010] Documentation: correct IntelliJ IDEA Plugins...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3504
  
Looks good, merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904802#comment-15904802
 ] 

ASF GitHub Bot commented on FLINK-6010:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3504
  
Looks good, merging this...


> Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala 
> plugin' section
> --
>
> Key: FLINK-6010
> URL: https://issues.apache.org/jira/browse/FLINK-6010
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 1.2.1
>
>
> In 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin,
>  the description of how to get to the 'Plugins' page in IntelliJ IDEA is wrong. 
> It seems to describe a much older version of the IntelliJ IDE.
> The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install 
> JetBrains plugin 
> I'll submit a PR to fix this



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105362031
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.ExternalCatalogCompatible
+import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object CatalogTableHelper {
+
+  // config file to specifier the scan package to search tableSources
+  // which is compatible with external catalog.
+  private val tableSourceConfigFileName = "externalCatalogTable.properties"
--- End diff --

Hi, Fabian. I'm a little confused. Which approach do you mean?
1. We could add a pkgsToScan field to TableConfig; it would be the user's 
responsibility to specify its value, e.g. 
tableConfig.setPkgsToScan("org.apache.flink.table.sources","org.apache.flink.streaming.connectors.kafka")?
Then users would have to know exactly which module every needed converter belongs 
to. 
2. We could add a converterConfigFileName to TableConfig; it would be the user's 
responsibility to specify the file name, e.g. 
tableConfig.setConverterConfigFileName("externalCatalogTable.properties")?
This way seems a little strange because the file name is fixed by convention and 
may not change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105363598
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interact with external catalog.
+  * Its main responsibilities including:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
+*/
+  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): 
Unit
--- End diff --

Hi, Fabian. Even for the integration with Calcite, getTable(), 
getDatabase(), and listTables() are sufficient; however, a complete 
ExternalCatalog should be responsible for CRUD operations on databases/tables. For 
a read-only ExternalCatalog, we could choose not to support the createX, dropX, 
and alterX methods, for example by throwing UnsupportedOperationException.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-10 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, 

most Maven modules in Flink are implemented in either Java or Scala. We 
have a few modules (like flink-runtime) which are mixed, but we didn't have a good 
experience with this design. Also, in these modules there is a clear separation of 
which components are implemented in Java and which in Scala. I think implementing 
the flink-table module in a mix of Java and Scala is not a good idea because it 
makes maintenance more difficult than sticking to a single language. 

Regarding the squashing: I think the best approach is to create a new 
branch based on the current master and to manually copy over the files that 
you would like to keep. Most of the code should be in new files, so this should 
not be too much effort. By using the first OVER window as a blueprint I meant 
that you should try to follow a similar approach by using the same utils and a 
similar design. IMO, it makes sense if similar features share as much code as 
possible and follow the same design.

Best, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3402: [FLINK-5890] [gelly] GatherSumApply broken when object re...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402
  
@greghogan you are right, I overlooked the adjusted tests!

This looks like a good fix, please merge!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105363822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interact with external catalog.
+  * Its main responsibilities including:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
+*/
+  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): 
Unit
+
+  /**
+* Deletes table from external Catalog
+*
+* @param dbNamedatabase name
+* @param tableName table name
+* @param ignoreIfNotExists whether to ignore operation if table not 
exist yet
+*/
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: 
Boolean): Unit
+
+  /**
+* Modifies an existing table in the external catalog
+*
+* @param table description of table which to modify
+* @param ignoreIfNotExists whether to ignore operation if table not 
exist yet
+*/
+  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): 
Unit
+
+  /**
+* Gets table from external Catalog
+*
+* @param dbNamedatabase name
+* @param tableName table name
+* @return table
+*/
+  def getTable(dbName: String, tableName: String): ExternalCatalogTable
--- End diff --

I will add more comments to the method. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5995) Get an Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904817#comment-15904817
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Thanks for finding and fixing this, important fix!

It would be great if you could add a test for this, to make sure the bug is 
not accidentally re-introduced later by someone. You can follow the pattern of 
this test: 
`flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java`


> Get an Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I would appreciate it if anyone could give me some advice.
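
A sketch of the proposed call site (illustrative; `executionConfig` is assumed to 
be available to the backend, and the rest of the method body is elided):

{code}
public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    // Derive the serializer from the TypeInformation before it is accessed;
    // otherwise getElementSerializer() fails with "Serializer not yet initialized".
    stateDescriptor.initializeSerializerUnlessSet(executionConfig); // executionConfig: assumed available here

    // ... existing logic of DefaultOperatorStateBackend#getOperatorState ...
}
{code}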



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3431: [FLINK-5910] [gelly] Framework for Gelly examples

2017-03-10 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/3431
  
Thanks! Then, it's good to go from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6005) unit test ArrayList initializations without initial size

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904818#comment-15904818
 ] 

ASF GitHub Bot commented on FLINK-6005:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3499
  
Looks good, merging...


> unit test ArrayList initializations without initial size
> 
>
> Key: FLINK-6005
> URL: https://issues.apache.org/jira/browse/FLINK-6005
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> I found some ArrayList initializations without a sensible initial size 
> although it is possible to select one. The following PR will show some cases 
> that I'd like to fix.
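
A minimal illustration of the pattern being fixed (values are made up):

{code}
// If the final size is known up front, pass it to the ArrayList constructor
// to avoid intermediate array growth and copying:
int expectedSize = 128; // known in advance, e.g. a task or channel count
List<String> names = new ArrayList<>(expectedSize);
{code}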



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5910) Framework for Gelly examples

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904820#comment-15904820
 ] 

ASF GitHub Bot commented on FLINK-5910:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/3431
  
Thanks! Then, it's good to go from my side.


> Framework for Gelly examples
> 
>
> Key: FLINK-5910
> URL: https://issues.apache.org/jira/browse/FLINK-5910
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Driver jobs are composed of an input, an algorithm, and an output. Create the 
> interfaces for inputs, algorithms, and outputs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Thanks for finding and fixing this, important fix!

It would be great if you could add a test for this, to make sure the bug is 
not accidentally re-introduced later by someone. You can follow the pattern of 
this test: 
`flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3487: [FLINK-5980] Expose max-parallelism value in RuntimeConte...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3487
  
I think this looks good now, merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904826#comment-15904826
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3487
  
I think this looks good now, merging this...


> Expose max-parallelism value in RuntimeContext
> --
>
> Key: FLINK-5980
> URL: https://issues.apache.org/jira/browse/FLINK-5980
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> I am implementing a custom source function. I want to keep all progress in 
> a ListState to support dynamic scaling, just like the FlinkKafkaConsumer 
> does. I think the max-parallelism value is the ideal length for the ListState 
> in my scenario, but I realized that currently the max-parallelism value is not 
> visible to UDFs. 
> I propose exposing the max-parallelism value in the RuntimeContext. It will be 
> useful in dynamic scaling scenarios.
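
A hedged usage sketch of what exposing the value could look like from a UDF; the 
getter name used here is an assumption about the proposed API and may end up 
being named differently:

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch only: reading the max parallelism inside a UDF once it is exposed on
// the RuntimeContext (getter name is assumed, not confirmed by this issue).
public class MaxParallelismAwareMapper extends RichMapFunction<Long, Long> {

    private int maxParallelism;

    @Override
    public void open(Configuration parameters) {
        maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    }

    @Override
    public Long map(Long value) {
        // e.g. size per-key-group bookkeeping structures by the max parallelism
        return value % maxParallelism;
    }
}
{code}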



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3499: [FLINK-6005] fix some ArrayList initializations without i...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3499
  
Looks good, merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3477: [Flink-3318] Add support for quantifiers to CEP's pattern...

2017-03-10 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/3477
  
@kl0u could you have a look at this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5998) Un-fat Hadoop from Flink fat jar

2017-03-10 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904837#comment-15904837
 ] 

Robert Metzger commented on FLINK-5998:
---

[~wheat9] Thanks a lot for picking up this issue.
If we are lucky, this change is relatively easy to do and requires just some 
changes to the shading / assembly plugin.
Please let me know if there's anything that's unclear so that we can discuss it.

> Un-fat Hadoop from Flink fat jar
> 
>
> Key: FLINK-5998
> URL: https://issues.apache.org/jira/browse/FLINK-5998
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Haohui Mai
>
> As a first step towards FLINK-2268, I would suggest putting all Hadoop 
> dependencies into a jar separate from Flink's fat jar.
> This would allow users to put a custom Hadoop jar in there, or even deploy 
> Flink without a Hadoop fat jar at all in environments where Hadoop is 
> provided (EMR).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-10 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3443
  
Hi, I commented on your other PR (#3459) about mixing Java and Scala and 
how to squash the commits.

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105368360
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && 
isLate(element)) {
--- End diff --

I thought about this again. I think it doesn't hurt to have it because it 
catches the case when a `WindowAssigner` doesn't assign any windows. In that 
case an element is also "skipped" but it is not necessarily considered late. 
What do you think?
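
For illustration, a contrived assigner of that kind (not taken from this PR) could
look like the sketch below; elements it assigns no windows to are skipped without
being considered late:

```
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Contrived: elements with a negative timestamp get no window at all, so the
// WindowOperator "skips" them even though they are not late.
public class DropNegativeTimestampsAssigner extends WindowAssigner<Object, TimeWindow> {

    private static final long WINDOW_SIZE = 1000L;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp < 0) {
            return Collections.emptyList();
        }
        long start = timestamp - (timestamp % WINDOW_SIZE);
        return Collections.singletonList(new TimeWindow(start, start + WINDOW_SIZE));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```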


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904844#comment-15904844
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3508

[FLINK-5991] [state-backend, streaming] Expose Broadcast Operator State

This PR exposes broadcast operator state through the `CheckpointedFunction` 
interface, by adding broadcast state access methods to `OperatorStateStore`.

Since the functionality was already internally available and had test 
workarounds using casting, this PR simply removes those casts and properly 
tests the functionality through the `OperatorStateStore` interface.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-5991

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3508.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3508


commit 84e02d31f3e77af679728f544629becf2a857be9
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-10T09:22:22Z

[FLINK-5991] [state-backend, streaming] Expose Broadcast Operator State

This commit exposes broadcast operator state through the
`CheckpointedFunction` interface, by adding broadcast state access
methods to `OperatorStateStore`.




> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <T> ListState<T> getBroadcastOperatorState(ListStateDescriptor<T> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.
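
A rough usage sketch for option 1, assuming the method lands on
`OperatorStateStore` roughly as proposed above (the exact signature is an
assumption):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class PartitionOffsetsFunction implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // on restore, every parallel instance receives the full list (broadcast redistribution)
        offsets = context.getOperatorStateStore()
                .getBroadcastOperatorState(new ListStateDescriptor<>("offsets", Long.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsets.clear();
        offsets.add(42L); // hypothetical per-partition progress
    }

    @Override
    public String map(String value) {
        return value;
    }
}
```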



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904842#comment-15904842
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105368360
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && 
isLate(element)) {
--- End diff --

I thought about this again. I think it doesn't hurt to have it because it 
catches the case when a `WindowAssigner` doesn't assign any windows. In that 
case an element is also "skipped" but it is not necessarily considered late. 
What do you think?


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-10 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3508
  
R: @aljoscha @StefanRRichter


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3508: [FLINK-5991] [state-backend, streaming] Expose Bro...

2017-03-10 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3508

[FLINK-5991] [state-backend, streaming] Expose Broadcast Operator State

This PR exposes broadcast operator state through the `CheckpointedFunction` 
interface, by adding broadcast state access methods to `OperatorStateStore`.

Since the functionality was already internally available and had test 
workarounds using casting, this PR simply removes those casts and properly 
tests the functionality through the `OperatorStateStore` interface.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-5991

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3508.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3508


commit 84e02d31f3e77af679728f544629becf2a857be9
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-10T09:22:22Z

[FLINK-5991] [state-backend, streaming] Expose Broadcast Operator State

This commit exposes broadcast operator state through the
`CheckpointedFunction` interface, by adding broadcast state access
methods to `OperatorStateStore`.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904846#comment-15904846
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3508
  
R: @aljoscha @StefanRRichter


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <T> ListState<T> getBroadcastOperatorState(ListStateDescriptor<T> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3485
  
Looks good, thanks! Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904857#comment-15904857
 ] 

ASF GitHub Bot commented on FLINK-5976:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3485
  
Looks good, thanks! Merging this...


> Refactoring duplicate Tokenizer in flink-test
> -
>
> Key: FLINK-5976
> URL: https://issues.apache.org/jira/browse/FLINK-5976
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.2.0
>Reporter: liuyuzhong7
>Priority: Minor
>  Labels: test
> Fix For: 1.2.0
>
>
> There is some duplicate code like this in flink-test; I think refactoring it 
> would be better. 
> ```
> public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>   @Override
>   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>     // normalize and split the line
>     String[] tokens = value.toLowerCase().split("\\W+");
>     // emit the pairs
>     for (String token : tokens) {
>       if (token.length() > 0) {
>         out.collect(new Tuple2<String, Integer>(token, 1));
>       }
>     }
>   }
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6021) Downloads page references "Hadoop 1 version" which isn't an option

2017-03-10 Thread Patrick Lucas (JIRA)
Patrick Lucas created FLINK-6021:


 Summary: Downloads page references "Hadoop 1 version" which isn't 
an option
 Key: FLINK-6021
 URL: https://issues.apache.org/jira/browse/FLINK-6021
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Patrick Lucas


The downloads page says

{quote}
Apache Flink® 1.2.0 is our latest stable release.

You don’t have to install Hadoop to use Flink, but if you plan to use Flink 
with data stored in Hadoop, pick the version matching your installed Hadoop 
version. If you don’t want to do this, pick the Hadoop 1 version.
{quote}

But Hadoop 1 appears to no longer be an available alternative.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2905: [FLINK-5190] [runtime] fix ZooKeeperLeaderRetrievalServic...

2017-03-10 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2905
  
I fear that we cannot merge the PR as it is, because it would leave the 
`CuratorFramework` client open for some of the components used in the master. 
For example, the `WebMonitor` uses the same code path to open a leader 
retrieval service in case of HA mode.

In order to solve this problem, I think we have to change the master to 
also use the `HighAvailabilityServices`. Then we have a single point of 
responsibility to close the `CuratorFramework` client.
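
In code, the single-ownership idea looks roughly like the stub below
(self-contained placeholder classes, not Flink's actual ones):

```
import java.util.concurrent.atomic.AtomicBoolean;

// Stub of the ownership pattern: the HA services own the shared client, services
// created from them only borrow it, and close() here is the single shutdown point.
final class SharedZkClient implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    boolean isClosed() { return closed.get(); }
    @Override public void close() { closed.set(true); }
}

final class HighAvailabilityServicesStub implements AutoCloseable {
    private final SharedZkClient client = new SharedZkClient();

    Runnable createLeaderRetrievalService(String path) {
        // must not close 'client' when the retrieval service is stopped
        return () -> { /* watch 'path' for leader changes via 'client' */ };
    }

    @Override
    public void close() {
        client.close(); // single point of responsibility
    }
}
```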


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5190) ZooKeeperLeaderRetrievalService should not close the zk client when stop

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904864#comment-15904864
 ] 

ASF GitHub Bot commented on FLINK-5190:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2905
  
I fear that we cannot merge the PR as it is, because it would leave the 
`CuratorFramework` client open for some of the components used in the master. 
For example, the `WebMonitor` uses the same code path to open a leader 
retrieval service in case of HA mode.

In order to solve this problem, I think we have to change the master to 
also use the `HighAvailabilityServices`. Then we have a single point of 
responsibility to close the `CuratorFramework` client.


> ZooKeeperLeaderRetrievalService should not close the zk client when stop
> 
>
> Key: FLINK-5190
> URL: https://issues.apache.org/jira/browse/FLINK-5190
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> The zk client is created outside of ZooKeeperLeaderRetrievalService and 
> passed to it. When ZooKeeperLeaderRetrievalService stops, it should not stop 
> the zk client, as others may still be using it outside.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-10 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904869#comment-15904869
 ] 

Ufuk Celebi commented on FLINK-5985:


Thanks again for reporting this. You are right, we accidentally changed the 
behaviour between 1.1 and 1.2. This is a critical issue.

In 1.1 we did not serialize stateless tasks as part of the savepoint and 
therefore never took them into account on savepoint loading. With the recent 
refactorings in 1.2 we now serialize stateless tasks as "zero length 
state". This makes the stateless operators part of the savepoint when we try to 
load the state back into the new job.

I think there are a couple of easy ways to fix this, but 
[~stefanrichte...@gmail.com] probably has the definitive answer here.

I think this issue warrants a 1.2.1 release asap as users cannot change their 
topologies if they didn't specify a UID for each operator.


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904873#comment-15904873
 ] 

ASF GitHub Bot commented on FLINK-5981:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3486#discussion_r105371699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -55,6 +58,42 @@ public static boolean getSSLEnabled(Configuration 
sslConfig) {
}
 
/**
+* Sets SSl version and cipher suites for SSLServerSocket
+* @param socket
+*Socket to be handled
+* @param config
+*The application configuration
+*/
+   public static void setSSLVerAndCipherSuites(ServerSocket socket, 
Configuration config) {
+   if (socket instanceof SSLServerSocket) {
+   ((SSLServerSocket) 
socket).setEnabledProtocols(config.getString(
+   ConfigConstants.SECURITY_SSL_PROTOCOL,
+   
ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
--- End diff --

I think you are right, it is probably hard to do good verification here, 
and better to rely on the verification by the `SSLServerSocket`.


> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> I configured SSL and started a Flink job, but found that the configured 
> properties were not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match 
> what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version, it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; it seems only an akka 
> upgrade can solve the issue on the akka side (we'll consider that later, as an 
> upgrade is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904876#comment-15904876
 ] 

ASF GitHub Bot commented on FLINK-5981:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3486
  
I think this is a good fix.
Is it possible to add some form of test that makes sure the config is 
properly applied in all cases? What do you think about adding to the 
`SSLUtilsTest` class and a case in the `AkkaSslITCase` that validates that the 
correct protocols are chosen?


> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> I configured SSL and started a Flink job, but found that the configured 
> properties were not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match 
> what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version, it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; it seems only an akka 
> upgrade can solve the issue on the akka side (we'll consider that later, as an 
> upgrade is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105372676
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && 
isLate(element)) {
--- End diff --

I just added a test for the behaviour with a "weird" `WindowAssigner`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904878#comment-15904878
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105372676
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && 
isLate(element)) {
--- End diff --

I just added a test for the behaviour with a "weird" `WindowAssigner`. 


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3486: [FLINK-5981][SECURITY]make ssl version and cipher ...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3486#discussion_r105371699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -55,6 +58,42 @@ public static boolean getSSLEnabled(Configuration 
sslConfig) {
}
 
/**
+* Sets SSl version and cipher suites for SSLServerSocket
+* @param socket
+*Socket to be handled
+* @param config
+*The application configuration
+*/
+   public static void setSSLVerAndCipherSuites(ServerSocket socket, 
Configuration config) {
+   if (socket instanceof SSLServerSocket) {
+   ((SSLServerSocket) 
socket).setEnabledProtocols(config.getString(
+   ConfigConstants.SECURITY_SSL_PROTOCOL,
+   
ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
--- End diff --

I think you are right, it is probably hard to do good verification here, 
and better to rely on the verification by the `SSLServerSocket`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3486: [FLINK-5981][SECURITY]make ssl version and cipher suites ...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3486
  
I think this is a good fix.
Is it possible to add some form of test that makes sure the config is 
properly applied in all cases? What do you think about adding to the 
`SSLUtilsTest` class and a case in the `AkkaSslITCase` that validates that the 
correct protocols are chosen?
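
A rough sketch of such a check on the `ServerSocket` path (the cipher-suite config
key and the exact cipher-suite behaviour of `setSSLVerAndCipherSuites` are
assumptions based on the method name and the diff above):

```
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;

public class SSLServerSocketConfigTest {

    @Test
    public void testProtocolAndCipherSuitesAreApplied() throws Exception {
        Configuration config = new Configuration();
        config.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.2");
        // config key name assumed for the cipher-suite list
        config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA256");

        SSLServerSocket socket = (SSLServerSocket) SSLContext.getDefault()
                .getServerSocketFactory().createServerSocket(0);
        try {
            SSLUtils.setSSLVerAndCipherSuites(socket, config);

            assertArrayEquals(new String[]{"TLSv1.2"}, socket.getEnabledProtocols());
            assertArrayEquals(new String[]{"TLS_RSA_WITH_AES_128_CBC_SHA256"},
                    socket.getEnabledCipherSuites());
        } finally {
            socket.close();
        }
    }
}
```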


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-10 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-6022:
-

 Summary: Improve support for Avro GenericRecord
 Key: FLINK-6022
 URL: https://issues.apache.org/jira/browse/FLINK-6022
 Project: Flink
  Issue Type: Improvement
  Components: Type Serialization System
Reporter: Robert Metzger


Currently, Flink is serializing the schema for each Avro GenericRecord in the 
stream.
This leads to a lot of overhead over the wire/disk + high serialization costs.

Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
shipping the schema to each serializer through the AvroTypeInformation.
We can then only support GenericRecords with the same type per stream, but the 
performance will be much better.
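
As a sketch of the intended usage (only the Avro calls below are real APIs; the
type information class named in the trailing comment is hypothetical, just to
illustrate shipping the schema once instead of per record):

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordSketch {
    public static void main(String[] args) {
        // the schema is known once, up front ...
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                + "[{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        // ... so a type information carrying it could hand it to every serializer,
        // instead of each record carrying its own copy of the schema, e.g.
        // (class name hypothetical):
        // stream.returns(new GenericRecordAvroTypeInfo(schema));
        System.out.println(user);
    }
}
```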



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-10 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3508
  
The PR re-introduces methods that had been removed from the public 
interface before the release. The reason to remove the methods was that we had too 
little time to make a final decision on the API. As this matches my suggestion, 
+1 from me. However, @aljoscha should also give his thumbs up because I think 
this is the point where we should catch up with our API discussion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904882#comment-15904882
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3508
  
The PR re-introduces methods that had been removed from the public 
interface before the release. The reason to remove the methods was that we had too 
little time to make a final decision on the API. As this matches my suggestion, 
+1 from me. However, @aljoscha should also give his thumbs up because I think 
this is the point where we should catch up with our API discussion.


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <T> ListState<T> getBroadcastOperatorState(ListStateDescriptor<T> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3477: [Flink-3318] Add support for quantifiers to CEP's pattern...

2017-03-10 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3477
  
Hi @dawidwys , I will do this in the beginning of next week at the latest. 

One thing that I saw is that in the `Pattern` class you do not check for 
conflicting configurations e.g. oneToMany and zeroToMany. It would be nice to 
have that with a good error message.
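
A minimal, self-contained sketch of the kind of guard meant here, using stub names
rather than the actual `Pattern` API:

```
// Stub illustrating "reject conflicting quantifier configurations with a clear message".
enum Quantifier { ONE, ONE_OR_MORE, ZERO_OR_MORE }

final class PatternStub {
    private Quantifier quantifier = Quantifier.ONE;

    PatternStub oneOrMore() { return setQuantifier(Quantifier.ONE_OR_MORE); }
    PatternStub zeroOrMore() { return setQuantifier(Quantifier.ZERO_OR_MORE); }

    private PatternStub setQuantifier(Quantifier q) {
        if (quantifier != Quantifier.ONE) {
            throw new IllegalStateException(
                    "Quantifier already set to " + quantifier + "; cannot also apply " + q + ".");
        }
        this.quantifier = q;
        return this;
    }
}
```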


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-10 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904895#comment-15904895
 ] 

Stefan Richter commented on FLINK-5985:
---

I think the proper solution is based on preventing those empty files from being 
written, but there must be some differentiation between stateless tasks and 
stateful tasks with (currently) empty state. For the latter, the empty 
state indicates a restore, which is an exposed signal for user code.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-10 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904899#comment-15904899
 ] 

Flavio Pompermaier commented on FLINK-6022:
---

Hi [~rmetzger], does this also apply to Thrift and Protobuf?
Should we open an issue for them as well?

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377617
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -736,7 +736,7 @@ public void restoreState(List state) throws 
Exception {
 
static final ValueStateDescriptor descriptor = new 
ValueStateDescriptor<>("seen", Boolean.class, false);
private static final long serialVersionUID = 1L;
-   private ValueState operatorState;
+   private transient ValueState operatorState;
--- End diff --

true but it is wrong and someone has to fix it at some point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904915#comment-15904915
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377617
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -736,7 +736,7 @@ public void restoreState(List state) throws 
Exception {
 
static final ValueStateDescriptor descriptor = new 
ValueStateDescriptor<>("seen", Boolean.class, false);
private static final long serialVersionUID = 1L;
-   private ValueState operatorState;
+   private transient ValueState operatorState;
--- End diff --

true but it is wrong and someone has to fix it at some point.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account but uses the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
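
For reference, the underlying Java behaviour is easy to demonstrate with plain JDK
code (nothing Flink-specific):

```
import java.util.Arrays;

public class ArrayHashDemo {
    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};

        // identity-based: differs for two equal arrays, and thus between sender and receiver
        System.out.println(a.hashCode() == b.hashCode()); // false (almost certainly)

        // content-based: what a key hash would need
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
    }
}
```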



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904917#comment-15904917
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377789
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
--- End diff --

I think it is worth having it here as well for users of the method. The more 
places where the user can find the required information, the better.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account but uses the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
+* 
+* it is a POJO type but does not override the {@link 
#hashCode()} method and relies on
+* the {@link Object#hashCode()} implementation.
+* it is an array of any type (see {@link 
PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+* {@link ObjectArrayTypeInfo}).
+* ,
+* {@code true} otherwise.
+*/
+   private boolean validateKeyTypeIsHashable(TypeInformation type) {
+   try {
+   return (type instanceof PojoTypeInfo) ?
--- End diff --

Really? I am not sure about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377789
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
--- End diff --

I think it is worth having it here as well for users of the method. The more 
places where the user can find the required information, the better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904918#comment-15904918
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105377853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
+* 
+* it is a POJO type but does not override the {@link 
#hashCode()} method and relies on
+* the {@link Object#hashCode()} implementation.
+* it is an array of any type (see {@link 
PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+* {@link ObjectArrayTypeInfo}).
+* ,
+* {@code true} otherwise.
+*/
+   private boolean validateKeyTypeIsHashable(TypeInformation type) {
+   try {
+   return (type instanceof PojoTypeInfo) ?
--- End diff --

Really? I am not sure about it.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account but uses the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-10 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904920#comment-15904920
 ] 

Robert Metzger commented on FLINK-6022:
---

I think protobuf always works with generated serializers with a fixed schema 
(that's the common case for Avro as well), so I don't think there's a need to 
add support there.
For Thrift I don't know.

I haven't heard any complaints for Thrift and Protobuf. For Avro this issue has 
come up with at least two users.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105378808
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
--- End diff --

This is the proper way of doing it. A `try catch` is of course an option, 
but this is more concise, and built in JUnit. 
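
For reference, the JUnit 4 idiom under discussion, filled in with the exception
and message used in the diff above (the test body is only a stand-in for an
actual `keyBy()` call):

```
import org.apache.flink.api.common.InvalidProgramException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class KeyRejectionExampleTest {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testArrayKeyIsRejected() {
        expectedException.expect(InvalidProgramException.class);
        expectedException.expectMessage("cannot be used as key");

        // ... build a DataStream and call keyBy(...) with an array-typed key selector here,
        // as in the tests added by this PR; the throw below only stands in for that call
        throw new InvalidProgramException("This type (int[]) cannot be used as key.");
    }
}
```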


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904928#comment-15904928
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105378808
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
--- End diff --

This is the proper way of doing it. A `try catch` is of course an option, 
but this is more concise, and built in JUnit. 


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account but uses the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904930#comment-15904930
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3480
  
Did another review - looks good to me!
Merging...


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a Job DAG with a shuffle phase, this number can go up very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the up-coming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting task manager process, this may add latency and complexity for 
> scaling process. I am wondering if there is already any discussion around 
> whether the network buffer should be automatically managed by Flink or at 
> least expose some API to allow it to be reconfigured. Let me know if there is 
> any existing JIRA that I should follow.
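
As a worked example with purely illustrative numbers: a cluster of 10 TMs with 8 slots each would need 8^2 * 10 * 4 = 2,560 network buffers per task manager under the documented formula, which at the default 32 KB buffer size is roughly 80 MB of pre-allocated network memory per TM, and the value has to be re-derived (and the TMs restarted) whenever the cluster is resized.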



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3480: [FLINK-4545] use size-restricted LocalBufferPool instance...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3480
  
Did another review - looks good to me!
Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105378932
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
--- End diff --

We can go for `Object[]`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904931#comment-15904931
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105378932
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
--- End diff --

We can go for `Object[]`.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only its identity (memory address).
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-10 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu resolved FLINK-5980.
-
Resolution: Fixed

> Expose max-parallelism value in RuntimeContext
> --
>
> Key: FLINK-5980
> URL: https://issues.apache.org/jira/browse/FLINK-5980
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> I am implementing a custom source function. I want to keep all progress in 
> a ListState to support dynamic scaling, just like the FlinkKafkaConsumer 
> does. And I think the max-parallelism value is the ideal length for the ListState 
> in my scenario. But I realize that currently the max-parallelism value is not 
> visible to UDFs. 
> I propose exposing the max-parallelism value in RuntimeContext. It would be 
> useful in dynamic scaling scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904945#comment-15904945
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105380399
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105380399
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   input.keyBy(new KeySelector() {
+   @Override
+   public POJOwithHashCode getKey(POJOwithHashCode value) 
throws Except

[GitHub] flink issue #3218: [FLINK-5642][query] fix a race condition with HeadListSta...

2017-03-10 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3218
  
ok, let's close this PR as the issue is actually deeper than originally 
thought and can only be fixed with a new heap state backend or by locking for 
queryable state queries as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5642) queryable state: race condition with HeadListState

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904953#comment-15904953
 ] 

ASF GitHub Bot commented on FLINK-5642:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3218


> queryable state: race condition with HeadListState
> --
>
> Key: FLINK-5642
> URL: https://issues.apache.org/jira/browse/FLINK-5642
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> If queryable state accesses a HeapListState instance that is being modified 
> during the value's serialisation, it may crash, e.g. with a 
> NullPointerException during the serialisation or with an EOFException during 
> de-serialisation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3218: [FLINK-5642][query] fix a race condition with Head...

2017-03-10 Thread NicoK
Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3218


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5642) queryable state: race condition with HeadListState

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904952#comment-15904952
 ] 

ASF GitHub Bot commented on FLINK-5642:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3218
  
ok, let's close this PR as the issue is actually deeper than originally 
thought and can only be fixed with a new heap state backend or by locking for 
queryable state queries as well


> queryable state: race condition with HeadListState
> --
>
> Key: FLINK-5642
> URL: https://issues.apache.org/jira/browse/FLINK-5642
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> If queryable state accesses a HeapListState instance that is being modified 
> during the value's serialisation, it may crash, e.g. with a 
> NullPointerException during the serialisation or with an EOFException during 
> de-serialisation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105382341
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.ExternalCatalogCompatible
+import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object CatalogTableHelper {
+
+  // config file to specify the packages to scan for tableSources
+  // that are compatible with the external catalog.
+  private val tableSourceConfigFileName = "externalCatalogTable.properties"
--- End diff --

Sorry, I think I confused something there. 
The approach to have a resource file in each Module / Jar that offers 
converters is good. Hard-coding the name of the resource file is fine.

Thinking this one step further, would it make sense to not use scanning at 
all but exactly specify the Converter classes? Then we would not need to use 
the reflection library to scan the class path. Just as an idea.
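
A tiny sketch of the alternative suggested above, listing converter classes explicitly instead of scanning the classpath (hypothetical interface and registry names, not the API proposed in this PR):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical converter interface and registry, for illustration of
    // "explicit registration instead of classpath scanning" only.
    interface ExternalTableConverter {
        String tableType();                          // e.g. "csv", "hbase"
        Object convert(Object externalCatalogTable); // would return a TableSourceTable in Flink
    }

    class ConverterRegistry {
        private final List<ExternalTableConverter> converters = new ArrayList<>();

        // Each module/jar registers its concrete converters once, e.g. from an
        // explicit bootstrap step, so no reflective scanning is needed.
        void register(ExternalTableConverter converter) {
            converters.add(converter);
        }

        ExternalTableConverter forType(String tableType) {
            return converters.stream()
                    .filter(c -> c.tableType().equals(tableType))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("No converter for " + tableType));
        }
    }

The trade-off is exactly the one raised in the follow-up comment below: adding a new converter then requires touching the registration code (or a config file), whereas scanning picks it up automatically.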


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread Mauro Cortellazzi (JIRA)
Mauro Cortellazzi created FLINK-6023:


 Summary: Fix Scala snippet into Process Function (Low-level 
Operations) Doc
 Key: FLINK-6023
 URL: https://issues.apache.org/jira/browse/FLINK-6023
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Mauro Cortellazzi
Priority: Trivial
 Fix For: 1.3.0, 1.2.1


The current `/docs/dev/stream/process_function.md` has some errors in the Scala 
snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread Andrea Sella (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrea Sella reassigned FLINK-6023:
---

Assignee: Andrea Sella

> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Andrea Sella
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105384191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.ExternalCatalogCompatible
+import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object CatalogTableHelper {
+
+  // config file to specify the packages to scan for tableSources
+  // that are compatible with the external catalog.
+  private val tableSourceConfigFileName = "externalCatalogTable.properties"
--- End diff --

Fabian, registering all needed Converter classes instead of using scanning can 
work, too. However, if somebody adds new tableSourceConverters, such as a 
parquetTableSourceConverter, wouldn't users need to change the code or config 
file to include the newly added Converter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3466: [FLINK-5715] Asynchronous snapshots for heap-based...

2017-03-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3466#discussion_r105389424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws 
IOException {
 *
 * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
 *  and the path to the checkpoint data 
directory.
+* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+*
+* @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+*/
+   public FsStateBackend(String checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
--- End diff --

We are getting one more parameter into the constructors with the change that 
makes the state backend handle all checkpoint/savepoint storage related 
business. That must be a constructor parameter, so if we can avoid further 
constructor parameters, that would help. Otherwise we really end up with 20 
constructors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905007#comment-15905007
 ] 

ASF GitHub Bot commented on FLINK-5715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3466#discussion_r105389424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws 
IOException {
 *
 * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
 *  and the path to the checkpoint data 
directory.
+* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+*
+* @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+*/
+   public FsStateBackend(String checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
--- End diff --

We are getting one more parameter into the constructors with the change that 
makes the state backend handle all checkpoint/savepoint storage related 
business. That must be a constructor parameter, so if we can avoid further 
constructor parameters, that would help. Otherwise we really end up with 20 
constructors.


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that form the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, providing CoW semantics for the hash map 
> itself, as a mutable structure, thereby avoiding costly locking or blocking 
> where possible. Second, CoW for the mutable value objects, e.g. through 
> cloning via serializers.  
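
A highly simplified sketch of the two-part problem described above, a structural copy plus value-level copy-on-write (illustrative only; Flink's eventual implementation is considerably more involved):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: take a synchronous structural copy of the map, then write it
    // out asynchronously while the original keeps being mutated by the running job.
    public class NaiveCowSnapshotSketch<K, V> {

        private final Map<K, V> state = new HashMap<>();

        public synchronized void update(K key, V value) {
            state.put(key, value);
        }

        // Part one (cheap, blocking): copy the map structure under the same lock as updates.
        public synchronized Map<K, V> snapshotStructure() {
            return new HashMap<>(state);
        }

        // Part two (asynchronous): iterate the copied structure and write it out. The copy
        // still aliases the live value objects, so a real implementation would have to
        // deep-copy each value (e.g. via a TypeSerializer) before the job may mutate it
        // again; that is the second, harder half of the problem described in the issue.
        public void writeSnapshotAsync(Map<K, V> structuralCopy) {
            new Thread(() -> {
                for (Map.Entry<K, V> entry : structuralCopy.entrySet()) {
                    // serializeValue(entry.getKey(), entry.getValue());  // hypothetical helper
                }
            }).start();
        }
    }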



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905015#comment-15905015
 ] 

ASF GitHub Bot commented on FLINK-5715:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3466#discussion_r105390074
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws 
IOException {
 *
 * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
 *  and the path to the checkpoint data 
directory.
+* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+*
+* @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+*/
+   public FsStateBackend(String checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
--- End diff --

I guess this is then typically a case where you could go for a builder 
pattern; the only problem I see here is that this would be API-breaking. What do you 
think?


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that form the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, providing CoW semantics for the hash map 
> itself, as a mutable structure, thereby avoiding costly locking or blocking 
> where possible. Second, CoW for the mutable value objects, e.g. through 
> cloning via serializers.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3466: [FLINK-5715] Asynchronous snapshots for heap-based...

2017-03-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3466#discussion_r105390074
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws 
IOException {
 *
 * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
 *  and the path to the checkpoint data 
directory.
+* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+*
+* @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+*/
+   public FsStateBackend(String checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
--- End diff --

I guess this is then typically a case where you could go for a builder 
pattern; the only problem I see here is that this would be API-breaking. What do you 
think?
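
A rough illustration of the builder idea being discussed, with hypothetical names (this is not the FsStateBackend API):

    // Hypothetical sketch of a builder that avoids an explosion of constructor overloads.
    public class FileBackendOptions {

        private final String checkpointDataUri;
        private final int fileStateSizeThreshold;
        private final boolean asynchronousSnapshots;

        private FileBackendOptions(Builder builder) {
            this.checkpointDataUri = builder.checkpointDataUri;
            this.fileStateSizeThreshold = builder.fileStateSizeThreshold;
            this.asynchronousSnapshots = builder.asynchronousSnapshots;
        }

        public static Builder forUri(String checkpointDataUri) {
            return new Builder(checkpointDataUri);
        }

        public static class Builder {
            private final String checkpointDataUri;    // required
            private int fileStateSizeThreshold = 1024; // optional, with defaults
            private boolean asynchronousSnapshots = false;

            private Builder(String checkpointDataUri) {
                this.checkpointDataUri = checkpointDataUri;
            }

            public Builder fileStateSizeThreshold(int threshold) {
                this.fileStateSizeThreshold = threshold;
                return this;
            }

            public Builder asynchronousSnapshots(boolean enabled) {
                this.asynchronousSnapshots = enabled;
                return this;
            }

            public FileBackendOptions build() {
                return new FileBackendOptions(this);
            }
        }
    }

Call sites would then read like FileBackendOptions.forUri("hdfs:///checkpoints").asynchronousSnapshots(true).build(), and new options no longer multiply the constructors; the trade-off raised above is that the existing public constructors would still have to be kept (or deprecated) to stay API-compatible.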


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.

2017-03-10 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3501
  
Thanks for the thorough review @zentol . I integrated the comments that I 
agree with and commented on the rest. If you agree with the changes, I can 
merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905022#comment-15905022
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3501
  
Thanks for the thorough review @zentol . I integrated the comments that I 
agree with and commented on the rest. If you agree with the changes, I can 
merge it.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only its identity (memory address).
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-10 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey @addisonj. 
Sure! You could perhaps review the changes and maybe see how to discard 
empty operator states if you are motivated. This is the only pending issue for 
this PR. thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905044#comment-15905044
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey @addisonj. 
Sure! You could perhaps review the changes and maybe see how to discard 
empty operator states if you are motivated. This is the only pending issue for 
this PR. thanks!


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along the 
> following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, and emit all records 
> in transit in FIFO order before continuing the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are potentially possible, but 
> this can serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
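
A schematic sketch of the loop-head behaviour described in steps 1)-3) above (pseudocode-level Java, not Flink's actual iteration operators):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Schematic only: one snapshotting epoch at the head of a cyclic dataflow.
    class LoopHeadSketch<T> {

        private final Queue<T> backedgeLog = new ArrayDeque<>();
        private boolean outputBlocked = false;

        // Step 1: a barrier arrives from upstream -> block output and start logging
        // the records that keep arriving over the back-edge.
        void onBarrier(long checkpointId) {
            outputBlocked = true;
        }

        void onBackedgeRecord(T record) {
            if (outputBlocked) {
                backedgeLog.add(record);   // these in-transit records become part of the snapshot
            } else {
                emit(record);
            }
        }

        // Step 3: the barrier has travelled around the loop -> snapshot the log,
        // unblock the output, and flush the logged records in FIFO order.
        void onBackedgeBarrier(long checkpointId) {
            snapshotState(backedgeLog);
            outputBlocked = false;
            while (!backedgeLog.isEmpty()) {
                emit(backedgeLog.poll());
            }
        }

        private void emit(T record) { /* forward downstream */ }

        private void snapshotState(Queue<T> inTransitRecords) { /* hypothetical state write */ }
    }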



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3123) Allow setting custom start-offsets for the Kafka consumer

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905047#comment-15905047
 ] 

ASF GitHub Bot commented on FLINK-3123:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2687
  
Rebased on the latest Kafka consumer changes in `master`.


> Allow setting custom start-offsets for the Kafka consumer
> -
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.0.0
>
>
> Currently, the Kafka consumer only allows starting to read from the earliest 
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.
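
For illustration, configuring specific per-partition start offsets along the lines of this feature could look roughly as follows (class and method names follow the consumer API in this PR branch; exact packages and signatures may differ between Flink versions, and the broker/topic values are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class SpecificOffsetsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder values
            props.setProperty("zookeeper.connect", "localhost:2181");
            props.setProperty("group.id", "test");

            // start partition 0 at offset 23, partition 1 at offset 31, partition 2 at offset 43
            Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
            specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
            specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
            specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

            FlinkKafkaConsumer08<String> consumer =
                    new FlinkKafkaConsumer08<>("myTopic", new SimpleStringSchema(), props);
            consumer.setStartFromSpecificOffsets(specificStartOffsets);

            DataStream<String> stream = env.addSource(consumer);
            stream.print();

            env.execute("specific-offsets-sketch");
        }
    }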



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2687: [FLINK-3123] [kafka] Allow custom specific start offsets ...

2017-03-10 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2687
  
Rebased on the latest Kafka consumer changes in `master`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread Andrea Sella (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrea Sella reassigned FLINK-6023:
---

Assignee: (was: Andrea Sella)

> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3402: [FLINK-5890] [gelly] GatherSumApply broken when object re...

2017-03-10 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402
  
@StephanEwen my last comment was ambiguous: I had originally modified a 
test and then, with yesterday's commit, reverted that change and added it as a new 
test.

Will merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905072#comment-15905072
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402
  
@StephanEwen my last comment was ambiguous: I had originally modified a 
test and then, with yesterday's commit, reverted that change and added it as a new 
test.

Will merge.


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}
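
One possible shape of the fix described above, shown as a sketch (the change actually merged may differ in detail):

    @Override
    public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
        K key = arg0.f0;
        M result = this.sumFunction.sum(arg0.f1, arg1.f1);

        // With object reuse enabled, ReduceDriver may recycle arg1 as the container for
        // the next input record. If the summed result still references arg1.f1, swap the
        // value references so the reused tuple no longer aliases the returned result.
        if (result == arg1.f1) {
            M tmp = arg1.f1;
            arg1.f1 = arg0.f1;
            arg0.f1 = tmp;
        }
        return new Tuple2<>(key, result);
    }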



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905074#comment-15905074
 ] 

ASF GitHub Bot commented on FLINK-5808:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3509

[FLINK-5808] Fix Missing verification for setParallelism and 
setMaxParallelism



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-5808-max-parallelism-verification

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3509


commit 59b049bbad2c889dcb29ffbd900a458580c20728
Author: Aljoscha Krettek 
Date:   2017-03-10T12:30:21Z

[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Before, it was set on the ExecutionConfig for some stream execution
environments and later for others. Now, we don't set the default
parallelism on the ExecutionConfig but instead set it at the latest
possible point, in the StreamingJobGraphGenerator.

This also adds tests that verify that we don't set the default
parallelism on the ExecutionConfig.

commit 5527eae6f29ed80baedd1f5feadf3a4e17ac7865
Author: Aljoscha Krettek 
Date:   2017-03-10T13:37:26Z

[FLINK-5808] Move max keygroup constants to ExecutionConfig

We need to have them there if we want to properly test the arguments of
setMaxParallelism() in the ExecutionConfig itself.

commit 096dcca374a052d90bfe09a5a147435f2614aa05
Author: Aljoscha Krettek 
Date:   2017-03-10T13:35:37Z

[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there were some checks in
StreamExecutionEnvironment.set(Max)Parallelism(), but a user could
circumvent these by using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.




> Missing verification for setParallelism and setMaxParallelism
> -
>
> Key: FLINK-5808
> URL: https://issues.apache.org/jira/browse/FLINK-5808
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> When {{setParallelism()}} is called we don't verify that it is <= the max 
> parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check 
> that the new value doesn't clash with a previously set parallelism.
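
A minimal sketch of the kind of cross-check being introduced (illustrative field and constant names only; per the commits above, the actual checks live in ExecutionConfig):

    // Illustrative only: the shape of the parallelism / max-parallelism cross-checks.
    public class ParallelismSettingsSketch {

        // hypothetical upper bound used only for this sketch
        private static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

        private int parallelism = -1;     // -1 means "not set"
        private int maxParallelism = -1;  // -1 means "not set"

        public void setParallelism(int parallelism) {
            if (parallelism < 1) {
                throw new IllegalArgumentException("Parallelism must be at least 1.");
            }
            if (maxParallelism > 0 && parallelism > maxParallelism) {
                throw new IllegalArgumentException("Parallelism (" + parallelism
                        + ") must not exceed the max parallelism (" + maxParallelism + ").");
            }
            this.parallelism = parallelism;
        }

        public void setMaxParallelism(int maxParallelism) {
            if (maxParallelism < 1 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
                throw new IllegalArgumentException("Max parallelism must be in [1, "
                        + UPPER_BOUND_MAX_PARALLELISM + "].");
            }
            if (parallelism > 0 && maxParallelism < parallelism) {
                throw new IllegalArgumentException("Max parallelism (" + maxParallelism
                        + ") must not be smaller than the configured parallelism (" + parallelism + ").");
            }
            this.maxParallelism = maxParallelism;
        }
    }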



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3509: [FLINK-5808] Fix Missing verification for setParal...

2017-03-10 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3509

[FLINK-5808] Fix Missing verification for setParallelism and 
setMaxParallelism



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-5808-max-parallelism-verification

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3509


commit 59b049bbad2c889dcb29ffbd900a458580c20728
Author: Aljoscha Krettek 
Date:   2017-03-10T12:30:21Z

[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Before, it was set on the ExecutionConfig for some stream execution
environments and later for others. Now, we don't set the default
parallelism on the ExecutionConfig but instead set it at the latest
possible point, in the StreamingJobGraphGenerator.

This also adds tests that verify that we don't set the default
parallelism on the ExecutionConfig.

commit 5527eae6f29ed80baedd1f5feadf3a4e17ac7865
Author: Aljoscha Krettek 
Date:   2017-03-10T13:37:26Z

[FLINK-5808] Move max keygroup constants to ExecutionConfig

We need to have them there if we want to properly test the arguments of
setMaxParallelism() in the ExecutionConfig itself.

commit 096dcca374a052d90bfe09a5a147435f2614aa05
Author: Aljoscha Krettek 
Date:   2017-03-10T13:35:37Z

[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there were some checks in
StreamExecutionEnvironment.set(Max)Parallelism(), but a user could
circumvent these by using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

