[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528764#comment-15528764
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
Hello @gaborhermann, thank you for your contribution!
Are the numbers here non-zero entries in a matrix?
If that is the case, do you think it would be possible to test this on some
larger-scale datasets?

This would bring it closer to the actual use cases someone using Flink might
have.


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users; rather, they are built by collecting user behavior (e.g. a user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details in [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much of the original ALS algorithm. See the [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
> which could be a basis for this extension. Only the factor-updating step is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local is 
> precomputing the matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets.
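The broadcast step described above maps onto Flink's broadcast DataSets: a single-element DataSet holding Y^T * Y is attached to the update operator and read once per task. A minimal sketch of the pattern in Scala (the names and the flat Array[Double] encoding of Y^T * Y are illustrative assumptions, not the PR's actual code):

{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Sketch: broadcast a precomputed Y^T * Y (flattened to Array[Double]) so
// every parallel instance of the factor update can read it locally.
def updateWithYtY(
    userBlocks: DataSet[(Int, Array[Double])],
    ytY: DataSet[Array[Double]]): DataSet[(Int, Array[Double])] = {
  userBlocks.map(
    new RichMapFunction[(Int, Array[Double]), (Int, Array[Double])] {
      private var precomputedYtY: Array[Double] = _

      override def open(config: Configuration): Unit = {
        // the broadcast variable holds exactly one element
        precomputedYtY = getRuntimeContext
          .getBroadcastVariable[Array[Double]]("YtY")
          .get(0)
      }

      override def map(block: (Int, Array[Double])): (Int, Array[Double]) = {
        // a real implementation would run the local least-squares update
        // here using precomputedYtY; this sketch passes blocks through
        block
      }
    }
  ).withBroadcastSet(ytY, "YtY") // registers ytY under the name "YtY"
}
{code}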







[jira] [Commented] (FLINK-4632) when yarn nodemanager lost, flink hung

2016-09-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528781#comment-15528781
 ] 

刘喆 commented on FLINK-4632:
---

I can't reproduce it now.
I only saved the TaskManager's log from the beginning.
If it happens again, I will come back here.
Thanks very much.

> when yarn nodemanager lost,  flink hung
> ---
>
> Key: FLINK-4632
> URL: https://issues.apache.org/jira/browse/FLINK-4632
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Streaming
>Affects Versions: 1.2.0, 1.1.2
> Environment: cdh5.5.1  jdk1.7 flink1.1.2  1.2-snapshot   kafka0.8.2
>Reporter: 刘喆
>Priority: Blocker
>
> When running Flink streaming on YARN, using Kafka as the source, it runs well 
> at the start. But after a long run, for example 8 hours and 60,000,000+ 
> messages processed, it hangs: no messages are consumed, one TaskManager is 
> CANCELING, and the exception shows:
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> connection timeout
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:152)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>   at 
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>   at 
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>   at 
> io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Connection timed out (连接超时)
>   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>   at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
>   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>   ... 6 more
> After applying https://issues.apache.org/jira/browse/FLINK-4625
> it shows:
> java.lang.Exception: TaskManager was lost/killed: 
> ResourceID{resourceId='container_1471620986643_744852_01_001400'} @ 
> 38.slave.adh (dataPort=45349)
>   at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:162)
>   at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
>   at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:138)
>   at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>   at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(Instanc

[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80861067
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -581,6 +637,16 @@ object ALS {
         val userXy = new ArrayBuffer[Array[Double]]()
         val numRatings = new ArrayBuffer[Int]()
 
+        var precomputedXtX: Array[Double] = null
+
+        override def open(config: Configuration): Unit = {
+          // retrieve broadcasted precomputed XtX if using implicit feedback
+          if (implicitPrefs) {
+            precomputedXtX = getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+              .iterator().next()
+          }
+        }
+
         override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
--- End diff --

Is this a Java iterable here? Any reason to use this instead of the Scala 
`Iterable` trait?
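
A note on the question above (my reading, not confirmed in the thread): Flink's CoGroupFunction is a Java interface, so its parameters arrive as java.lang.Iterable. If Scala collection operations are wanted inside the UDF, the standard bridge is a cheap wrapper:

{code}
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConverters._

// Sketch: wrap the Java Iterable handed to a Flink co-group UDF in a Scala
// Iterable (no copying) and use ordinary collection operations on it.
def blockIds(left: JIterable[(Int, Int, Array[Array[Double]])]): Seq[Int] =
  left.asScala.map(_._1).toSeq
{code}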




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80862018
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+    with Matchers
+    with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) implementation" +
+    "for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
--- End diff --

Are you testing two functionalities in this test? If so, I suggest splitting
it into two separate test cases.
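
Concretely, the suggested split could look like the following sketch (the test names are mine, and the placeholder bodies stand in for the existing assertions):

{code}
import org.scalatest.{FlatSpec, Matchers}

// Sketch: one FlatSpec case per behaviour instead of both in one block.
class ImplicitALSSplitSpec extends FlatSpec with Matchers {

  behavior of "The implicit-feedback ALS modification"

  it should "properly compute Y^T * Y" in {
    assert(true) // placeholder: the YtY size and entry checks go here
  }

  it should "factorize an implicit feedback matrix" in {
    assert(true) // placeholder: the fit/predict checks go here
  }
}
{code}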




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860057
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -49,6 +49,21 @@ By applying this step alternately to the matrices $U$ and $V$, we can iterativel
 
 The matrix $R$ is given in its sparse representation as a tuple of $(i, j, r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix value at position $(i,j)$.
 
+An alternative model can be used for _implicit feedback_ datasets.
+These datasets only contain implicit feedback from the user
+in contrast to datasets with explicit feedback like movie ratings.
+For example users watch videos on a website and the website monitors which user
+viewed which video, so the users only provide their preference implicitly.
+In these cases the feedback should not be treated as a
+rating, but rather an evidence that the user prefers that item.
+Thus, for implicit feedback datasets there is a slightly different
+minimalization problem to solve (see [Hu et al.](http://dx.doi.org/10.1109/ICDM.2008.22) for details).
--- End diff --

Change "minimalization" to "optimization".




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860320
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do not represent an explicit
+preference of the user, just the implicit information how many times the user consumed the
--- End diff --

Missing word at the end: "consumed the ???". I would also change "explicit 
preference" to "explicit rating from the user".




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860723
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do not represent an explicit
+preference of the user, just the implicit information how many times the user consumed the
+(Default value: false)
+  
+
+  
+  
+Alpha
+
+  
+Weight of the positive implicit observations. Should be non-negative.
+Only relevant when ImplicitPrefs is set to true.
+(Default value: 1)
--- End diff --

Can you provide some motivation for this default value? From the paper I 
see:

> In our experiments, setting α = 40 was found to produce good results.
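
For context (a note on the formula referenced above, not a claim about the PR): in the paper's confidence weighting, alpha scales how strongly an observed interaction outweighs an unobserved one, so a default of 1 is far more conservative than the paper's 40:

{code}
c_{ui} = 1 + \alpha r_{ui}
\quad\Rightarrow\quad
\alpha = 1,\ r_{ui} = 1:\ c_{ui} = 2;
\qquad
\alpha = 40,\ r_{ui} = 1:\ c_{ui} = 41
{code}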




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80862241
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+    with Matchers
+    with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) implementation" +
+    "for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
+    import ExampleMatrix._
+
+    val rand = scala.util.Random
+    val numBlocks = 3
+    // randomly split matrix to blocks
+    val blocksY = Y
+      // add a random block id to every row
+      .map { row =>
+        (rand.nextInt(numBlocks), row)
+      }
+      // get the block via grouping
+      .groupBy(_._1).values
+      // add a block id (-1) to each block
+      .map(b => (-1, b.map(_._2)))
+      .toSeq
+
+    // use Flink to compute YtY
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val distribBlocksY = env.fromCollection(blocksY)
+
+    val YtY = ALS
+      .computeXtX(distribBlocksY, factors)
+      .collect().head
+
+    // check YtY size
+    YtY.length should be (factors * (factors - 1) / 2 + factors)
+
+    // check result is as expected
+    expectedUpperTriangleYtY
+      .zip(YtY)
+      .foreach { case (expected, result) =>
+        result should be (expected +- 0.1)
+      }
+
+    // temporary directory to avoid too few memory segments
+    val tempDir = CommonTestUtils.getTempDir + "/"
+
+    // factorize matrix with implicit ALS
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(blocks)
+      .setNumFactors(factors)
+      .setImplicit(true)
+      .setAlpha(alpha)
+      .setSeed(seed)
+      .setTemporaryPath(tempDir)
+
+    val inputDS = env.fromCollection(implicitRatings)
+
+    als.fit(inputDS)
+
+    // check predictions on some user-item pairs
+    val testData = env.fromCollection(expectedResult.map {
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = als.predict(testData).collect()
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt((uID, iID)) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 1e-5)
+      }
+    }
+
+  }
+
+}
+
+object ExampleMatrix {
--- End diff --

Data should go to the `Recommendation.scala` file, as with the plain ALS 
matrix.






[jira] [Comment Edited] (FLINK-4632) when yarn nodemanager lost, flink hung

2016-09-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528833#comment-15528833
 ] 

刘喆 edited comment on FLINK-4632 at 9/28/16 8:25 AM:


I think it is related to checkpointing.
When I use checkpointing in 'exactly_once' mode, a subtask may hang while the 
other subtasks keep running; after a long time, all tasks hang. At the same 
time, no more checkpoints are produced.
When it is killed, maybe the checkpoint blocks other threads. I use the 
JobManager as the checkpoint backend. The checkpoint interval is 30 seconds.

I have 2 screenshots for it:
http://p1.bpimg.com/567571/eb9442e01ede0a24.png
http://p1.bpimg.com/567571/eb9442e01ede0a24.png



Some log as below:
2016-09-27 09:42:59,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (369/500) (e60c3755b5461c181e29cd30400cd6b0) switched from DEPLOYING to RUNNING
2016-09-27 09:42:59,552 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (274/500) (f983db19c603a51027cf7031e19edb79) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,256 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (497/500) (92bde125c4eb920d32aa11b6514f4cf1) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,477 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (373/500) (a59fec5a8ce66518d9003e6a480e1854) switched from DEPLOYING to RUNNING
2016-09-27 09:44:05,867 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1474940645865
2016-09-27 09:44:41,782 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (in 35917 ms)
2016-09-27 09:49:05,865 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1474940945865
2016-09-27 09:50:05,866 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 2 expired before completing.
2016-09-27 09:50:07,390 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2
2016-09-27 09:50:10,572 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2
2016-09-27 09:50:12,207 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2


And when I don't use checkpointing at all, it works well.
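
For reproduction purposes, a sketch of the setup the comment describes (reconstructed from the text above, not taken from the reporter's actual job):

{code}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

// Sketch: exactly-once checkpointing every 30 seconds, as described above.
object CheckpointSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30000L, CheckpointingMode.EXACTLY_ONCE)
    // ... Kafka source and job definition would follow here ...
  }
}
{code}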





[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528958#comment-15528958
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80869838
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

I think both approaches, sending an exception or sending a failure response 
message in case of an exceptional state on the receiving side, are fine. 
However, I don't agree that exceptions are reserved only for the RPC layer. 
"User" code should also be allowed to throw exceptions. I agree, though, that 
we have to pay special attention to these cases on the caller side, because 
there is no compile-time check that we actually handle all occurring exceptions.
All RPC-related exceptions are wrapped in an `RpcConnectionException` and 
should thus be distinguishable from the "user" code exceptions.
In this specific case, I would go with throwing exceptions, because they 
transmit more information about the failure reason than simply sending 
`null` to the caller.
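
A sketch of the caller-side distinction the comment describes (the local exception class below is only a stand-in for Flink's RpcConnectionException, defined here so the example is self-contained):

{code}
import scala.util.{Failure, Success, Try}

// Stand-in for Flink's RpcConnectionException; illustration only.
class RpcConnectionException(msg: String) extends RuntimeException(msg)

// RPC-layer failures arrive wrapped, so the caller can tell them apart
// from exceptions thrown by the endpoint's "user" code.
def describe(result: Try[String]): String = result match {
  case Success(split)                     => s"got input split: $split"
  case Failure(e: RpcConnectionException) => s"rpc layer failed: ${e.getMessage}"
  case Failure(userError)                 => s"endpoint code threw: ${userError.getMessage}"
}
{code}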


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For FLIP-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID





[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528968#comment-15528968
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2560
  
I will shepherd this PR.


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
>     out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.
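
A hand-written analogue of the proposed improvement (a sketch, not the actual code-generator output): the constant subexpression is evaluated once per task in open() instead of once per record in flatMap().

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class ConstantFoldedPredicate extends RichFlatMapFunction[Int, Int] {

  private var threshold: Int = _

  override def open(config: Configuration): Unit = {
    threshold = 10 + 20 // hoisted: computed once per task, not per record
  }

  override def flatMap(in: Int, out: Collector[Int]): Unit = {
    if (in < threshold) { // corresponds to "myInt < (10 + 20)"
      out.collect(in)
    }
  }
}
{code}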







[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I've merged your PR @wangzhijiang999. Thanks for your work :-) You can 
close this PR now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4505.
--
Resolution: Fixed

Added via 5513fe61dc8699a617c163ee2a555e43dc6422e4

> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529008#comment-15529008
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I've merged your PR @wangzhijiang999. Thanks for your work :-) You can 
close this PR now.


> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-28 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80874290
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
		//TODO:: register at the RM
	}

+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

Thanks for your comments @tillrohrmann. It all sounds reasonable to me. 
Limiting the RPC errors to one specific exception type is a good idea; that 
way the user can easily tell what went wrong. 
One minor suggestion: maybe we should create a base class `RpcException` and 
have classes like `RpcConnectionException`, `RpcExecutionException` and 
`RpcTimeoutException` inherit from it, to make the hierarchy clearer. This 
can be done in another JIRA, though. 
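
Spelled out, the suggested hierarchy would look roughly like this (only 
`RpcConnectionException` exists today; the other classes are just the 
proposal, not actual Flink API):

{code:java}
public class RpcException extends Exception {
	public RpcException(String message, Throwable cause) {
		super(message, cause);
	}
}

public class RpcConnectionException extends RpcException {
	public RpcConnectionException(String message, Throwable cause) {
		super(message, cause);
	}
}

public class RpcTimeoutException extends RpcException {
	public RpcTimeoutException(String message, Throwable cause) {
		super(message, cause);
	}
}
{code}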


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529018#comment-15529018
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80874290
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
		//TODO:: register at the RM
	}

+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

Thanks for your comments @tillrohrmann. It all sounds reasonable to me. 
Limiting the RPC errors to one specific exception type is a good idea; that 
way the user can easily tell what went wrong. 
One minor suggestion: maybe we should create a base class `RpcException` and 
have classes like `RpcConnectionException`, `RpcExecutionException` and 
`RpcTimeoutException` inherit from it, to make the hierarchy clearer. This 
can be done in another JIRA, though. 


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> zookeeper paths. 
> For example, the path could be /leader/resource-manager for the RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-28 Thread Kirill Morozov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529066#comment-15529066
 ] 

Kirill Morozov commented on FLINK-4506:
---

Rolled back to the old behavior; documentation fixed: 
https://github.com/apache/flink/pull/2477/commits/34addbfdb5da0cef596e23ff8e1311954ac0682b
 

> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says the allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>* Configures the format to either allow null values (writing an empty 
> field),
>* or to throw an exception when encountering a null field.
>* 
>* by default, null values are allowed.
>*
>* @param allowNulls Flag to indicate whether the output format should 
> accept null values.
>*/
>   public void setAllowNullValues(boolean allowNulls) {
>   this.allowNullValues = allowNulls;
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang closed FLINK-4505.


> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-28 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2461


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529111#comment-15529111
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2461


> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-28 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, thank you for merging and for your help. If there is any 
follow-up work to do related to the TaskManager, you can assign it to me; 
I am willing to do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529109#comment-15529109
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, thank you for merging and for your help. If there is any 
follow-up work to do related to the TaskManager, you can assign it to me; 
I am willing to do it.


> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4563) [metrics] scope caching not adjusted for multiple reporters

2016-09-28 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529170#comment-15529170
 ] 

Chesnay Schepler commented on FLINK-4563:
-

Yes, those tests should pass. Additionally, though, I would say that different 
filters for a single reporter should be tested as well:

{code:java}
public static class TestReporter1 extends TestReporter {
	@Override
	public String filterCharacters(String input) {
		return input.replace("A", "a");
	}

	@Override
	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
		assertEquals("a.B.C.D.1", group.getMetricIdentifier(metricName, this));
		assertEquals("A.b.C.D.1", group.getMetricIdentifier(metricName, new CharacterFilter() {
			@Override
			public String filterCharacters(String input) {
				return input.replace("B", "b");
			}
		}));
	}
}
{code}

Note that this test as written would never fail, as the exception from 
assertEquals would be caught and never propagated outwards. Second, you should 
be able to leverage the code in https://github.com/apache/flink/pull/2517 to 
differentiate between reporters.

> [metrics] scope caching not adjusted for multiple reporters
> ---
>
> Key: FLINK-4563
> URL: https://issues.apache.org/jira/browse/FLINK-4563
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Every metric group contains a scope string, representing what entities 
> (job/task/etc.) a given metric belongs to, which is calculated on demand. 
> Before this string is cached a CharacterFilter is applied to it, which is 
> provided by the callee, usually a reporter. This was done since different 
> reporters have different requirements in regards to valid characters. The 
> filtered string is cached so that we don't have to refilter the string every 
> time.
> This all works fine with a single reporter; with multiple however it is 
> completely broken as only the first filter is ever applied.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529192#comment-15529192
 ] 

ASF GitHub Bot commented on FLINK-4573:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2556#discussion_r80885273
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -210,7 +210,17 @@ public void onSuccess(Object filePathOption) throws Throwable {
						LOG.error("Displaying TaskManager log failed.", e);
						return;
					}
-					long fileLength = raf.length();
+					long fileLength;
+					try {
+						fileLength = raf.length();
+					} catch (IOException ioe) {
+						display(ctx, request, "Displaying TaskManager log failed.");
+						LOG.error("Displaying TaskManager log failed.", ioe);
+						if (raf != null) {
+							raf.close();
+						}
+						return;
--- End diff --

Let's just re-throw the exception to retain the current behavior.
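
That is, roughly (sketch):

{code:java}
long fileLength;
try {
	fileLength = raf.length();
} catch (IOException ioe) {
	raf.close();
	// re-throwing retains the current failure behavior
	throw ioe;
}
{code}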


> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
> raf = new 
> RandomAccessFile(file, "r");
> } catch 
> (FileNotFoundException e) {
> display(ctx, request, 
> "Displaying TaskManager log failed.");
> LOG.error("Displaying 
> TaskManager log failed.", e);
> return;
> }
> long fileLength = 
> raf.length();
> final FileChannel fc = 
> raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2556: [FLINK-4573] Fix potential resource leak due to un...

2016-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2556#discussion_r80885273
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -210,7 +210,17 @@ public void onSuccess(Object filePathOption) throws Throwable {
						LOG.error("Displaying TaskManager log failed.", e);
						return;
					}
-					long fileLength = raf.length();
+					long fileLength;
+					try {
+						fileLength = raf.length();
+					} catch (IOException ioe) {
+						display(ctx, request, "Displaying TaskManager log failed.");
+						LOG.error("Displaying TaskManager log failed.", ioe);
+						if (raf != null) {
+							raf.close();
+						}
+						return;
--- End diff --

Let's just re-throw the exception to retain the current behavior.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529194#comment-15529194
 ] 

ASF GitHub Bot commented on FLINK-4573:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2556#discussion_r80885477
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -210,7 +210,17 @@ public void onSuccess(Object filePathOption) throws Throwable {
						LOG.error("Displaying TaskManager log failed.", e);
						return;
					}
-					long fileLength = raf.length();
+					long fileLength;
+					try {
+						fileLength = raf.length();
+					} catch (IOException ioe) {
+						display(ctx, request, "Displaying TaskManager log failed.");
+						LOG.error("Displaying TaskManager log failed.", ioe);
+						if (raf != null) {
--- End diff --

This should never be false; otherwise we would have hit an NPE earlier and 
never entered this block.


> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
> raf = new 
> RandomAccessFile(file, "r");
> } catch 
> (FileNotFoundException e) {
> display(ctx, request, 
> "Displaying TaskManager log failed.");
> LOG.error("Displaying 
> TaskManager log failed.", e);
> return;
> }
> long fileLength = 
> raf.length();
> final FileChannel fc = 
> raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2556: [FLINK-4573] Fix potential resource leak due to un...

2016-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2556#discussion_r80885477
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -210,7 +210,17 @@ public void onSuccess(Object filePathOption) throws Throwable {
						LOG.error("Displaying TaskManager log failed.", e);
						return;
					}
-					long fileLength = raf.length();
+					long fileLength;
+					try {
+						fileLength = raf.length();
+					} catch (IOException ioe) {
+						display(ctx, request, "Displaying TaskManager log failed.");
+						LOG.error("Displaying TaskManager log failed.", ioe);
+						if (raf != null) {
--- End diff --

This should never be false; otherwise we would have hit an NPE earlier and 
never entered this block.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80886088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -216,17 +244,20 @@ public ScopeFormats getScopeFormats() {
	 * @param metricName  the name of the metric
	 * @param group       the group that contains the metric
	 */
-	public void register(Metric metric, String metricName, MetricGroup group) {
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
		try {
			if (reporters != null) {
-				for (MetricReporter reporter : reporters) {
+				for (int i = 0; i < reporters.size(); i++) {
+					MetricReporter reporter = reporters.get(i);
					if (reporter != null) {
-						reporter.notifyOfAddedMetric(metric, metricName, group);
+						FrontMetricGroup front = new FrontMetricGroup(i);
--- End diff --

We will now create a new object every time we add a new metric. Instead, 
keep a single `FrontMetricGroup` instance in the registry, and add a 
`setIndex()` method to it that you call here along with `setReference()`.

But I just realized that with such a shared instance you would end up with 
concurrency issues, since multiple register calls can be active at the same 
time... I'll have to think about this for a bit.
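
The suggested reuse, roughly (a sketch; `setIndex()` does not exist yet and 
the no-arg constructor is hypothetical):

{code:java}
// one reusable wrapper instead of an allocation per registration
private final FrontMetricGroup front = new FrontMetricGroup();

// inside register():
front.setIndex(i);           // hypothetical setter replacing the constructor argument
front.setReference(group);
reporter.notifyOfAddedMetric(metric, metricName, front);
{code}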


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80886970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -216,17 +244,20 @@ public ScopeFormats getScopeFormats() {
	 * @param metricName  the name of the metric
	 * @param group       the group that contains the metric
	 */
-	public void register(Metric metric, String metricName, MetricGroup group) {
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
		try {
			if (reporters != null) {
-				for (MetricReporter reporter : reporters) {
+				for (int i = 0; i < reporters.size(); i++) {
+					MetricReporter reporter = reporters.get(i);
					if (reporter != null) {
-						reporter.notifyOfAddedMetric(metric, metricName, group);
+						FrontMetricGroup front = new FrontMetricGroup(i);
+						front.setReference(group);
--- End diff --

The references must also be set within `unregister()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529216#comment-15529216
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80886088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -216,17 +244,20 @@ public ScopeFormats getScopeFormats() {
	 * @param metricName  the name of the metric
	 * @param group       the group that contains the metric
	 */
-	public void register(Metric metric, String metricName, MetricGroup group) {
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
		try {
			if (reporters != null) {
-				for (MetricReporter reporter : reporters) {
+				for (int i = 0; i < reporters.size(); i++) {
+					MetricReporter reporter = reporters.get(i);
					if (reporter != null) {
-						reporter.notifyOfAddedMetric(metric, metricName, group);
+						FrontMetricGroup front = new FrontMetricGroup(i);
--- End diff --

We will now create a new object every time we add a new metric. Instead, 
keep a single `FrontMetricGroup` instance in the registry, and add a 
`setIndex()` method to it that you call here along with `setReference()`.

But I just realized that with such a shared instance you would end up with 
concurrency issues, since multiple register calls can be active at the same 
time... I'll have to think about this for a bit.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used for the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements with regards to the delimiter, so we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.
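
What that could look like in flink-conf.yaml (the per-reporter key is a 
hypothetical illustration of the proposal, not an existing option):

{code}
metrics.scope.delimiter: .                    # global default
metrics.reporter.my_jmx.scope.delimiter: -    # hypothetical per-reporter override
{code}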



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529215#comment-15529215
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80886970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -216,17 +244,20 @@ public ScopeFormats getScopeFormats() {
	 * @param metricName  the name of the metric
	 * @param group       the group that contains the metric
	 */
-	public void register(Metric metric, String metricName, MetricGroup group) {
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
		try {
			if (reporters != null) {
-				for (MetricReporter reporter : reporters) {
+				for (int i = 0; i < reporters.size(); i++) {
+					MetricReporter reporter = reporters.get(i);
					if (reporter != null) {
-						reporter.notifyOfAddedMetric(metric, metricName, group);
+						FrontMetricGroup front = new FrontMetricGroup(i);
+						front.setReference(group);
--- End diff --

The references must also be set within `unregister()`.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used for the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements with regards to the delimiter, so we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2477: [FLINK-4506] CsvOutputFormat defaults allowNullVal...

2016-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2477#discussion_r80887299
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class CsvOutputFormatTest {
+
+	private static final Path PATH = new Path("csv_output_test_file.csv");
+
+	@Test
+	public void testNullAllow() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.setAllowNullValues(true);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+		final FileSystem fs = PATH.getFileSystem();
+		Assert.assertTrue(fs.exists(PATH));
+		FSDataInputStream inputStream = fs.open(PATH);
+		String csvContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+		Assert.assertEquals("One,,8\n", csvContent);
+		fs.delete(PATH, true);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testNullDisallowOnDefault() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
--- End diff --

The temporary file is created within `open()` and will not be cleaned up.
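
A cleanup along these lines would avoid that (sketch):

{code:java}
try {
	csvOutputFormat.open(0, 1);
	csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
} finally {
	csvOutputFormat.close();
	PATH.getFileSystem().delete(PATH, true); // remove the temporary output file
}
{code}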


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529220#comment-15529220
 ] 

ASF GitHub Bot commented on FLINK-4506:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2477#discussion_r80887299
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class CsvOutputFormatTest {
+
+	private static final Path PATH = new Path("csv_output_test_file.csv");
+
+	@Test
+	public void testNullAllow() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.setAllowNullValues(true);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+		final FileSystem fs = PATH.getFileSystem();
+		Assert.assertTrue(fs.exists(PATH));
+		FSDataInputStream inputStream = fs.open(PATH);
+		String csvContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+		Assert.assertEquals("One,,8\n", csvContent);
+		fs.delete(PATH, true);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testNullDisallowOnDefault() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
--- End diff --

The temporary file is created within `open()` and will not be cleaned up.


> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says the allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>* Configures the format to either allow null values (writing an empty 
> field),
>* or to throw an exception when encountering a null field.
>* 
>* by default, null values are allowed.
>*
>* @param allowNulls Flag to indicate whether the output format should 
> accept null values.
>*/
>   public void setAllowNullValues(boolean allowNulls) {
>   this.allowNullValues = allowNulls;
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4703) RpcCompletenessTest: Add support for type arguments and subclasss

2016-09-28 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-4703:
-

 Summary: RpcCompletenessTest: Add support for type arguments and 
subclasss
 Key: FLINK-4703
 URL: https://issues.apache.org/jira/browse/FLINK-4703
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.2.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Minor
 Fix For: 1.2.0


The RpcCompletenessTest doesn't work for class hierarchies because it assumes 
that the Gateway argument is always provided in the superclass.

Further, the RpcCompletenessTest should skip abstract classes and classes which 
have type parameters bound to {{RpcGateway.class}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4703) RpcCompletenessTest: Add support for type arguments and subclasses

2016-09-28 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-4703:
--
Summary: RpcCompletenessTest: Add support for type arguments and subclasses 
 (was: RpcCompletenessTest: Add support for type arguments and subclasss)

> RpcCompletenessTest: Add support for type arguments and subclasses
> --
>
> Key: FLINK-4703
> URL: https://issues.apache.org/jira/browse/FLINK-4703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.2.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The RpcCompletenessTest doesn't work for class hierarchies because it assumes 
> that the Gateway argument is always provided in the superclass.
> Further, the RpcCompletenessTest should skip abstract classes and classes 
> which have type parameters bound to {{RpcGateway.class}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2527: [FLINK-4624] Allow for null values in Graph Summarization

2016-09-28 Thread s1ck
Github user s1ck commented on the issue:

https://github.com/apache/flink/pull/2527
  
@greghogan since I addressed all your comments, is there anything left to 
do that prevents this from merging?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4624) Gelly's summarization algorithm cannot deal with null vertex group values

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529234#comment-15529234
 ] 

ASF GitHub Bot commented on FLINK-4624:
---

Github user s1ck commented on the issue:

https://github.com/apache/flink/pull/2527
  
@greghogan since I addressed all your comments, is there anything left to 
do that prevents this from merging?


> Gelly's summarization algorithm cannot deal with null vertex group values
> -
>
> Key: FLINK-4624
> URL: https://issues.apache.org/jira/browse/FLINK-4624
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Till Rohrmann
>Assignee: Martin Junghanns
> Fix For: 1.2.0
>
>
> Gelly's {{Summarization}} algorithm cannot handle null values in the 
> `VertexGroupItem.f2`. This behaviour is hidden by using Strings as a vertex 
> value in the {{SummarizationITCase}}, because the {{StringSerializer}} can 
> handle null values. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2561: [FLINK-4703] RpcCompletenessTest: Add support for ...

2016-09-28 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2561

[FLINK-4703] RpcCompletenessTest: Add support for type arguments and 
subclasses

This is a prerequisite for merging #2540 which relies on the class 
hierarchy features.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2561


commit 442ee17bec78c96697101f5089465916f471478a
Author: Maximilian Michels 
Date:   2016-09-28T10:39:30Z

[FLINK-4703] RpcCompletenessTest: Add support for type arguments and 
subclasses




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4703) RpcCompletenessTest: Add support for type arguments and subclasses

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529239#comment-15529239
 ] 

ASF GitHub Bot commented on FLINK-4703:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2561

[FLINK-4703] RpcCompletenessTest: Add support for type arguments and 
subclasses

This is a prerequisite for merging #2540 which relies on the class 
hierarchy features.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2561


commit 442ee17bec78c96697101f5089465916f471478a
Author: Maximilian Michels 
Date:   2016-09-28T10:39:30Z

[FLINK-4703] RpcCompletenessTest: Add support for type arguments and 
subclasses




> RpcCompletenessTest: Add support for type arguments and subclasses
> --
>
> Key: FLINK-4703
> URL: https://issues.apache.org/jira/browse/FLINK-4703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.2.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The RpcCompletenessTest doesn't work for class hierarchies because it assumes 
> that the Gateway argument is always provided in the superclass.
> Further, the RpcCompletenessTest should skip abstract classes and classes 
> which have type parameters bound to {{RpcGateway.class}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4632) when yarn nodemanager lost, flink hung

2016-09-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528833#comment-15528833
 ] 

刘喆 edited comment on FLINK-4632 at 9/28/16 11:00 AM:
-

I think it is related to checkpointing.
When I use checkpointing with 'exactly_once' mode, a sub-task may hang while 
other sub-tasks keep running; after a long time, all tasks hang. At the same 
time, no more checkpoints are produced.
When killing, the checkpoint may be blocking other threads. I use the 
JobManager as the checkpoint backend. The checkpoint interval is 30 seconds.

I have 2 screenshots of it:
http://p1.bpimg.com/567571/eb9442e01ede0a24.png
http://p1.bpimg.com/567571/6502b8c89fc68229.png



Some of the log is below:
2016-09-27 09:42:59,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (369/500) (e60c3755b5461c181e29cd30400cd6b0) switched from DEPLOYING to RUNNING
2016-09-27 09:42:59,552 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (274/500) (f983db19c603a51027cf7031e19edb79) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,256 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (497/500) (92bde125c4eb920d32aa11b6514f4cf1) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,477 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - send (373/500) (a59fec5a8ce66518d9003e6a480e1854) switched from DEPLOYING to RUNNING
2016-09-27 09:44:05,867 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1474940645865
2016-09-27 09:44:41,782 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (in 35917 ms)
2016-09-27 09:49:05,865 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1474940945865
2016-09-27 09:50:05,866 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 2 expired before completing.
2016-09-27 09:50:07,390 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2
2016-09-27 09:50:10,572 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2
2016-09-27 09:50:12,207 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2


and when I don't use checkpointing at all, it works well


was (Author: liuzhe):
I think it is related to checkpoint.
When I use checkpoint with 'exactly_once'  mode,  sub task may hung but other 
sub tasks running, after a long time, all tasks hung.  At the same time,  there 
is no more checkpoint producted.
When killing, maybe the checkpoint block other thread.  I use JobManager as 
checkpoint backend.  The checkpoint interval is 30 seconds.

I have 2 screenshot for it:
http://p1.bpimg.com/567571/eb9442e01ede0a24.png
http://p1.bpimg.com/567571/eb9442e01ede0a24.png



Some log as below:
2016-09-27 09:42:59,463 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- send (369/500) 
(e60c3755b5461c181e29cd30400cd6b0) switched from DEPLOYING to RUNNING
2016-09-27 09:42:59,552 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- send (274/500) 
(f983db19c603a51027cf7031e19edb79) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,256 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- send (497/500) 
(92bde125c4eb920d32aa11b6514f4cf1) switched from DEPLOYING to RUNNING
2016-09-27 09:43:00,477 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- send (373/500) 
(a59fec5a8ce66518d9003e6a480e1854) switched from DEPLOYING to RUNNING
2016-09-27 09:44:05,867 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1474940645865
2016-09-27 09:44:41,782 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 (in 35917 ms)
2016-09-27 09:49:05,865 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1474940945865
2016-09-27 09:50:05,866 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 2 
expired before completing.
2016-09-27 09:50:07,390 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 2
2016-09-27 09:50:10,572 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 2
2016-09-27 09:50:12,207 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 2


and when I don't use checkpoint at all, it work well

> when yarn nodemanager lost,  flink hung
> ---
>
>  

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
@robert and @tzulitai What is your take on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529398#comment-15529398
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
@robert and @tzulitai What is your take on this?


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for long and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure that no more than one commit is concurrently 
> in progress, so that commit requests do not pile up.
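
A sketch of that pattern against the Kafka 0.9+ consumer API (field and 
variable names are illustrative, not the actual connector code):

{code:java}
private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

public void notifyCheckpointComplete(long checkpointId) {
	// only start a new async commit if the previous one has finished
	if (commitInProgress.compareAndSet(false, true)) {
		consumer.commitAsync(offsetsToCommit, new OffsetCommitCallback() {
			@Override
			public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
				commitInProgress.set(false);
			}
		});
	}
	// otherwise drop this commit; a later checkpoint will commit newer offsets
}
{code}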



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2477: [FLINK-4506] CsvOutputFormat defaults allowNullVal...

2016-09-28 Thread kirill-morozov-epam
Github user kirill-morozov-epam commented on a diff in the pull request:

https://github.com/apache/flink/pull/2477#discussion_r80901404
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class CsvOutputFormatTest {
+
+	private static final Path PATH = new Path("csv_output_test_file.csv");
+
+	@Test
+	public void testNullAllow() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.setAllowNullValues(true);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+		final FileSystem fs = PATH.getFileSystem();
+		Assert.assertTrue(fs.exists(PATH));
+		FSDataInputStream inputStream = fs.open(PATH);
+		String csvContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+		Assert.assertEquals("One,,8\n", csvContent);
+		fs.delete(PATH, true);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testNullDisallowOnDefault() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
--- End diff --

Fixed in 
https://github.com/apache/flink/pull/2477/commits/6d06d0e03e12ced8fe11e164488f1102682704d1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529420#comment-15529420
 ] 

ASF GitHub Bot commented on FLINK-4506:
---

Github user kirill-morozov-epam commented on a diff in the pull request:

https://github.com/apache/flink/pull/2477#discussion_r80901404
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class CsvOutputFormatTest {
+
+	private static final Path PATH = new Path("csv_output_test_file.csv");
+
+	@Test
+	public void testNullAllow() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.setAllowNullValues(true);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+		final FileSystem fs = PATH.getFileSystem();
+		Assert.assertTrue(fs.exists(PATH));
+		FSDataInputStream inputStream = fs.open(PATH);
+		String csvContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+		Assert.assertEquals("One,,8\n", csvContent);
+		fs.delete(PATH, true);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testNullDisallowOnDefault() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<Tuple3<String, String, Integer>>(PATH);
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
--- End diff --

Fixed in 
https://github.com/apache/flink/pull/2477/commits/6d06d0e03e12ced8fe11e164488f1102682704d1


> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says the allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>* Configures the format to either allow null values (writing an empty 
> field),
>* or to throw an exception when encountering a null field.
>* 
>* by default, null values are allowed.
>*
>* @param allowNulls Flag to indicate whether the output format should 
> accept null values.
>*/
>   public void setAllowNullValues(boolean allowNulls) {
>   this.allowNullValues = allowNulls;
>   }
> {code}
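
A minimal sketch of the suggested fix (a hand-written illustration, not the actual 
constructor body; only the one assignment needs to change so that it agrees with 
the field declaration and the javadoc):

{code}
// make the constructor consistent with the declared default: nulls allowed
this.allowNullValues = true;
{code}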




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-09-28 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2562

[FLINK-4691] [table] Add group-windows for streaming tables

This PR implements Tumble, Slide, Session group-windows for streaming 
tables as described in FLIP-11. It adds API, validation, logical 
representation, and runtime components.

Some additional comments:

I have not implemented the 'systemtime' keyword yet, as this would cause 
more problems than it solves. Especially integrating it into the validation 
layer would be tricky. The resolution of those special fields happens within a 
WindowAggregate; however, the logical type of a window should already be known 
at this point. We are mixing logical operators and expressions, which is not 
very nice. Furthermore, what happens in a batch environment if 'systemtime' is 
used? It could also be an existing column but does not have to be one. That is 
not specified in the FLIP yet.

The aggregations are not very efficient yet. Currently this PR uses window 
functions that wrap the GroupReduce functions. We have to rework the 
aggregations first. Maybe we could use `WindowedStream#apply(R, 
FoldFunction, WindowFunction, TypeInformation)`, which means 
that `R` has to be created in the translation phase (see the sketch below).
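
For illustration, a rough sketch of what that incremental variant could look like 
(hand-written, not code from this PR; the element type, the sum logic, and the 
processing-time window are all made up to keep the snippet short):

```java
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class IncrementalWindowSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the initial accumulator `R` (here: 0) must exist up front,
		// i.e. it has to be created in the translation phase
		env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
			.keyBy(0)
			.timeWindow(Time.seconds(5))
			.apply(
				0,
				new FoldFunction<Tuple2<String, Integer>, Integer>() {
					@Override
					public Integer fold(Integer acc, Tuple2<String, Integer> value) {
						return acc + value.f1; // incremental, per-record aggregation
					}
				},
				new WindowFunction<Integer, Integer, Tuple, TimeWindow>() {
					@Override
					public void apply(Tuple key, TimeWindow window,
							Iterable<Integer> folded, Collector<Integer> out) {
						out.collect(folded.iterator().next()); // emit the final accumulator
					}
				},
				BasicTypeInfo.INT_TYPE_INFO)
			.print();

		env.execute();
	}
}
```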

The tests are mainly ITCases for now; we might want to change that to unit 
tests once we have the means (like new test bases) to do so.

The website documentation is still missing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4691

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2562


commit edbfe00cb0fd7ea8362c90988eb0860eb9ce6078
Author: twalthr 
Date:   2016-08-25T07:19:53Z

[FLINK-4691] [table] Add group-windows for streaming tables






[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529423#comment-15529423
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2562

[FLINK-4691] [table] Add group-windows for streaming tables

This PR implements Tumble, Slide, Session group-windows for streaming 
tables as described in FLIP-11. It adds API, validation, logical 
representation, and runtime components.

Some additional comments:

I have not implemented the 'systemtime' keyword yet, as this would cause 
more problems than it solves. Especially integrating it into the validation 
layer would be tricky. The resolution of those special fields happens within a 
WindowAggregate; however, the logical type of a window should already be known 
at this point. We are mixing logical operators and expressions, which is not 
very nice. Furthermore, what happens in a batch environment if 'systemtime' is 
used? It could also be an existing column but does not have to be one. That is 
not specified in the FLIP yet.

The aggregations are not very efficient yet. Currently this PR uses window 
functions that wrap the GroupReduce functions. We have to rework the 
aggregations first. Maybe we could use `WindowedStream#apply(R, 
FoldFunction, WindowFunction, TypeInformation)`, which means 
that `R` has to be created in the translation phase.

The tests are mainly ITCases for now; we might want to change that to unit 
tests once we have the means (like new test bases) to do so.

The website documentation is still missing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4691

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2562


commit edbfe00cb0fd7ea8362c90988eb0860eb9ce6078
Author: twalthr 
Date:   2016-08-25T07:19:53Z

[FLINK-4691] [table] Add group-windows for streaming tables




> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.





[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Just had a look at the API of `commitAsync`, and it seems like the 
offsets committed back to Kafka through this API (likewise for `commitSync`) 
need to be `lastProcessedMessageOffset + 1` 
([https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))).

The main effect is that when starting from group offsets in Kafka, 
`FlinkKafkaConsumer09` currently starts from the wrong offset. There's a 
separate JIRA for this bug: 
[FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618).

Another contributor had already picked up FLINK-4618, so I'd say it's ok to 
leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it 
gets merged after this PR.

Minus the above, this looks good to me. +1
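
To make the `+ 1` convention concrete, here is a small hand-written sketch 
against the plain Kafka 0.9 consumer API (topic name, partition, and the 
helper's name are made up for illustration):

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class CommitSketch {
	static void commitProcessed(KafkaConsumer<?, ?> consumer, long lastProcessedOffset) {
		// commit the offset of the *next* record to read, not the last processed one
		Map<TopicPartition, OffsetAndMetadata> toCommit = Collections.singletonMap(
				new TopicPartition("myTopic", 0),
				new OffsetAndMetadata(lastProcessedOffset + 1));

		consumer.commitAsync(toCommit, new OffsetCommitCallback() {
			@Override
			public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
				if (e != null) {
					// the commit failed; with Flink, checkpoints are unaffected
				}
			}
		});
	}
}
```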




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529448#comment-15529448
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Just had a look at the API of `commitAsync`, and it seems like the 
offsets committed back to Kafka through this API (likewise for `commitSync`) 
need to be `lastProcessedMessageOffset + 1` 
([https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))).

The main effect is that when starting from group offsets in Kafka, 
`FlinkKafkaConsumer09` currently starts from the wrong offset. There's a 
separate JIRA for this bug: 
[FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618).

Another contributor had already picked up FLINK-4618, so I'd say it's ok to 
leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it 
gets merged after this PR.

Minus the above, this looks good to me. +1


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.
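
A condensed, hand-written sketch of that scheme (field and method names here are 
assumptions for illustration; the PR's actual diff appears further below):

{code}
// guard so that at most one offset commit is in flight at any time;
// `consumer` is assumed to be the fetcher's KafkaConsumer field
private volatile boolean commitInProgress = false;

void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
	if (!commitInProgress) {
		commitInProgress = true;
		consumer.commitAsync(offsetsToCommit, new OffsetCommitCallback() {
			@Override
			public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
				commitInProgress = false; // allow the next commit attempt
			}
		});
	}
	// else: skip this commit; a later one will carry more recent offsets
}
{code}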





[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80903481
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -86,6 +90,9 @@
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
 
+   /** Flag indicating whether a commit of offsets to Kafka it currently 
happening */
--- End diff --

nit: it --> "is"?




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529450#comment-15529450
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80903481
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -86,6 +90,9 @@
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
 
+   /** Flag indicating whether a commit of offsets to Kafka it currently 
happening */
--- End diff --

nit: it --> "is"?


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Btw, just curious, does the 0.8 Kafka connector have the same issue with sync 
committing? I haven't looked into the code for this, but just wondering if we 
need a ticket for 0.8 too.




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529455#comment-15529455
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Btw, just curious, does the 0.8 Kafka connector have the same issue with sync 
committing? I haven't looked into the code for this, but just wondering if we 
need a ticket for 0.8 too.


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[jira] [Assigned] (FLINK-4406) Implement job master registration at resource manager

2016-09-28 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-4406:
-

Assignee: Kurt Young  (was: zhuhaifeng)

> Implement job master registration at resource manager
> -
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Wenlong Lyu
>Assignee: Kurt Young
>
> The Job Master needs to register with the Resource Manager when starting, then 
> watch for leadership changes of the RM and trigger re-registration.





[jira] [Commented] (FLINK-4530) Generalize TaskExecutorToResourceManagerConnection to be reusable

2016-09-28 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529460#comment-15529460
 ] 

Kurt Young commented on FLINK-4530:
---

fixed by b955465ff2f230da0ecd195d7d0e8312fdf0578e

> Generalize TaskExecutorToResourceManagerConnection to be reusable
> -
>
> Key: FLINK-4530
> URL: https://issues.apache.org/jira/browse/FLINK-4530
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: zhuhaifeng
>Priority: Minor
>  Labels: flip-6
>
> The {{TaskExecutorToResourceManagerConnection}} can be generalized further to be 
> reusable across components. For example, the {{JobMaster}} requires a similar 
> connection if we assume that the {{JobMaster}} can be run independently of 
> the {{ResourceManager}}.
> Pulling out the strong dependency on the `TaskExecutor` and the 
> `ResourceManagerGateway` should be enough to make the 
> {{TaskExecutorToResourceManagerConnection}} reusable.





[jira] [Resolved] (FLINK-4530) Generalize TaskExecutorToResourceManagerConnection to be reusable

2016-09-28 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young resolved FLINK-4530.
---
Resolution: Fixed

> Generalize TaskExecutorToResourceManagerConnection to be reusable
> -
>
> Key: FLINK-4530
> URL: https://issues.apache.org/jira/browse/FLINK-4530
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: zhuhaifeng
>Priority: Minor
>  Labels: flip-6
>
> The {{TaskExecutorToResourceManagerConnection}} can be generalized further to be 
> reusable across components. For example, the {{JobMaster}} requires a similar 
> connection if we assume that the {{JobMaster}} can be run independently of 
> the {{ResourceManager}}.
> Pulling out the strong dependency on the `TaskExecutor` and the 
> `ResourceManagerGateway` should be enough to make the 
> {{TaskExecutorToResourceManagerConnection}} reusable.





[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529466#comment-15529466
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4618:


Hi [~melmoth],

I just had a look at the Kafka 0.9 API, and it seems like when committing 
offsets using the new `KafkaConsumer` API, the correct value to commit back to 
Kafka is `lastProcessedOffset + 1` 
(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)).
I believe correcting this should fix the issue :) Let me know if you bump into 
any other problems.

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Originally reported ticket title: Last Kafka message gets consumed twice when 
> restarting job**
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in 
> ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
@StephanEwen I think you've tagged the wrong Github ID for Robert ;)




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529483#comment-15529483
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
@StephanEwen I think you've tagged the wrong Github ID for Robert ;)


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[jira] [Comment Edited] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529466#comment-15529466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4618 at 9/28/16 12:43 PM:
--

Hi [~melmoth],

I just had a look at the Kafka 0.9 API, and it seems like when committing 
offsets using the new `KafkaConsumer` API, the correct value to commit back to 
Kafka is {{lastProcessedOffset + 1}} 
(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)).
I believe correcting this should fix the issue :) Let me know if you bump into 
any other problems.


was (Author: tzulitai):
Hi [~melmoth],

I just had a look at the Kafka 0.9 API, and it seems like when committing 
offsets using the new `KafkaConsumer` API, the correct value to commit back to 
Kafka is `lastProcessedOffset + 1` 
(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)).
I believe correcting this should fix the issue :) Let me know if you bump into 
any other problems.

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Originally reported ticket title: Last Kafka message gets consumed twice when 
> restarting job**
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in 
> ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Comple

[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2016-09-28 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2460
  
+1
I will shepherd this PR.




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80906814
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
}
return result;
}
+
+   private class CommitCallback implements OffsetCommitCallback {
+
+   @Override
+   public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+   commitInProgress = false;
+
+   if (exception != null) {
+   LOG.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints", exception);
--- End diff --

The exception message isn't included in the log warning.




[jira] [Commented] (FLINK-4562) table examples make an divided module in flink-examples

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529490#comment-15529490
 ] 

ASF GitHub Bot commented on FLINK-4562:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2460
  
+1
I will shepherd this PR.


> table examples make an divided module in flink-examples
> ---
>
> Key: FLINK-4562
> URL: https://issues.apache.org/jira/browse/FLINK-4562
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> example code shouldn't be packaged in the table module.





[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529493#comment-15529493
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80906814
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
}
return result;
}
+
+   private class CommitCallback implements OffsetCommitCallback {
+
+   @Override
+   public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+   commitInProgress = false;
+
+   if (exception != null) {
+   LOG.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints", exception);
--- End diff --

The exception message isn't included in the log warning.


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80907326
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 
if (this.consumer != null) {
synchronized (consumerLock) {
-   this.consumer.commitSync(offsetsToCommit);
+   if (!commitInProgress) {
+   commitInProgress = true;
+   
this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+   }
+   else {
+   LOG.warn("Committing previous 
checkpoint's offsets to Kafka not completed. " +
--- End diff --

If the user sets a relatively short checkpoint interval, will this be 
flooding log?




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529502#comment-15529502
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80907326
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 
if (this.consumer != null) {
synchronized (consumerLock) {
-   this.consumer.commitSync(offsetsToCommit);
+   if (!commitInProgress) {
+   commitInProgress = true;
+   
this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+   }
+   else {
+   LOG.warn("Committing previous 
checkpoint's offsets to Kafka not completed. " +
--- End diff --

If the user sets a relatively short checkpoint interval, will this be 
flooding log?


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Thanks @tzulitai for looking at this. I will leave the offset then as it is 
(fixed via follow-up).

The Kafka 0.8 connector needs a similar change. This issue was encountered by 
a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 
0.8. Will also correct the issue tag ;-)

I have no good idea how to test this, though, so any thoughts there are 
welcome!




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529504#comment-15529504
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Thanks @tzulitai for looking at this. I will leave the offset then as it is 
(fixed via follow-up).

The Kafka 0.8 connector needs a similar change. This issue was encountered by 
a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 
0.8. Will also correct the issue tag ;-)

I have no good idea how to test this, though, so any thoughts there are 
welcome!


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80909904
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 
if (this.consumer != null) {
synchronized (consumerLock) {
-   this.consumer.commitSync(offsetsToCommit);
+   if (!commitInProgress) {
+   commitInProgress = true;
+   
this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+   }
+   else {
+   LOG.warn("Committing previous 
checkpoint's offsets to Kafka not completed. " +
--- End diff --

Possibly yes. But on the other hand, this should be pretty visible if it 
happens.
I would expect that with proper options to participate in group checkpoint 
committing, most Flink jobs run without committing to Kafka/ZooKeeper.




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529532#comment-15529532
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80909904
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
 
if (this.consumer != null) {
synchronized (consumerLock) {
-   this.consumer.commitSync(offsetsToCommit);
+   if (!commitInProgress) {
+   commitInProgress = true;
+   
this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+   }
+   else {
+   LOG.warn("Committing previous 
checkpoint's offsets to Kafka not completed. " +
--- End diff --

Possibly yes. But on the other hand, this should be pretty visible if it 
happens.
I would expect that with proper options to participate in group checkpoint 
committing, most Flink jobs run without committing to Kafka/ZooKeeper.


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.





[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901203
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 ---
@@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws 
Exception {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
 
+   timerService.shutdownService();
--- End diff --

Why does this need to create and shut down a timer service every time?




[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901399
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 ---
@@ -109,9 +110,15 @@ public void run() {
public static DefaultTimeServiceProvider 
createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
return new DefaultTimeServiceProvider(new 
AsyncExceptionHandler() {
@Override
-   public void 
registerAsyncException(AsynchronousException exception) {
+   public void handleAsyncException(String message, 
Throwable exception) {
exception.printStackTrace();
}
}, executor, checkpointLock);
}
+
+   @VisibleForTesting
+   public static DefaultTimeServiceProvider createForTestingWithHandler(
--- End diff --

Is this the exact same code as the default constructor? Can it be removed?




[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901242
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 ---
@@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws 
Exception {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
 
+   timerService.shutdownService();
--- End diff --

Same here




[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80902027
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
 ---
@@ -18,12 +18,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * Interface for reporting exceptions that are thrown in (possibly) a 
different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {
 
/**
-* Registers the given exception.
+* Handles an exception thrown by another thread (e.g. a TriggerTask),
+* other than the one executing the main task.
 */
-   void registerAsyncException(AsynchronousException exception);
+   void handleAsyncException(String message, Throwable exception);
--- End diff --

This name change is good!




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529534#comment-15529534
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901242
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 ---
@@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws 
Exception {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
 
+   timerService.shutdownService();
--- End diff --

Same here


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in the window operator, this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80904818
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
monitoringFunction.open(new Configuration());
monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-   Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+   Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);
--- End diff --

`assertEquals()` takes "expected" first and "actual" second.
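
So, with the arguments swapped, the assertion would read:

```java
// expected value first, actual value second
Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
```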




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529535#comment-15529535
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80904818
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
monitoringFunction.open(new Configuration());
monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-   Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+   Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);
--- End diff --

`assertEquals()` takes "expected" first and "actual" second.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in the window operator, this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901983
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 ---
@@ -99,7 +100,7 @@ public void run() {
target.trigger(timestamp);
} catch (Throwable t) {
TimerException asyncException = new 
TimerException(t);
--- End diff --

Do we need this extra level of exception wrapping?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529537#comment-15529537
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901203
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 ---
@@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws 
Exception {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
 
+   timerService.shutdownService();
--- End diff --

Why does this need to create and shut down a timer service every time?


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in the window operator, this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529539#comment-15529539
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80902027
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
 ---
@@ -18,12 +18,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * Interface for reporting exceptions that are thrown in (possibly) a 
different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {
 
/**
-* Registers the given exception.
+* Handles an exception thrown by another thread (e.g. a TriggerTask),
+* other than the one executing the main task.
 */
-   void registerAsyncException(AsynchronousException exception);
+   void handleAsyncException(String message, Throwable exception);
--- End diff --

This name change is good!




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529538#comment-15529538
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901983
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -99,7 +100,7 @@ public void run() {
target.trigger(timestamp);
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
--- End diff --

Do we need this extra level of exception wrapping?
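
(A minimal sketch of the alternative without the extra wrapping layer, 
building on the renamed `handleAsyncException(String, Throwable)` from the 
earlier hunk; the surrounding method and names are hypothetical:)

{code}
private void runTimer(Triggerable target, long timestamp, AsyncExceptionHandler task) {
    try {
        target.trigger(timestamp);
    } catch (Throwable t) {
        // hand the raw Throwable plus a message straight to the handler,
        // skipping the intermediate TimerException layer
        task.handleAsyncException("Caught exception while processing timer.", t);
    }
}
{code}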




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529536#comment-15529536
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901399
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -109,9 +110,15 @@ public void run() {
    public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
        return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
            @Override
-           public void registerAsyncException(AsynchronousException exception) {
+           public void handleAsyncException(String message, Throwable exception) {
                exception.printStackTrace();
            }
        }, executor, checkpointLock);
    }
+
+   @VisibleForTesting
+   public static DefaultTimeServiceProvider createForTestingWithHandler(
--- End diff --

Is this the exact same code as the default constructor? Can it be removed?





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529543#comment-15529543
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2546
  
All in all, some minor change requests; otherwise this seems good.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529556#comment-15529556
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2546
  
Actually, let me take a step back and understand a few things more deeply first.
Who actually generates the watermarks (in ingestion time)? The operator 
that creates the file splits, or the operator that reads the splits?

If the configuration is set to IngestionTime, will the operator that 
creates the file splits emit a final LongMax watermark? Is that one passed 
through by the split-reading operator? Is there a test that tests that specific 
scenario? (I believe that was the initially reported bug.)




[jira] [Commented] (FLINK-4701) Unprotected access to cancelables in StreamTask

2016-09-28 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529571#comment-15529571
 ] 

Stephan Ewen commented on FLINK-4701:
-

I don't understand this. The constructor does not modify the set.

> Unprotected access to cancelables in StreamTask
> ---
>
> Key: FLINK-4701
> URL: https://issues.apache.org/jira/browse/FLINK-4701
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In performCheckpoint():
> {code}
> AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
>         "checkpoint-" + checkpointId + "-" + timestamp,
>         this,
>         cancelables,
>         chainedStateHandles,
>         keyGroupsStateHandleFuture,
>         checkpointId,
>         bytesBufferedAlignment,
>         alignmentDurationNanos,
>         syncDurationMillis,
>         endOfSyncPart);
>
> synchronized (cancelables) {
>     cancelables.add(asyncCheckpointRunnable);
> }
> {code}
> Construction of the AsyncCheckpointRunnable should be moved inside the 
> synchronized block on cancelables.





[jira] [Commented] (FLINK-4701) Unprotected access to cancelables in StreamTask

2016-09-28 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529577#comment-15529577
 ] 

Ted Yu commented on FLINK-4701:
---

How about other threads which may modify cancelables?
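
(For illustration, a minimal sketch of the pattern under discussion, assuming 
a plain {{HashSet}} field rather than the actual {{StreamTask}} code:)

{code}
import java.io.Closeable;
import java.util.HashSet;
import java.util.Set;

class CancelablesHolder {

    private final Set<Closeable> cancelables = new HashSet<>();

    void register(Closeable c) {
        // every thread that mutates the set takes the same monitor
        synchronized (cancelables) {
            cancelables.add(c);
        }
    }

    void unregister(Closeable c) {
        synchronized (cancelables) {
            cancelables.remove(c);
        }
    }
}
{code}

As long as every mutation goes through such a block, constructing the runnable 
outside of it stays safe: the constructor only stores the reference to the set 
and does not modify it.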




[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529580#comment-15529580
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
@StephanEwen 
On a second look, I think the `commitSpecificOffsetsToKafka` method was 
designed to commit synchronously in the first place. `AbstractFetcher` holds a 
Map of all currently pending offsets to commit, keyed by checkpoint ID, and on 
every `notifyCheckpointComplete` the offsets are removed from the Map before 
`commitSpecificOffsetsToKafka` is called.

So, for async committing, I think we need to stop cleaning up the offsets in 
`AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a 
new, separate callback method in `AbstractFetcher`.
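
(For the record, a minimal sketch of that direction, with hypothetical names 
(`pendingOffsetsToCommit`, `commitInProgress`) rather than the actual 
connector fields:)

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class AsyncOffsetCommitter {

    // pending offsets per checkpoint, cleaned up only after the commit succeeds
    private final Map<Long, Map<TopicPartition, OffsetAndMetadata>> pendingOffsetsToCommit =
            new ConcurrentHashMap<>();

    // at most one commit in flight, so that commit requests cannot pile up
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    void commitOffsetsAsync(KafkaConsumer<?, ?> consumer, final long checkpointId) {
        final Map<TopicPartition, OffsetAndMetadata> offsets =
                pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null || !commitInProgress.compareAndSet(false, true)) {
            // skip: a later checkpoint will commit newer offsets anyway
            return;
        }
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception e) {
                commitInProgress.set(false);
                if (e == null) {
                    // the cleanup that previously happened in notifyCheckpointComplete()
                    pendingOffsetsToCommit.remove(checkpointId);
                }
            }
        });
    }
}
{code}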


> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long 
> time and the KafkaConsumer cannot make progress and cannot perform 
> checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently 
> in progress, so that commit requests do not pile up.


