[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

2016-09-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77956781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+   public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+   private KeyGroupRangeAssignment() {
+   throw new AssertionError();
+   }
+
+   /**
+    * Assigns the given key to a parallel operator index.
+    *
+    * @param key the key to assign
+    * @param maxParallelism the maximum supported parallelism, aka the number of key-groups
+    * @param parallelism the current parallelism of the operator
+    * @return the index of the parallel operator to which the given key should be routed
+    */
+   public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
+      return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
+   }
+
+   /**
+    * Assigns the given key to a key-group index.
+    *
+    * @param key the key to assign
+    * @param maxParallelism the maximum supported parallelism, aka the number of key-groups
+    * @return the key-group to which the given key is assigned
+    */
+   public static final int assignToKeyGroup(Object key, int maxParallelism) {
+      return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+   }
+
+   /**
+    * Computes the range of key-groups that are assigned to a given operator under the given
+    * parallelism and maximum parallelism.
+    *
+    * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this
+    * method. If we ever want to go beyond this boundary, this method must perform arithmetic
+    * on long values.
+    *
+    * @param maxParallelism Maximal parallelism that the job was initially created with.
+    * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
+    * @param operatorIndex  Index of the parallel operator. 0 <= operatorIndex < parallelism.
+    * @return the range of key-groups assigned to the given operator index
+    */
+   public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+         int maxParallelism,
+         int parallelism,
+         int operatorIndex) {
+      Preconditions.checkArgument(parallelism > 0, "Parallelism must be greater than zero.");
+      Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
+      Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
+
+      int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
+      int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
--- End diff --

Ah I see. Now it makes sense :-)
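For readers following the thread, the mapping in the quoted diff can be checked with a small standalone sketch. The range formulas below are copied from the PR; the inverse mapping (`operatorForKeyGroup`) is an assumption on my part, not necessarily the exact Flink implementation.

```java
// Standalone sketch of the key-group range assignment quoted above.
public class KeyGroupDemo {

    // First key-group owned by the given operator (formula from the quoted diff).
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    }

    // Last key-group owned by the given operator (formula from the quoted diff).
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Assumed inverse mapping: which operator a given key-group is routed to.
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        // Every key-group in an operator's range must route back to that operator.
        for (int op = 0; op < parallelism; op++) {
            for (int kg = rangeStart(maxParallelism, parallelism, op);
                 kg <= rangeEnd(maxParallelism, parallelism, op); kg++) {
                if (operatorForKeyGroup(maxParallelism, parallelism, kg) != op) {
                    throw new AssertionError("key-group " + kg + " misrouted");
                }
            }
        }
        System.out.println("operator 1 owns key-groups "
            + rangeStart(maxParallelism, parallelism, 1) + ".."
            + rangeEnd(maxParallelism, parallelism, 1));
    }
}
```

With maxParallelism = 128 and parallelism = 4, each operator owns a contiguous block of 32 key-groups (operator 1 owns 32..63), which is why rescaling only moves whole key-groups.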


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473074#comment-15473074
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77956781
  

Ah I see. Now it makes sense :-)


> Introduce key groups for key-value state to support dynamic scaling
> ---
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> In order to support dynamic scaling, it is necessary to sub-parti

[jira] [Commented] (FLINK-4597) Improve Scalar Function section in Table API documentation

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473128#comment-15473128
 ] 

Timo Walther commented on FLINK-4597:
-

I thought the same. I'm currently reworking the SQL documentation as part of 
FLINK-4549 so that all functions (including the implicit ones) are documented. 
I will use the syntax Calcite is using: {{boolean1 OR boolean2}}, {{value1 <> 
value2}}. What do you think?

> Improve Scalar Function section in Table API documentation
> --
>
> Key: FLINK-4597
> URL: https://issues.apache.org/jira/browse/FLINK-4597
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>
> The function signature in the Scalar Function section is a little confusing, 
> because it is hard to distinguish keywords from parameters. For example, in 
> {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first 
> glance that TEMPORAL is a parameter. I propose to use {{<>}} around parameters, 
> i.e. {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4598) Support NULLIF in Table API

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473139#comment-15473139
 ] 

Timo Walther commented on FLINK-4598:
-

I wonder if we have to support in the Table API all functions that are 
supported in SQL, because SQL has a very large number of functions while the 
Table API currently has only the very basic ones. Is {{NULLIF}} a function that 
is needed very often? We can support it in the Table API; I'm just worried 
about the large number of implicit functions.

> Support NULLIF  in Table API
> 
>
> Key: FLINK-4598
> URL: https://issues.apache.org/jira/browse/FLINK-4598
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This could be a subtask of [FLINK-4549]. As Flink SQL already supports 
> {{NULLIF}} implicitly, we should support it in the Table API as well.
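For context, {{NULLIF}} has simple SQL-standard semantics: it yields NULL when its two arguments are equal, and the first argument otherwise. A plain Java rendering of that behavior (illustrative only, not Flink code):

```java
import java.util.Objects;

public class NullIfDemo {

    // SQL-standard NULLIF(v1, v2): NULL when v1 equals v2, otherwise v1.
    static Integer nullIf(Integer v1, Integer v2) {
        return Objects.equals(v1, v2) ? null : v1;
    }

    public static void main(String[] args) {
        System.out.println(nullIf(5, 5)); // null
        System.out.println(nullIf(5, 3)); // 5
    }
}
```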





[jira] [Created] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4599:
---

 Summary: Add 'explain()' also to StreamTableEnvironment
 Key: FLINK-4599
 URL: https://issues.apache.org/jira/browse/FLINK-4599
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


Currently, only the BatchTableEnvironment supports the {{explain}} command for 
tables. We should also support it for the StreamTableEnvironment.





[jira] [Commented] (FLINK-4597) Improve Scalar Function section in Table API documentation

2016-09-08 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473160#comment-15473160
 ] 

Jark Wu commented on FLINK-4597:


I had a look at the Calcite documentation [1]; this syntax is concise and clear. 
+1 for this and I will close this issue.

[1] https://calcite.apache.org/docs/reference.html#comparison-operators

> Improve Scalar Function section in Table API documentation
> --
>
> Key: FLINK-4597
> URL: https://issues.apache.org/jira/browse/FLINK-4597
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>
> The function signature in the Scalar Function section is a little confusing, 
> because it is hard to distinguish keywords from parameters. For example, in 
> {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first 
> glance that TEMPORAL is a parameter. I propose to use {{<>}} around parameters, 
> i.e. {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}





[jira] [Commented] (FLINK-616) Change of java version without effect.

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473165#comment-15473165
 ] 

ASF GitHub Bot commented on FLINK-616:
--

Github user aljoscha closed the pull request at:

https://github.com/apache/incubator-beam/pull/921


> Change of java version without effect.
> --
>
> Key: FLINK-616
> URL: https://issues.apache.org/jira/browse/FLINK-616
> Project: Flink
>  Issue Type: Bug
>Reporter: GitHub Import
>  Labels: github-import
> Fix For: pre-apache
>
> Attachments: pull-request-616-8025446799688161987.patch
>
>
> The compiler plugin was configured under the wrong section. It used to
> be a child of reporting but is now a child of build. Changing the source
> and target properties in the old pom were without any effect. They were
> simply ignored. 
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/pull/616
> Created by: [carabolic|https://github.com/carabolic]
> Labels: 
> Created at: Thu Mar 20 13:49:44 CET 2014
> State: closed





[jira] [Closed] (FLINK-4597) Improve Scalar Function section in Table API documentation

2016-09-08 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-4597.
--
Resolution: Duplicate

> Improve Scalar Function section in Table API documentation
> --
>
> Key: FLINK-4597
> URL: https://issues.apache.org/jira/browse/FLINK-4597
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>
> The function signature in the Scalar Function section is a little confusing, 
> because it is hard to distinguish keywords from parameters. For example, in 
> {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first 
> glance that TEMPORAL is a parameter. I propose to use {{<>}} around parameters, 
> i.e. {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}





[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-08 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Rebasing on the latest master and letting Travis run. If it gives green 
light, I will merge the PR.




[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473197#comment-15473197
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Rebasing on the latest master and letting Travis run. If it gives green 
light, I will merge the PR.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[jira] [Commented] (FLINK-4598) Support NULLIF in Table API

2016-09-08 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473200#comment-15473200
 ] 

Jark Wu commented on FLINK-4598:


You are right. We should evaluate which functions should be supported in the 
Table API (functions users have asked for or use often). It seems that 
{{NULLIF}} is a little-known function; I can close this issue if users rarely use it.

> Support NULLIF  in Table API
> 
>
> Key: FLINK-4598
> URL: https://issues.apache.org/jira/browse/FLINK-4598
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This could be a subtask of [FLINK-4549]. As Flink SQL already supports 
> {{NULLIF}} implicitly, we should support it in the Table API as well.





[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473234#comment-15473234
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2456
  
I will rebase the PR on the latest master and if Travis gives green light, 
I'd like to merge it.


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

2016-09-08 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2456
  
I will rebase the PR on the latest master and if Travis gives green light, 
I'd like to merge it.




[jira] [Assigned] (FLINK-4442) Implement Standalone ResourceManager

2016-09-08 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-4442:
-

Assignee: Maximilian Michels

> Implement Standalone ResourceManager 
> -
>
> Key: FLINK-4442
> URL: https://issues.apache.org/jira/browse/FLINK-4442
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>
> Implement a ResourceManager that runs in standalone mode





[GitHub] flink issue #2102: [FLINK-4068] [tableAPI] Move constant computations out of...

2016-09-08 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2102
  
As I mentioned above, the `cannot translate call AS($t0, $t1)` error is 
caused by a Calcite bug. 
[CALCITE-1297](https://issues.apache.org/jira/browse/CALCITE-1297) fixed it, 
but the fix is not yet released. I will wait for the Calcite 1.9 release and 
then start working on it. 




[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473300#comment-15473300
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2102
  
As I mentioned above, the `cannot translate call AS($t0, $t1)` error is 
caused by a Calcite bug. 
[CALCITE-1297](https://issues.apache.org/jira/browse/CALCITE-1297) fixed it, 
but the fix is not yet released. I will wait for the Calcite 1.9 release and 
then start working on it. 


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector<Row> out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply for generated function other than {{FlatMap}} as well.
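The proposed optimization can be illustrated outside of Flink's code generator. The class and method names below are purely illustrative, not what the generator emits:

```java
import java.util.ArrayList;
import java.util.List;

public class HoistedFilterDemo {

    // Before: the constant subexpression is recomputed for every record,
    // as in the generated code shown in the issue description.
    static List<Integer> filterNaive(List<Integer> in) {
        List<Integer> out = new ArrayList<>();
        for (int v : in) {
            int temp = 10 + 20; // recomputed per record
            if (v < temp) out.add(v);
        }
        return out;
    }

    // After: the constant is hoisted out of the per-record path and
    // evaluated once when the class is initialized.
    static final int TEMP = 10 + 20;

    static List<Integer> filterHoisted(List<Integer> in) {
        List<Integer> out = new ArrayList<>();
        for (int v : in) {
            if (v < TEMP) out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(5, 30, 29, 100);
        System.out.println(filterHoisted(data)); // [5, 29]
    }
}
```

Both versions produce identical results; the transformation only changes how often the constant `10 + 20` is evaluated.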





[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

2016-09-08 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2472#discussion_r77970498
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Promise;
+
+/**
+ * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
+
+   private final Promise<T> promise;
+
+   public FlinkCompletableFuture() {
+      promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+      scalaFuture = promise.future();
+
--- End diff --

unnecessary new line




[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473318#comment-15473318
 ] 

ASF GitHub Bot commented on FLINK-4361:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2472#discussion_r77970498
  

unnecessary new line


> Introduce Flink's own future abstraction
> 
>
> Key: FLINK-4361
> URL: https://issues.apache.org/jira/browse/FLINK-4361
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>
> In order to keep the abstraction Scala Independent, we should not rely on 
> Scala Futures





[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-08 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77970827
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Good point. Will use that one instead.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473326#comment-15473326
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77970827
  

Good point. Will use that one instead.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

2016-09-08 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2472#discussion_r77971053
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Promise;
+
+/**
+ * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
--- End diff --

How about letting this class override the cancel() method and treat it as 
completeExceptionally with a CancellationException?
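The suggested behavior matches what the JDK's own java.util.concurrent.CompletableFuture does, which a quick check confirms (JDK classes here, not Flink's):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelDemo {

    // Returns true when cancel() leaves the future completed exceptionally
    // with a CancellationException, which is the JDK's documented behavior.
    static boolean cancelCompletesExceptionally() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.cancel(false); // effectively completeExceptionally(new CancellationException())
        try {
            future.join();
            return false; // unreachable: join() must throw after cancellation
        } catch (CancellationException e) {
            return future.isCompletedExceptionally();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelCompletesExceptionally()); // true
    }
}
```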




[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473330#comment-15473330
 ] 

ASF GitHub Bot commented on FLINK-4361:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2472#discussion_r77971053
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Promise;
+
+/**
+ * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
--- End diff --

How about letting this class override the cancel() methods and treating cancellation as completeExceptionally with a CancellationException?
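The suggestion could look roughly like the sketch below, using the JDK's `CompletableFuture` as a stand-in for Flink's own abstraction (the class name is illustrative, not from the PR):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Sketch only: route cancel() through completeExceptionally(CancellationException)
// so that consumers blocked on the future observe cancellation as an
// exceptional completion. Illustrated with the JDK CompletableFuture,
// not Flink's FlinkCompletableFuture.
public class CancellableFutureSketch<T> extends CompletableFuture<T> {
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // returns true only if this call transitioned the future to completed
        return completeExceptionally(new CancellationException("future cancelled"));
    }
}
```

With the JDK implementation, completing exceptionally with a `CancellationException` also makes `isCancelled()` report true, which is exactly the semantics the comment asks for.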


> Introduce Flink's own future abstraction
> 
>
> Key: FLINK-4361
> URL: https://issues.apache.org/jira/browse/FLINK-4361
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>
> In order to keep the abstraction Scala Independent, we should not rely on 
> Scala Futures



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2363
  
I'll address the checkNotNull/comment formatting while merging, which I'm 
doing now. Thank you for looking over it again @tillrohrmann .




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473381#comment-15473381
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2363
  
I'll address the checkNotNull/comment formatting while merging, which I'm 
doing now. Thank you for looking over it again @tillrohrmann .


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[jira] [Created] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals

2016-09-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4600:
---

 Summary: Support min/max aggregations for 
Date/Time/Timestamp/Intervals
 Key: FLINK-4600
 URL: https://issues.apache.org/jira/browse/FLINK-4600
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


Currently no aggregation supports temporal types. At least min/max should be 
added for Date/Time/Timestamp/Intervals.





[GitHub] flink issue #2478: [FLINK-4595] Close FileOutputStream in ParameterTool

2016-09-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2478
  
+1, will merge it later on.




[jira] [Commented] (FLINK-4595) Close FileOutputStream in ParameterTool

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473398#comment-15473398
 ] 

ASF GitHub Bot commented on FLINK-4595:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2478
  
+1, will merge it later on.


> Close FileOutputStream in ParameterTool
> ---
>
> Key: FLINK-4595
> URL: https://issues.apache.org/jira/browse/FLINK-4595
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and ParameterToolTest do not close FileOutputStream
> {code}
> defaultProps.store(new FileOutputStream(file), "Default file created by 
> Flink's ParameterUtil.createPropertiesFile()");
> {code}
> {code}
> props.store(new FileOutputStream(propertiesFile), "Test properties");
> {code}





[jira] [Assigned] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals

2016-09-08 Thread miaoever (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

miaoever reassigned FLINK-4600:
---

Assignee: miaoever

> Support min/max aggregations for Date/Time/Timestamp/Intervals
> --
>
> Key: FLINK-4600
> URL: https://issues.apache.org/jira/browse/FLINK-4600
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: miaoever
>
> Currently no aggregation supports temporal types. At least min/max should be 
> added for Date/Time/Timestamp/Intervals.





[jira] [Commented] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals

2016-09-08 Thread miaoever (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473407#comment-15473407
 ] 

miaoever commented on FLINK-4600:
-

May I have a try? (And any advice is welcome, since I am new to Flink.) :) 

> Support min/max aggregations for Date/Time/Timestamp/Intervals
> --
>
> Key: FLINK-4600
> URL: https://issues.apache.org/jira/browse/FLINK-4600
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: miaoever
>
> Currently no aggregation supports temporal types. At least min/max should be 
> added for Date/Time/Timestamp/Intervals.





[jira] [Commented] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-09-08 Thread Leo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473414#comment-15473414
 ] 

Leo Deng commented on FLINK-4260:
-

Yeah, I am working on this now (sorry for the delay, caused by some personal issues).

> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Leo Deng
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. The SIMILAR TO should also support that.





[jira] [Comment Edited] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-09-08 Thread Leo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473414#comment-15473414
 ] 

Leo Deng edited comment on FLINK-4260 at 9/8/16 9:52 AM:
-

[~twalthr] Yeah, I am still working on this (sorry for the delay, caused by some personal issues).


was (Author: miaoever):
Yeah, I am working on this now (sorry for some delay since some of my personal 
issues.)

> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Leo Deng
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. The SIMILAR TO should also support that.





[GitHub] flink issue #2402: [FLINK-4436] Unclosed DataOutputBuffer in Utils#setTokens...

2016-09-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2402
  
merging




[jira] [Commented] (FLINK-4436) Unclosed DataOutputBuffer in Utils#setTokensFor()

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473442#comment-15473442
 ] 

ASF GitHub Bot commented on FLINK-4436:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2402
  
merging


> Unclosed DataOutputBuffer in Utils#setTokensFor()
> -
>
> Key: FLINK-4436
> URL: https://issues.apache.org/jira/browse/FLINK-4436
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataOutputBuffer dob = new DataOutputBuffer();
> credentials.writeTokenStorageToStream(dob);
> {code}
> dob should be closed upon returning from the method.
> YarnApplicationMasterRunner#createTaskManagerContext() has similar issue.
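The same try-with-resources shape fixes this. Since Hadoop's `DataOutputBuffer` is not a JDK class, a `DataOutputStream` stands in below; the method and the `writeLong` call are illustrative stand-ins for `credentials.writeTokenStorageToStream(dob)`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch for FLINK-4436: the output buffer is closed when the
// try block exits, even if writing throws.
public class TokenSerializationSketch {
    static byte[] serialize(long token) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dob = new DataOutputStream(bytes)) {
            dob.writeLong(token); // stands in for writeTokenStorageToStream(dob)
        }
        return bytes.toByteArray();
    }
}
```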





[jira] [Commented] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473447#comment-15473447
 ] 

Timo Walther commented on FLINK-4600:
-

Go for it :) 
This issue requires some changes in "org.apache.flink.api.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions", "org.apache.flink.api.table.runtime.aggregate.MaxAggregate.scala" and tests. The intermediate results might work on primitive types to reduce object creation.
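Since Date, Time, Timestamp, and the interval types are all Comparable, one generic implementation could in principle cover them all. A rough sketch (this is not Flink's actual aggregate interface, just the core comparison logic):

```java
// Hedged sketch for FLINK-4600: a max aggregate over any Comparable type.
// Flink's real MaxAggregate has a different interface; only the update
// rule is shown here.
public class MaxAggregateSketch<T extends Comparable<T>> {
    private T max; // null until the first value arrives

    public void aggregate(T value) {
        if (max == null || value.compareTo(max) > 0) {
            max = value;
        }
    }

    public T result() {
        return max;
    }
}
```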

> Support min/max aggregations for Date/Time/Timestamp/Intervals
> --
>
> Key: FLINK-4600
> URL: https://issues.apache.org/jira/browse/FLINK-4600
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Leo Deng
>
> Currently no aggregation supports temporal types. At least min/max should be 
> added for Date/Time/Timestamp/Intervals.





[GitHub] flink pull request #2402: [FLINK-4436] Unclosed DataOutputBuffer in Utils#se...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2402




[GitHub] flink pull request #2478: [FLINK-4595] Close FileOutputStream in ParameterTo...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2478




[jira] [Closed] (FLINK-4595) Close FileOutputStream in ParameterTool

2016-09-08 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4595.
---
Resolution: Fixed

Fixed in 920cda408f61a27f0b1731325b62bf141dd4b530

> Close FileOutputStream in ParameterTool
> ---
>
> Key: FLINK-4595
> URL: https://issues.apache.org/jira/browse/FLINK-4595
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and ParameterToolTest do not close FileOutputStream
> {code}
> defaultProps.store(new FileOutputStream(file), "Default file created by 
> Flink's ParameterUtil.createPropertiesFile()");
> {code}
> {code}
> props.store(new FileOutputStream(propertiesFile), "Test properties");
> {code}





[jira] [Closed] (FLINK-4436) Unclosed DataOutputBuffer in Utils#setTokensFor()

2016-09-08 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4436.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 7e07bde8c5b33e9985260416fdf75e15df102efa

> Unclosed DataOutputBuffer in Utils#setTokensFor()
> -
>
> Key: FLINK-4436
> URL: https://issues.apache.org/jira/browse/FLINK-4436
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
> Fix For: 1.2.0
>
>
> {code}
> DataOutputBuffer dob = new DataOutputBuffer();
> credentials.writeTokenStorageToStream(dob);
> {code}
> dob should be closed upon returning from the method.
> YarnApplicationMasterRunner#createTaskManagerContext() has similar issue.





[jira] [Commented] (FLINK-4595) Close FileOutputStream in ParameterTool

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473456#comment-15473456
 ] 

ASF GitHub Bot commented on FLINK-4595:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2478


> Close FileOutputStream in ParameterTool
> ---
>
> Key: FLINK-4595
> URL: https://issues.apache.org/jira/browse/FLINK-4595
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and ParameterToolTest do not close FileOutputStream
> {code}
> defaultProps.store(new FileOutputStream(file), "Default file created by 
> Flink's ParameterUtil.createPropertiesFile()");
> {code}
> {code}
> props.store(new FileOutputStream(propertiesFile), "Test properties");
> {code}





[jira] [Commented] (FLINK-4436) Unclosed DataOutputBuffer in Utils#setTokensFor()

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473455#comment-15473455
 ] 

ASF GitHub Bot commented on FLINK-4436:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2402


> Unclosed DataOutputBuffer in Utils#setTokensFor()
> -
>
> Key: FLINK-4436
> URL: https://issues.apache.org/jira/browse/FLINK-4436
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
> Fix For: 1.2.0
>
>
> {code}
> DataOutputBuffer dob = new DataOutputBuffer();
> credentials.writeTokenStorageToStream(dob);
> {code}
> dob should be closed upon returning from the method.
> YarnApplicationMasterRunner#createTaskManagerContext() has similar issue.





[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...

2016-09-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77981435
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type 
information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type 
system. The factory is
+ * called during the type extraction phase if the corresponding type has 
been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be 
chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+   public TypeInfoFactory() {
+   // default constructor
--- End diff --

There is no reason for it. I will remove it.
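To illustrate the factory pattern described in the javadoc above without depending on Flink, here is a self-contained stand-in. All class names are mine, and a real factory returns a `TypeInformation` rather than a `String`; only the shape of the callback is shown.

```java
import java.lang.reflect.Type;
import java.util.Map;

// Stand-ins (NOT Flink's classes): a type annotated with @TypeInfo points at
// a factory, and the extractor invokes the factory with the resolved generic
// parameters to build the type's TypeInformation.
abstract class TypeInfoFactorySketch<T> {
    public abstract String createTypeInfo(Type t, Map<String, String> genericParameters);
}

// Example factory for an Either-like type with generic parameters L and R.
class EitherLikeFactory<T> extends TypeInfoFactorySketch<T> {
    @Override
    public String createTypeInfo(Type t, Map<String, String> genericParameters) {
        // a real factory would assemble nested TypeInformation here
        return "Either<" + genericParameters.get("L") + ", " + genericParameters.get("R") + ">";
    }
}
```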




[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473502#comment-15473502
 ] 

ASF GitHub Bot commented on FLINK-3042:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77981435
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type 
information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type 
system. The factory is
+ * called during the type extraction phase if the corresponding type has 
been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be 
chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+   public TypeInfoFactory() {
+   // default constructor
--- End diff --

There is no reason for it. I will remove it.


> Define a way to let types create their own TypeInformation
> --
>
> Key: FLINK-3042
> URL: https://issues.apache.org/jira/browse/FLINK-3042
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Timo Walther
> Fix For: 1.0.0
>
>
> Currently, introducing new Types that should have specific TypeInformation 
> requires
>   - Either integration with the TypeExtractor
>   - Or manually constructing the TypeInformation (potentially at every place) 
> and using type hints everywhere.
> I propose to add a way to allow classes to create their own TypeInformation 
> (like a static method "createTypeInfo()").
> To support generic nested types (like Optional / Either), the type extractor 
> would provide a Map of what generic variables map to what types (deduced from 
> the input). The class can use that to create the correct nested 
> TypeInformation (possibly by calling the TypeExtractor again, passing the Map 
> of generic bindings).





[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-09-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473506#comment-15473506
 ] 

Lorenz Bühmann commented on FLINK-2662:
---

No problem, just let me know if you need something more [~fhueske]

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77983346
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
return forceAvro;
}
 
+/**
+ * Force Flink to use the generated serializers for POJOs.
--- End diff --

Update comment to include comparators.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473528#comment-15473528
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77983346
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
return forceAvro;
}
 
+/**
+ * Force Flink to use the generated serializers for POJOs.
--- End diff --

Update comment to include comparators.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...

2016-09-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77984002
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -792,12 +832,40 @@ else if (t instanceof Class) {
 
return null;
}
-   
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation info = null;
-   
+
+   // use a factory to find corresponding type information to type 
variable
+   final ArrayList factoryHierarchy = new 
ArrayList<>(inputTypeHierarchy);
+   final TypeInfoFactory factory = 
getClosestFactory(factoryHierarchy, inType);
+   if (factory != null) {
+   // the type that defines the factory is last in factory 
hierarchy
+   final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+   // defining type has generics, the factory need to be 
asked for a mapping of subtypes to type information
+   if (factoryDefiningType instanceof ParameterizedType) {
--- End diff --

Yes, because we try to create type information from the input in this 
method. This only works if a type variable connects input with output. 
`Map, OutType>`




[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473535#comment-15473535
 ] 

ASF GitHub Bot commented on FLINK-3042:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77984002
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -792,12 +832,40 @@ else if (t instanceof Class) {
 
return null;
}
-   
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation info = null;
-   
+
+   // use a factory to find corresponding type information to type 
variable
+   final ArrayList factoryHierarchy = new 
ArrayList<>(inputTypeHierarchy);
+   final TypeInfoFactory factory = 
getClosestFactory(factoryHierarchy, inType);
+   if (factory != null) {
+   // the type that defines the factory is last in factory 
hierarchy
+   final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+   // defining type has generics, the factory need to be 
asked for a mapping of subtypes to type information
+   if (factoryDefiningType instanceof ParameterizedType) {
--- End diff --

Yes, because we try to create type information from the input in this 
method. This only works if a type variable connects input with output. 
`Map, OutType>`


> Define a way to let types create their own TypeInformation
> --
>
> Key: FLINK-3042
> URL: https://issues.apache.org/jira/browse/FLINK-3042
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Timo Walther
> Fix For: 1.0.0
>
>
> Currently, introducing new Types that should have specific TypeInformation 
> requires
>   - Either integration with the TypeExtractor
>   - Or manually constructing the TypeInformation (potentially at every place) 
> and using type hints everywhere.
> I propose to add a way to allow classes to create their own TypeInformation 
> (like a static method "createTypeInfo()").
> To support generic nested types (like Optional / Either), the type extractor 
> would provide a Map of what generic variables map to what types (deduced from 
> the input). The class can use that to create the correct nested 
> TypeInformation (possibly by calling the TypeExtractor again, passing the Map 
> of generic bindings).
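As a rough illustration of the proposal above (toy names, not the API the linked PR actually introduces), a type can build its own type information once the extractor hands it a map of deduced generic bindings:

```java
import java.util.HashMap;
import java.util.Map;

public class CreateTypeInfoSketch {

    // Stand-in for TypeInformation<T>; the real class also carries
    // serializers, comparators, arity, etc.
    static class TypeInfo {
        final String description;
        TypeInfo(String description) { this.description = description; }
    }

    // A generic container like Either<L, R> that describes itself once the
    // extractor tells it what L and R are bound to (deduced from the input).
    static class MyEither<L, R> {
        static TypeInfo createTypeInfo(Map<String, TypeInfo> genericBindings) {
            TypeInfo left = genericBindings.get("L");
            TypeInfo right = genericBindings.get("R");
            return new TypeInfo("Either<" + left.description + ", " + right.description + ">");
        }
    }

    public static void main(String[] args) {
        Map<String, TypeInfo> bindings = new HashMap<>();
        bindings.put("L", new TypeInfo("String"));
        bindings.put("R", new TypeInfo("Long"));
        // The extractor passes the deduced bindings; the type builds the
        // correct nested information itself, no TypeExtractor integration needed.
        System.out.println(MyEither.createTypeInfo(bindings).description);
        // Either<String, Long>
    }
}
```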



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77984130
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
return forceAvro;
}
 
+/**
+ * Force Flink to use the generated serializers for POJOs.
+ */
+   public void enableCodeGeneration() {
+   forceCodeGeneration = true;
+   }
+
+   public void disableCodeGeneration() {
+   forceCodeGeneration = false;
+   }
+
+   public boolean isCodeGenerationEnabled() {
+   return forceCodeGeneration;
+   }
+
+   public void enableWrapGeneratedClasses() {
--- End diff --

I'm not sure if this is still needed, but I might be missing something. 
Shouldn't this always be enabled?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473537#comment-15473537
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77984130
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
return forceAvro;
}
 
+/**
+ * Force Flink to use the generated serializers for POJOs.
+ */
+   public void enableCodeGeneration() {
+   forceCodeGeneration = true;
+   }
+
+   public void disableCodeGeneration() {
+   forceCodeGeneration = false;
+   }
+
+   public boolean isCodeGenerationEnabled() {
+   return forceCodeGeneration;
+   }
+
+   public void enableWrapGeneratedClasses() {
--- End diff --

I'm not sure if this is still needed, but I might be missing something. 
Shouldn't this always be enabled?


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
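The reflection overhead called out in [2] can be illustrated with a minimal, self-contained sketch (hypothetical helper names; the real `PojoSerializer` caches its `Field` objects but still pays for `Field.get` and boxing on every record):

```java
import java.lang.reflect.Field;

public class ReflectionVsDirect {

    public static class Pojo {
        public int value;
    }

    // Reflective read, as a generic serializer does it; every call pays for
    // the reflective access and boxes the primitive.
    static int readReflectively(Pojo p) {
        try {
            Field f = Pojo.class.getField("value");
            return (Integer) f.get(p);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Direct read, as generated code would emit: a plain field access the
    // JIT can inline.
    static int readDirectly(Pojo p) {
        return p.value;
    }

    public static void main(String[] args) {
        Pojo p = new Pojo();
        p.value = 42;
        System.out.println(readReflectively(p) == readDirectly(p)); // true
    }
}
```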





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77986502
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
--- End diff --

I think the wording "turned on" is a bit confusing, because it means 
different things for the different elements of this list.
- I guess for Kryo, Avro, and Generated, you mean `enableForceKryo`, 
`enableForceAvro`, or `enableCodeGeneration` was called?
- For "Custom", you mean that this method was called for the particular 
type.
- For "Flink", you are talking about `PojoSerializer`? This is kind of 
always turned on. However, it is not always applicable (if the type is not a 
POJO), in which case we fall back to Kryo, even if it is not "turned on" in the 
above sense, which is also confusing.

Another problem is that I'm not sure what happens with Tuples (and basic 
types and other special types) if `enableForceKryo` was called. I guess they 
don't participate in this mess and always get serialized by their special 
built-in serializers?




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473565#comment-15473565
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77986502
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
--- End diff --

I think the wording "turned on" is a bit confusing, because it means 
different things for the different elements of this list.
- I guess for Kryo, Avro, and Generated, you mean `enableForceKryo`, 
`enableForceAvro`, or `enableCodeGeneration` was called?
- For "Custom", you mean that this method was called for the particular 
type.
- For "Flink", you are talking about `PojoSerializer`? This is kind of 
always turned on. However, it is not always applicable (if the type is not a 
POJO), in which case we fall back to Kryo, even if it is not "turned on" in the 
above sense, which is also confusing.

Another problem is that I'm not sure what happens with Tuples (and basic 
types and other special types) if `enableForceKryo` was called. I guess they 
don't participate in this mess and always get serialized by their special 
built-in serializers?




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77986671
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
+   customSerializers.put(clazz, ser);
+   }
+
+   /**
+* Register a custom comparator for a type. The precedence of the 
serializers
--- End diff --

"serializers" -- You mean comparators?




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473567#comment-15473567
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77986671
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
+   customSerializers.put(clazz, ser);
+   }
+
+   /**
+* Register a custom comparator for a type. The precedence of the 
serializers
--- End diff --

"serializers" -- You mean comparators?




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987079
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
--- End diff --

Is it documented somewhere that custom serializers have to have these 
properties?

Also, these shouldn't be asserts, but throw exceptions instead. (I think 
asserts are generally for internal consistency stuff, i.e., they should fire 
only when you have a bug, and not when some stuff that a user gave us is not in 
the right form.)
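The suggestion above can be sketched as follows (hypothetical helper, not the PR's code): validate the user-supplied class eagerly and throw an exception, instead of relying on `assert`, which is disabled unless the JVM runs with `-ea`.

```java
import java.lang.reflect.Constructor;

public class RegistrationChecks {

    // Precondition-style check on a user-supplied serializer class:
    // fail fast with a descriptive exception rather than an assert.
    static void checkHasSingleNoArgConstructor(Class<?> clazz) {
        Constructor<?>[] ctors = clazz.getConstructors();
        if (ctors.length != 1) {
            throw new IllegalArgumentException(
                clazz.getName() + " must declare exactly one public constructor");
        }
        if (ctors[0].getParameterTypes().length != 0) {
            throw new IllegalArgumentException(
                clazz.getName() + " must have a no-argument constructor");
        }
    }

    static class GoodSerializer {
        public GoodSerializer() {}
    }

    public static void main(String[] args) {
        checkHasSingleNoArgConstructor(GoodSerializer.class); // passes silently
        try {
            // Integer has two public constructors, so the check rejects it.
            checkHasSingleNoArgConstructor(Integer.class);
            System.out.println("unexpected");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Unlike an assert, this reports the misuse to the user in all JVM configurations, which matches the reviewer's distinction between internal invariants and validation of external input.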




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473575#comment-15473575
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987122
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
+   customSerializers.put(clazz, ser);
+   }
+
+   /**
+* Register a custom comparator for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Custom, Generated, Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static  void 
registerCustomComparator(ArrayList keyIds,
+   
Class 
clazz, Class comp) {
+   Constructor[] ctors = comp.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
--- End diff --

Same problems as above.




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987122
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
+   customSerializers.put(clazz, ser);
+   }
+
+   /**
+* Register a custom comparator for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Custom, Generated, Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static  void 
registerCustomComparator(ArrayList keyIds,
+   
Class 
clazz, Class comp) {
+   Constructor[] ctors = comp.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
--- End diff --

Same problems as above.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473574#comment-15473574
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987079
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
+   Constructor[] ctors = ser.getConstructors();
+   assert ctors.length == 1;
+   assert ctors[0].getParameterTypes().length == 0;
--- End diff --

Is it documented somewhere that custom serializers have to have these 
properties?

Also, these shouldn't be asserts, but throw exceptions instead. (I think 
asserts are generally for internal consistency stuff, i.e., they should fire 
only when you have a bug, and not when some stuff that a user gave us is not in 
the right form.)




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987318
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
--- End diff --

Most of the other methods in this class are PublicEvolving. Maybe we should 
add this annotation here as well. (And also below.)




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473580#comment-15473580
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987318
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
+*
+*/
+   public static > void 
registerCustomSerializer(Class clazz, Class ser) {
--- End diff --

Most of the other methods in this class are PublicEvolving. Maybe we should 
add this annotation here as well. (And also below.)




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987928
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
--- End diff --

I'm also not sure about putting "Custom" after "Kryo" and "Avro". The 
reason for having a custom serializer is kind of that I want to bypass all this 
mess. Or to reason from another angle, why would I have a custom serializer 
if it doesn't work, and Kryo has to take over?




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473590#comment-15473590
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77987928
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
+   new HashMap<>();
+
private final PojoField[] fields;

private final int totalFields;
 
+   /**
+* Register a custom serializer for a type. The precedence of the 
serializers
+* is the following (highest to lowest): Kryo, Avro, Custom, Generated, 
Flink.
+* The chosen serializer will be the first one from the list that is 
turned on.
--- End diff --

I'm also not sure about putting "Custom" after "Kryo" and "Avro". The 
whole point of registering a custom serializer is that I want to bypass all of 
this machinery. Or, to reason from another angle: why would I have a custom 
serializer if it doesn't work and Kryo has to take over?


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77988402
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -393,6 +443,18 @@ public void addComparatorField(int fieldId, 
TypeComparator comparator) {
keyFields.size() == fieldComparators.size(),
"Number of key fields and field comparators is 
not equal.");
 
+   Tuple2, Class> custCompKey = new 
Tuple2(keyFieldIds, getTypeClass());
+   if (customComparators.containsKey(custCompKey)) {
+   return 
InstantiationUtil.instantiate(customComparators.get(custCompKey));
+   }
+
+   if (config.isCodeGenerationEnabled()) {
+   return new 
PojoComparatorGenerator(keyFields.toArray(new Field[keyFields.size()]),
+   fieldComparators.toArray(new 
TypeComparator[fieldComparators.size()]), createSerializer
+   (config), getTypeClass(), 
keyFieldIds.toArray(new Integer[keyFields.size()]), config)
--- End diff --

I think it's less verbose to use the parameterless toArray, and cast to the 
appropriate array type.
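For background on the two `toArray` flavours (plain `java.util` semantics, not specific to this PR): the no-argument `toArray()` always returns an array whose runtime type is `Object[]`, so for common list implementations such as `ArrayList` a direct cast to a typed array throws `ClassCastException` at runtime, while the typed overload allocates an array of the right component type:

```java
import java.util.ArrayList;
import java.util.List;

public class ToArraySketch {
    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>(List.of(1, 2, 3));

        // No-arg toArray(): runtime component type is Object, so
        // "(Integer[]) ids.toArray()" would throw ClassCastException.
        Object[] untyped = ids.toArray();

        // Typed overload: fills (or allocates) an Integer[] directly.
        Integer[] typed = ids.toArray(new Integer[0]);

        System.out.println(untyped.getClass().getComponentType().getSimpleName()
                + " vs " + typed.getClass().getComponentType().getSimpleName());
        // prints Object vs Integer
    }
}
```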






[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77989471
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
--- End diff --

You could use wildcard (`?`) here, instead of `Object`, to avoid the 
unchecked cast in the ctor. (And then also in 
`GenTypeComparatorProxy.comparators`, so that you can pass this there without a 
cast.)
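A minimal illustration of the suggestion, with `java.util.Comparator` standing in for Flink's `TypeComparator`: with a wildcard component type the field can be assigned directly, whereas a concrete type argument would force an unchecked `(Comparator<Object>[])` cast in the constructor.

```java
import java.util.Comparator;

public class WildcardFieldSketch {
    static final class WithWildcard {
        // Comparator<?>[] accepts any array of comparators without a cast.
        private final Comparator<?>[] comparators;

        WithWildcard(Comparator<?>[] comparators) {
            this.comparators = comparators; // no unchecked warning here
        }

        int size() {
            return comparators.length;
        }
    }

    public static void main(String[] args) {
        // An unbounded-wildcard array type is reifiable, so it can be
        // created directly with "new Comparator<?>[]{...}".
        Comparator<?>[] cs = new Comparator<?>[]{ String.CASE_INSENSITIVE_ORDER };
        System.out.println(new WithWildcard(cs).size()); // prints 1
    }
}
```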






[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990034
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+
+   transient private CompositeTypeComparator impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class comparatorClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (CompositeTypeComparator) 
ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
--- End diff --

This is a varargs method, so you don't need to create an array here.
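`Constructor.newInstance` is declared as `newInstance(Object... initargs)`, so the arguments can be passed directly and the compiler builds the `Object[]` itself. A small sketch with a hypothetical `Pair` class (not from the PR):

```java
import java.lang.reflect.Constructor;

public class VarargsCtorSketch {
    public static final class Pair {
        public final String left;
        public final int right;
        public Pair(String left, int right) { this.left = left; this.right = right; }
    }

    // Wraps the reflective call; checked reflection exceptions become unchecked.
    static Pair reflectivePair(String left, int right) {
        try {
            Constructor<?> ctor = Pair.class.getConstructors()[0];
            // Varargs form: no "new Object[]{...}" needed.
            return (Pair) ctor.newInstance(left, right);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Pair p = reflectivePair("x", 1);
        System.out.println(p.left + ":" + p.right); // prints x:1
    }
}
```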






[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990200
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+
+public class GenTypeSerializerProxy extends TypeSerializer {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+
+   transient private TypeSerializer impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
--- End diff --

varargs, no need to create an array




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990262
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+   private final Class type;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer,
+   
Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+   this.keyFields = keyFields;
+   this.comparators = (TypeComparator[]) comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+   this.keyFieldIds = keyFieldIds;
+   this.config = config;
+   }
+
+   public TypeComparator createComparator() {
+   // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
+   // name should determine the generated comparator. This 
information is used for caching (avoiding
+   // recompilation). Note that the name of the field is not 
sufficient because nested POJOs might have a field
+   // with the same name.
+   StringBuilder keyBuilder = new StringBuilder();
+   for(Integer i : keyFieldIds) {
+   keyBuilder.append(i);
+   keyBuilder.append("_");
+   }
+   final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
+   keyBuilder.toString();
+   final String fullClassName = packageName + "." + className;
+   Class comparatorClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if (config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeComparatorProxy<>(type, 
fullClassName, code, comparators, serializer);
+   }
+   try {
+   comparatorClazz = 
InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
comparator: " + className, e);
+   }
+   Constructor[] ctors = comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeComparator) ctors[0].newInstance(new 
Object[]{comparators, serializer, type});
--- End diff --

varargs





[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473638#comment-15473638
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990329
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
--- End diff --

varargs
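The "varargs" remark refers to `Constructor.newInstance`, which is declared as `newInstance(Object... initargs)`: the explicit `Object[]` wrapper can simply be dropped and the arguments passed directly. A minimal sketch (the `Box` class is invented purely for illustration):

```java
import java.lang.reflect.Constructor;

public class VarargsNewInstance {
    // Hypothetical stand-in class, used only to demonstrate the reflective call.
    public static class Box {
        public final String name;
        public final int value;
        public Box(String name, int value) { this.name = name; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        Constructor<?> ctor = Box.class.getConstructors()[0];
        // Explicit array, as in the reviewed code:
        Box a = (Box) ctor.newInstance(new Object[]{"a", 1});
        // Equivalent varargs form suggested by the review:
        Box b = (Box) ctor.newInstance("b", 2);
        System.out.println(a.name + a.value + " " + b.name + b.value);  // prints "a1 b2"
    }
}
```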


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990329
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
--- End diff --

varargs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473636#comment-15473636
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990262
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+   private final Class type;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer,
+   
Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+   this.keyFields = keyFields;
+   this.comparators = (TypeComparator[]) comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+   this.keyFieldIds = keyFieldIds;
+   this.config = config;
+   }
+
+   public TypeComparator createComparator() {
+   // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
+   // name should determine the generated comparator. This 
information is used for caching (avoiding
+   // recompilation). Note that the name of the field is not 
sufficient because nested POJOs might have a field
+   // with the same name.
+   StringBuilder keyBuilder = new StringBuilder();
+   for(Integer i : keyFieldIds) {
+   keyBuilder.append(i);
+   keyBuilder.append("_");
+   }
+   final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
+   keyBuilder.toString();
+   final String fullClassName = packageName + "." + className;
+   Class comparatorClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if (config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeComparatorProxy<>(type, 
fullClassName, code, comparators, serializer);
+   }
+   try {
+   comparatorClazz = 
InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
comparator: " + className, e);
+   }
+   Constructor[] ctors = comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeComparator) ctors[0].newInstance(new 
Object[]{comparators, serializer, type});
--- End diff --

varargs



[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473645#comment-15473645
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990530
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -377,9 +425,11 @@ public void initializeTypeComparatorBuilder(int size) {
public void addComparatorField(int fieldId, TypeComparator 
comparator) {
fieldComparators.add(comparator);
keyFields.add(fields[fieldId].getField());
+   keyFieldIds.add(fieldId);
}
 
@Override
+   @SuppressWarnings("unchecked")
--- End diff --

Use `Tuple2.of` to avoid the unchecked cast.
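The point of preferring a factory like `Tuple2.of` is that a generic static factory method lets the compiler infer the type arguments, whereas constructing through a raw type forces an unchecked cast. A sketch with a minimal `Pair` class standing in for Flink's `Tuple2` (the `Pair` class is invented for illustration):

```java
public class FactoryInference {
    // Minimal pair type with a static factory, mirroring the shape of Tuple2.of.
    public static class Pair<A, B> {
        public final A f0;
        public final B f1;
        public Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
        public static <A, B> Pair<A, B> of(A f0, B f1) { return new Pair<>(f0, f1); }
    }

    public static void main(String[] args) {
        // Raw construction needs an unchecked cast to obtain a typed reference:
        @SuppressWarnings("unchecked")
        Pair<String, Integer> raw = (Pair<String, Integer>) new Pair("a", 1);
        // The static factory infers <String, Integer>: no cast, no warning.
        Pair<String, Integer> typed = Pair.of("a", 1);
        System.out.println(raw.f0 + " " + typed.f1);  // prints "a 1"
    }
}
```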





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77990530
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -377,9 +425,11 @@ public void initializeTypeComparatorBuilder(int size) {
public void addComparatorField(int fieldId, TypeComparator 
comparator) {
fieldComparators.add(comparator);
keyFields.add(fields[fieldId].getField());
+   keyFieldIds.add(fieldId);
}
 
@Override
+   @SuppressWarnings("unchecked")
--- End diff --

Use `Tuple2.of` to avoid the unchecked cast.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473658#comment-15473658
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77991234
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
--- End diff --

I would add `` to `TypeComparator` to avoid some unchecked casts later 
on.







[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77991234
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -70,10 +77,41 @@
private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
+   private static final Map, Class> 
customSerializers = new HashMap<>();
+   private static final Map, Class>, Class> customComparators =
--- End diff --

I would add `` to `TypeComparator` to avoid some unchecked casts later 
on.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473677#comment-15473677
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992416
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
--- End diff --

`Class` to avoid the unchecked call to `getMethod` below.
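The reviewer's suggestion here is about the receiver's type: on a raw `Class` reference, member calls such as `getMethod` are unchecked, while the wildcard-parameterized `Class<?>` keeps them checked without committing to a concrete type argument. A small sketch of the checked form:

```java
import java.lang.reflect.Method;

public class WildcardClassRef {
    public static void main(String[] args) throws Exception {
        // A raw `Class` reference would make this getMethod call unchecked;
        // the wildcard form Class<?> avoids the warning.
        Class<?> clazz = "hello".getClass();
        Method m = clazz.getMethod("substring", int.class);
        System.out.println(m.invoke("hello", 1));  // prints "ello"
    }
}
```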







[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992445
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
--- End diff --

``




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473680#comment-15473680
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992481
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
--- End diff --

varargs, you can just omit the second arg
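`Class.getMethod(String, Class<?>...)` is likewise a varargs method, so for a zero-argument getter the empty `new Class[0]` can be omitted; both forms resolve to the same method. A quick sketch:

```java
import java.lang.reflect.Method;

public class GetMethodVarargs {
    public static void main(String[] args) throws Exception {
        // Explicit empty array, as in the reviewed code:
        Method withArray = String.class.getMethod("length", new Class<?>[0]);
        // Equivalent varargs form, with the second argument omitted:
        Method varargsForm = String.class.getMethod("length");
        System.out.println(withArray.equals(varargsForm));  // prints "true"
    }
}
```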







[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473678#comment-15473678
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992445
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
--- End diff --

``







[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992481
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
--- End diff --

varargs, you can just omit the second arg




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77992416
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
--- End diff --

`Class` to avoid the unchecked call to `getMethod` below.




[GitHub] flink issue #2386: [FLINK-3660] Measure latency and exposes them via a metri...

2016-09-08 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
@aljoscha I updated the pull request, which now builds green (just rebasing 
fixed the issues).
I also keep metrics now per operator, not only for the sinks.




[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473681#comment-15473681
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
@aljoscha I updated the pull request, which now builds green (just rebasing 
fixed the issues).
I also keep metrics now per operator, not only for the sinks.


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMilis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly than regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.
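The mechanism described above can be sketched in a few lines: a marker records the source's wall-clock emit time, and the sink estimates latency as "now minus emit time". This is an illustrative sketch of the proposal under its stated clock-sync assumption, not Flink's actual implementation; all names are invented:

```java
public class LatencyMarkSketch {
    // A marker carries the wall-clock time at which the source emitted it.
    public static class LatencyMark {
        public final long emitTimeMillis;
        public LatencyMark(long emitTimeMillis) { this.emitTimeMillis = emitTimeMillis; }
    }

    // At the source, a mark is created at a configurable interval.
    static LatencyMark emitMark() {
        return new LatencyMark(System.currentTimeMillis());
    }

    // At the sink, the latency estimate is "now minus emit time".
    // (Assumes source and sink clocks are in sync, as the issue notes.)
    static long latencyAtSink(LatencyMark mark) {
        return System.currentTimeMillis() - mark.emitTimeMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        LatencyMark mark = emitMark();
        Thread.sleep(50);  // stand-in for network/queueing delay in the topology
        System.out.println("approx. latency ms: " + latencyAtSink(mark));
    }
}
```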





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473685#comment-15473685
 ] 

Simone Robutti commented on FLINK-4565:
---

I defined this parser combinator rule:

    lazy val in: PackratParser[Expression] = term ~ "IN" ~ table ^^ {
      case l ~ _ ~ r => In(l, r)
    }

`In` takes an expression (containing the field of the left table to be matched 
against the right table) and a Table. I need to define a parser for a Table, 
but I have never worked with parser combinators, so I have no idea how to do 
that. Any suggestions?

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993022
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Is this possible at this point? The rules of being a POJO should not allow 
this, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993053
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No setter, it might be a scala class.
--- End diff --

Same question as above.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473692#comment-15473692
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993053
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No setter, it might be a scala class.
--- End diff --

Same question as above.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
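The reflection overhead described in the issue comes from per-record `Field` access. A minimal, self-contained illustration of the two access styles being compared (the `Point` class and its field are made up for this sketch, not taken from PojoSerializer):

```java
import java.lang.reflect.Field;

public class ReflectAccessDemo {
    // Hypothetical POJO with a public field.
    public static class Point {
        public int x = 7;
    }

    public static void main(String[] args) throws Exception {
        Point p = new Point();
        Field f = Point.class.getField("x");

        // Reflective read: roughly what PojoSerializer does per field, per record.
        int viaReflection = f.getInt(p);

        // Direct read: what generated serializer code can emit instead.
        int direct = p.x;

        System.out.println(viaReflection + " " + direct);
    }
}
```

The generated-code approach trades one compilation step at setup time for JIT-friendly direct field access on the hot path.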





[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473689#comment-15473689
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993022
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Is this possible at this point? The rules of being a POJO should not allow 
this, right?


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473702#comment-15473702
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993831
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Why do we need to cast to the boxed types if the field is non-boxed?
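One plausible answer, not confirmed in the thread: if the generated assignment receives the value as an `Object`, a direct cast to a primitive type is only legal since Java 7, whereas casting to the boxed wrapper and letting auto-unboxing finish the job works on every Java version. A sketch of the two cast styles:

```java
public class UnboxCastDemo {
    public static void main(String[] args) {
        Object value = 42; // the int arrives boxed, e.g. from a generic API

        // Cast to the boxed type, then auto-unbox: valid on all Java versions.
        int viaBoxed = (Integer) value;

        // Direct unboxing cast: only legal since Java 7.
        int direct = (int) value;

        System.out.println(viaBoxed + " " + direct);
    }
}
```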


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993831
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Why do we need to cast to the boxed types if the field is non-boxed?




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473715#comment-15473715
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r7799
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No setter, it might be a scala class.
--- End diff --

Same as above.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r7799
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No setter, it might be a scala class.
--- End diff --

Same as above.




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995157
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+
+   transient private CompositeTypeComparator impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class comparatorClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (CompositeTypeComparator) 
ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + name, e);
+   }
+   }
+
+   public GenTypeComparatorProxy(Class clazz, String name, String 
code,TypeComparator[] comparators,
+   
TypeSerializer serializer) {
+   this.name = name;
+   this.code = code;
+   this.clazz = clazz;
+   this.comparators = comparators;
+   this.serializer = serializer;
+   compile();
+   }
+
+   @SuppressWarnings("unchecked")
--- End diff --

Not needed if `comparators` has ``
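The proxy's `compile()` pattern — reflectively invoking the sole public constructor of a freshly compiled class — reduces to this standalone sketch (the `Pair` class is invented for illustration; the real proxy obtains the class from `InstantiationUtil.compile`):

```java
import java.lang.reflect.Constructor;

public class SingleCtorDemo {
    // Stand-in for a generated class: exactly one public constructor,
    // as the proxy's assertion (ctors.length == 1) expects.
    public static class Pair {
        final int a;
        final int b;
        public Pair(int a, int b) { this.a = a; this.b = b; }
    }

    public static void main(String[] args) throws Exception {
        Constructor<?>[] ctors = Pair.class.getConstructors();
        // Invoke the single constructor, as GenTypeComparatorProxy does
        // with the compiled comparator class; int args are auto-boxed.
        Pair p = (Pair) ctors[0].newInstance(1, 2);
        System.out.println(p.a + p.b);
    }
}
```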




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473725#comment-15473725
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995157
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+
+   transient private CompositeTypeComparator impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class comparatorClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (CompositeTypeComparator) 
ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + name, e);
+   }
+   }
+
+   public GenTypeComparatorProxy(Class clazz, String name, String 
code,TypeComparator[] comparators,
+   
TypeSerializer serializer) {
+   this.name = name;
+   this.code = code;
+   this.clazz = clazz;
+   this.comparators = comparators;
+   this.serializer = serializer;
+   compile();
+   }
+
+   @SuppressWarnings("unchecked")
--- End diff --

Not needed if `comparators` has ``


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473732#comment-15473732
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995491
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
--- End diff --

Please add javadoc.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995491
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
--- End diff --

Please add javadoc.




[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995790
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+
+public class GenTypeSerializerProxy extends TypeSerializer {
--- End diff --

Please add javadoc.


---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473738#comment-15473738 ]

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995790
  
Please add javadoc.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
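The issue description notes that PojoSerializer reads fields via reflection, which is slow compared to a plain field access that the JIT can inline. A minimal sketch of that contrast is below; the `Point` class is hypothetical and used only for illustration, not part of Flink or the proposal:

```java
import java.lang.reflect.Field;

// Hypothetical POJO, for illustration only.
class Point {
    public int x;
    public int y;
}

public class ReflectionVsDirect {
    public static void main(String[] args) throws Exception {
        Point p = new Point();
        p.x = 3;
        p.y = 4;

        // Reflective access, as in the current PojoSerializer: each read
        // goes through Field.get, which returns a boxed Object and is
        // harder for the JIT to inline.
        Field fx = Point.class.getField("x");
        int viaReflection = (Integer) fx.get(p);

        // Direct access, as code-generated serializers could emit:
        // a plain field read with no boxing or reflective dispatch.
        int direct = p.x;

        System.out.println(viaReflection == direct); // prints true
    }
}
```

The proposal linked above aims to close this gap by generating serializer code with direct accesses instead of `Field.get` calls.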



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

