[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356732#comment-15356732
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2169
  
@fhueske Thank you for the detailed review! I've updated my code according 
to your comments.

I noticed that @wuchong is performing some type conversions in his 
INTERSECT implementation: 
https://github.com/apache/flink/pull/2159/files#diff-a6c2112ca46d26fcf49f1edba1c73f75R121

Should I do something similar in the EXCEPT case? If yes, does it mean that 
my test coverage is not sufficient and does not cover some particular case?


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.
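
The coGroup idea in the description above can be sketched in plain Java, without any Flink classes. This is only an illustration, not the code from the pull request: the class name `ExceptSketch` and the method `except` are hypothetical, and the sketch assumes that one record is emitted per group (EXCEPT without ALL). Both inputs are grouped on the whole record, and a group is forwarded only if it has records from the first input and none from the second.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: emulates a coGroup on all fields.
final class ExceptSketch {

    static <T> List<T> except(List<T> first, List<T> second) {
        // Key = the whole record; value = {count in first input, count in second input}.
        Map<T, int[]> groups = new LinkedHashMap<>();
        for (T record : first) {
            groups.computeIfAbsent(record, k -> new int[2])[0]++;
        }
        for (T record : second) {
            groups.computeIfAbsent(record, k -> new int[2])[1]++;
        }

        List<T> result = new ArrayList<>();
        for (Map.Entry<T, int[]> group : groups.entrySet()) {
            int[] counts = group.getValue();
            // Forward the record only if the second side of the group is empty.
            if (counts[0] > 0 && counts[1] == 0) {
                result.add(group.getKey());
            }
        }
        return result;
    }
}
```

For example, `ExceptSketch.except(Arrays.asList(1, 2, 2, 3), Arrays.asList(2, 4))` yields the records 1 and 3.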



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
Hmm what about using the `JobID` to disambiguate jobs with the same name?

I cannot remember whether this peculiarity concerning the job naming is 
documented or not. If not, then it should be included in the metrics 
documentation PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356745#comment-15356745
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
Hmm what about using the `JobID` to disambiguate jobs with the same name?

I cannot remember whether this peculiarity concerning the job naming is 
documented or not. If not, then it should be included in the metrics 
documentation PR.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, Metrics
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>






[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69093616
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }

[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r69093837
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
+    ...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    // increment counter
+    this.counter.inc();
+    ...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+    // register a custom counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter", new CustomCounter());
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }});
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup().histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that 
allows the use of Codahale/DropWizard histograms.
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\\\.
+
+The system scope allows the reported name to contain contextual 
information like the name of job it w

[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
What is this discussion about: changing the default format, or finding a 
general way to avoid naming collisions?




[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69094834
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+

[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
For clarification: including the JobID is exactly what users are supposed 
to do when they run multiple jobs with the same name.

The current scope format is designed to provide the most readable name. As 
such, it includes as few IDs and as many names as possible. Could we change it? 
Sure. But that's a separate issue and should in no way be part of this PR.
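
To illustrate the point about job names and IDs, here is a minimal sketch of a toy scope formatter. It is not Flink's `MetricRegistry`; the class `ScopeFormatSketch`, its methods, the sample host and job values, and the placeholder names `<host>`, `<job_name>` and `<job_id>` as used here are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a toy scope formatter, not Flink's actual implementation. */
final class ScopeFormatSketch {

    /** Replaces every <variable> placeholder in the format with its value. */
    static String resolve(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            result = result.replace("<" + entry.getKey() + ">", entry.getValue());
        }
        return result;
    }

    /** Builds the (hypothetical) variable map for one job. */
    static Map<String, String> job(String host, String name, String id) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("host", host);
        variables.put("job_name", name);
        variables.put("job_id", id);
        return variables;
    }

    public static void main(String[] args) {
        String namesOnly = "<host>.jobmanager.<job_name>";
        String withJobId = "<host>.jobmanager.<job_name>.<job_id>";

        // Two jobs that share a name but have different IDs.
        Map<String, String> first = job("host-1", "WordCount", "a1");
        Map<String, String> second = job("host-1", "WordCount", "b2");

        // The readable format yields the same scope for both jobs.
        System.out.println(resolve(namesOnly, first));
        System.out.println(resolve(namesOnly, second));

        // Adding the job ID keeps the two scopes apart.
        System.out.println(resolve(withJobId, first));
        System.out.println(resolve(withJobId, second));
    }
}
```

With the names-only format both jobs resolve to `host-1.jobmanager.WordCount`; only the format that includes the ID distinguishes them, which is the trade-off between readability and uniqueness described above.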


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356781#comment-15356781
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
For clarification: including the JobID is exactly what users are supposed 
to do when they run multiple jobs with the same name.

The current scope format is designed to provide the most readable name. As 
such, it includes as few IDs and as many names as possible. Could we change it? 
Sure. But that's a separate issue and should in no way be part of this PR.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2187: [FLINK-3675][yarn] improvements to library shipping

2016-06-30 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2187
  
I moved one existing environment variable `FLINK_CONF_DIR` to 
`ConfigConstants`. I added the `FLINK_LIB_DIR` environment variable to load a 
library folder (optional). I'm not sure whether we should add them to the 
documentation since they are set by the bash scripts and shouldn't be exposed 
to the user.
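
For readers unfamiliar with the two variables, the lookup could be sketched roughly as follows. This is not the code from the PR: the class `EnvDirsSketch`, its method names, the sample paths, and the choice to treat `FLINK_CONF_DIR` as mandatory are assumptions for illustration. Only the variable names and the fact that the library folder is optional come from the comment above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only: reading the two environment variables from a map. */
final class EnvDirsSketch {

    // Local constants for this sketch; the PR keeps its names in ConfigConstants.
    static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
    static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";

    /** Assumption: the configuration directory must be set, otherwise we fail fast. */
    static String requiredConfDir(Map<String, String> env) {
        String dir = env.get(ENV_FLINK_CONF_DIR);
        if (dir == null || dir.isEmpty()) {
            throw new IllegalStateException(ENV_FLINK_CONF_DIR + " is not set");
        }
        return dir;
    }

    /** The library folder is optional, so an unset variable yields an empty result. */
    static Optional<String> optionalLibDir(Map<String, String> env) {
        String dir = env.get(ENV_FLINK_LIB_DIR);
        return (dir == null || dir.isEmpty()) ? Optional.empty() : Optional.of(dir);
    }

    public static void main(String[] args) {
        // A fake environment; a launcher would pass System.getenv() instead.
        Map<String, String> env = new HashMap<>();
        env.put(ENV_FLINK_CONF_DIR, "/opt/flink/conf");

        System.out.println("conf dir: " + requiredConfDir(env));
        System.out.println("lib dir set: " + optionalLibDir(env).isPresent());
    }
}
```

Passing the environment in as a map keeps the lookup testable without touching the real process environment.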




[jira] [Commented] (FLINK-3675) YARN ship folder inconsistent behavior

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356784#comment-15356784
 ] 

ASF GitHub Bot commented on FLINK-3675:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2187
  
I moved one existing environment variable `FLINK_CONF_DIR` to 
`ConfigConstants`. I added the `FLINK_LIB_DIR` environment variable to load a 
library folder (optional). I'm not sure whether we should add them to the 
documentation since they are set by the bash scripts and shouldn't be exposed 
to the user.


> YARN ship folder inconsistent behavior
> -
>
> Key: FLINK-3675
> URL: https://issues.apache.org/jira/browse/FLINK-3675
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.0
>Reporter: Stefano Baghino
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.1.0
>
>
> After [some discussion on the user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html]
>  it came up that the {{flink/lib}} folder is always supposed to be shipped to 
> the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the 
> folder because it's explicitly specified in the {{yarn-session.sh}} script, 
> while running a single job on YARN does not automatically ship it.





[jira] [Commented] (FLINK-3675) YARN ship folder inconsistent behavior

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356791#comment-15356791
 ] 

ASF GitHub Bot commented on FLINK-3675:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2187
  
All builds are passing in my branch: 
https://travis-ci.org/mxm/flink/builds/141155293

CC @rmetzger 


> YARN ship folder inconsistent behavior
> -
>
> Key: FLINK-3675
> URL: https://issues.apache.org/jira/browse/FLINK-3675
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.0
>Reporter: Stefano Baghino
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.1.0
>
>
> After [some discussion on the user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html]
>  it came up that the {{flink/lib}} folder is always supposed to be shipped to 
> the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the 
> folder because it's explicitly specified in the {{yarn-session.sh}} script, 
> while running a single job on YARN does not automatically ship it.





[GitHub] flink issue #2187: [FLINK-3675][yarn] improvements to library shipping

2016-06-30 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2187
  
All builds are passing in my branch: 
https://travis-ci.org/mxm/flink/builds/141155293

CC @rmetzger 




[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

2016-06-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2174#discussion_r69100533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -334,9 +342,11 @@ public void run() {
this.pendingSplits.remove();
}
 
-   if (this.format instanceof CheckpointableInputFormat && 
this.isSplitOpen) {
-   S formatState = (S) 
((CheckpointableInputFormat) format).getCurrentState();
-   return new Tuple3<>(snapshot, currentSplit, 
currentSplit == null ? null : formatState);
+   if (this.format instanceof CheckpointableInputFormat && 
this.currentSplit != null) {
+   S formatState = this.isSplitOpen ?
+   (S) ((CheckpointableInputFormat) 
format).getCurrentState() :
+   restoredFormatState;
+   return new Tuple3<>(snapshot, currentSplit, 
formatState);
} else {
LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
--- End diff --

I think this log message is wrong when the `this.currentSplit != null` check 
fails: the format may be checkpointable and there is simply no current split. 
It pollutes the log files.
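
One way to keep the two cases apart is sketched below. This is not the operator's code and not a proposed patch: the class `SnapshotBranchSketch`, the `Outcome` enum and both method names are hypothetical, and the two boolean parameters stand in for the `instanceof CheckpointableInputFormat` and `currentSplit != null` checks from the diff.

```java
/** Illustrative only: separates "no current split" from "format not checkpointable". */
final class SnapshotBranchSketch {

    enum Outcome {
        SNAPSHOT_SPLIT_STATE,  // checkpointable format with a current split
        NOTHING_TO_SNAPSHOT,   // checkpointable format, but no current split
        NOT_CHECKPOINTABLE     // the only case the log message describes
    }

    /** Decides which branch applies for the given combination of conditions. */
    static Outcome decide(boolean formatIsCheckpointable, boolean hasCurrentSplit) {
        if (!formatIsCheckpointable) {
            return Outcome.NOT_CHECKPOINTABLE;
        }
        return hasCurrentSplit ? Outcome.SNAPSHOT_SPLIT_STATE : Outcome.NOTHING_TO_SNAPSHOT;
    }

    /** The "not checkpointable" message is only accurate for one outcome. */
    static boolean shouldLogNotCheckpointable(Outcome outcome) {
        return outcome == Outcome.NOT_CHECKPOINTABLE;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        for (boolean checkpointable : values) {
            for (boolean hasSplit : values) {
                Outcome outcome = decide(checkpointable, hasSplit);
                System.out.println("checkpointable=" + checkpointable
                        + ", hasSplit=" + hasSplit
                        + " -> " + outcome
                        + ", log=" + shouldLogNotCheckpointable(outcome));
            }
        }
    }
}
```

With this split, an idle reader on a checkpointable format logs nothing at each checkpoint, while the message still appears for formats that really cannot be checkpointed.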




[jira] [Commented] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356808#comment-15356808
 ] 

ASF GitHub Bot commented on FLINK-4075:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2174#discussion_r69100533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -334,9 +342,11 @@ public void run() {
this.pendingSplits.remove();
}
 
-   if (this.format instanceof CheckpointableInputFormat && 
this.isSplitOpen) {
-   S formatState = (S) 
((CheckpointableInputFormat) format).getCurrentState();
-   return new Tuple3<>(snapshot, currentSplit, 
currentSplit == null ? null : formatState);
+   if (this.format instanceof CheckpointableInputFormat && 
this.currentSplit != null) {
+   S formatState = this.isSplitOpen ?
+   (S) ((CheckpointableInputFormat) 
format).getCurrentState() :
+   restoredFormatState;
+   return new Tuple3<>(snapshot, currentSplit, 
formatState);
} else {
LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
--- End diff --

I think this log message is wrong when the `this.currentSplit != null` check 
fails: the format may be checkpointable and there is simply no current split. 
It pollutes the log files.


> ContinuousFileProcessingCheckpointITCase failed on Travis
> -
>
> Key: FLINK-4075
> URL: https://issues.apache.org/jira/browse/FLINK-4075
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The test case {{ContinuousFileProcessingCheckpointITCase}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748004/log.txt





[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69101854
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -132,9 +132,9 @@
 
 
 
-   private final Configuration config;
+   private Configuration config;
--- End diff --

Let's make it positional to prevent confusion.




[GitHub] flink issue #2149: [FLINK-4084] Add configDir parameter to CliFrontend and f...

2016-06-30 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2149
  
Thanks for the update. We're getting there :) Great work so far!




[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356819#comment-15356819
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69101922
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink ---
@@ -17,20 +17,41 @@
 # limitations under the License.
 

 
-target="$0"
 # For the case, the executable has been directly symlinked, figure out
 # the correct bin path by following its symlink up to an upper bound.
 # Note: we can't use the readlink utility here if we want to be POSIX
 # compatible.
-iteration=0
-while [ -L "$target" ]; do
-if [ "$iteration" -gt 100 ]; then
-echo "Cannot resolve path: You have a cyclic symlink in $target."
+followSymLink() {
+local iteration=0
+local bar=$1
+while [ -L "$bar" ]; do
+if [ "$iteration" -gt 100 ]; then
+echo "Cannot resolve path: You have a cyclic symlink in $bar."
+break
+fi
+ls=`ls -ld -- "$bar"`
+bar=`expr "$ls" : '.* -> \(.*\)$'`
+iteration=$((iteration + 1))
+done
+
+echo "$bar"
+}
+
+target=$(followSymLink "$0")
+
+#Search --configDir into the parameters and set it as FLINK_CONF_DIR
+args=("$@")
+for i in "${!args[@]}"; do
+if [ ${args[$i]} = "--configDir" ]; then
--- End diff --

You're right, it's not necessary.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356817#comment-15356817
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69101854
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -132,9 +132,9 @@
 
 
 
-   private final Configuration config;
+   private Configuration config;
--- End diff --

Let's make it positional to prevent confusion.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69101922
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink ---
@@ -17,20 +17,41 @@
 # limitations under the License.
 

 
-target="$0"
 # For the case, the executable has been directly symlinked, figure out
 # the correct bin path by following its symlink up to an upper bound.
 # Note: we can't use the readlink utility here if we want to be POSIX
 # compatible.
-iteration=0
-while [ -L "$target" ]; do
-if [ "$iteration" -gt 100 ]; then
-echo "Cannot resolve path: You have a cyclic symlink in $target."
+followSymLink() {
+local iteration=0
+local bar=$1
+while [ -L "$bar" ]; do
+if [ "$iteration" -gt 100 ]; then
+echo "Cannot resolve path: You have a cyclic symlink in $bar."
+break
+fi
+ls=`ls -ld -- "$bar"`
+bar=`expr "$ls" : '.* -> \(.*\)$'`
+iteration=$((iteration + 1))
+done
+
+echo "$bar"
+}
+
+target=$(followSymLink "$0")
+
+#Search --configDir into the parameters and set it as FLINK_CONF_DIR
+args=("$@")
+for i in "${!args[@]}"; do
+if [ ${args[$i]} = "--configDir" ]; then
--- End diff --

You're right, it's not necessary.




[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356820#comment-15356820
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2149
  
Thanks for the update. We're getting there :) Great work so far!


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356822#comment-15356822
 ] 

ASF GitHub Bot commented on FLINK-4118:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2176#discussion_r69102159
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -16,21 +18,23 @@
 # limitations under the License.
 

 
-jobmanager.rpc.address: %jobmanager%
-jobmanager.rpc.port: 6123
-jobmanager.heap.mb: 128
-
-taskmanager.rpc.port: 6121
-taskmanager.data.port: 6122
-taskmanager.heap.mb: 256
-taskmanager.numberOfTaskSlots: %nb_slots%
-
-parallelization.degree.default: %parallelism%
+# general configuration
+sed -i -e "s/taskmanager.numberOfTaskSlots: 
1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" 
$FLINK_HOME/conf/flink-conf.yaml
--- End diff --

This should only be executed on the JobManager. On a TaskManager it prints 
`taskmanager_1  | sed: can't create temp file 
'/usr/local/flink/conf/flink-conf.yamlXX': Read-only file system`, which 
might worry some users.


> The docker-flink image is outdated (1.0.2) and can be slimmed down
> --
>
> Key: FLINK-4118
> URL: https://issues.apache.org/jira/browse/FLINK-4118
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> This issue is to upgrade the docker image and polish some details in it (e.g. 
> it can be slimmed down if we remove some unneeded dependencies, and the code 
> can be polished).





[GitHub] flink pull request #2176: [FLINK-4118] The docker-flink image is outdated (1...

2016-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2176#discussion_r69102159
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -16,21 +18,23 @@
 # limitations under the License.
 

 
-jobmanager.rpc.address: %jobmanager%
-jobmanager.rpc.port: 6123
-jobmanager.heap.mb: 128
-
-taskmanager.rpc.port: 6121
-taskmanager.data.port: 6122
-taskmanager.heap.mb: 256
-taskmanager.numberOfTaskSlots: %nb_slots%
-
-parallelization.degree.default: %parallelism%
+# general configuration
+sed -i -e "s/taskmanager.numberOfTaskSlots: 
1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" 
$FLINK_HOME/conf/flink-conf.yaml
--- End diff --

This should only be executed on the JobManager. On a TaskManager it prints 
`taskmanager_1  | sed: can't create temp file 
'/usr/local/flink/conf/flink-conf.yamlXX': Read-only file system`, which 
might worry some users.




[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

2016-06-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2174#discussion_r69102339
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -334,9 +342,11 @@ public void run() {
this.pendingSplits.remove();
}
 
-   if (this.format instanceof CheckpointableInputFormat && 
this.isSplitOpen) {
-   S formatState = (S) 
((CheckpointableInputFormat) format).getCurrentState();
-   return new Tuple3<>(snapshot, currentSplit, 
currentSplit == null ? null : formatState);
+   if (this.format instanceof CheckpointableInputFormat && 
this.currentSplit != null) {
+   S formatState = this.isSplitOpen ?
+   (S) ((CheckpointableInputFormat) 
format).getCurrentState() :
+   restoredFormatState;
+   return new Tuple3<>(snapshot, currentSplit, 
formatState);
} else {
LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
--- End diff --

Monitoring an empty directory looks like this:

```
11:36:42,704 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1467279402703
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,710 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,823 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 (in 120 ms)
11:36:44,703 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1467279404703
11:36:44,705 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,707 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,710 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The curre

[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...

2016-06-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2176
  
Nice work! I didn't know how to use docker but I managed to set it up and 
use the new version on OS X without a problem. So it seems to work well, and 
the code is a lot simpler and the image is smaller.

LGTM minus the one comment I had about the config file.




[jira] [Commented] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356825#comment-15356825
 ] 

ASF GitHub Bot commented on FLINK-4075:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2174#discussion_r69102339
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -334,9 +342,11 @@ public void run() {
this.pendingSplits.remove();
}
 
-   if (this.format instanceof CheckpointableInputFormat && 
this.isSplitOpen) {
-   S formatState = (S) 
((CheckpointableInputFormat) format).getCurrentState();
-   return new Tuple3<>(snapshot, currentSplit, 
currentSplit == null ? null : formatState);
+   if (this.format instanceof CheckpointableInputFormat && 
this.currentSplit != null) {
+   S formatState = this.isSplitOpen ?
+   (S) ((CheckpointableInputFormat) 
format).getCurrentState() :
+   restoredFormatState;
+   return new Tuple3<>(snapshot, currentSplit, 
formatState);
} else {
LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
--- End diff --

Monitoring an empty directory looks like this:

```
11:36:42,704 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1467279402703
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,709 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,710 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:42,823 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 (in 120 ms)
11:36:44,703 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1467279404703
11:36:44,705 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,706 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - 
The format used is not checkpointable. The current input split will be 
restarted upon recovery.
11:36:44,707 INFO  
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperato

[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356824#comment-15356824
 ] 

ASF GitHub Bot commented on FLINK-4118:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2176
  
Nice work! I didn't know how to use docker but I managed to set it up and 
use the new version on OS X without a problem. So it seems to work well, and 
the code is a lot simpler and the image is smaller.

LGTM minus the one comment I had about the config file.


> The docker-flink image is outdated (1.0.2) and can be slimmed down
> --
>
> Key: FLINK-4118
> URL: https://issues.apache.org/jira/browse/FLINK-4118
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> This issue is to upgrade the docker image and polish some details in it (e.g. 
> it can be slimmed down if we remove some unneeded dependencies, and the code 
> can be polished).





[jira] [Assigned] (FLINK-4124) Unstable test WrapperSetupHelperTest.testCreateTopologyContext

2016-06-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-4124:
---

Assignee: Aljoscha Krettek

> Unstable test WrapperSetupHelperTest.testCreateTopologyContext
> --
>
> Key: FLINK-4124
> URL: https://issues.apache.org/jira/browse/FLINK-4124
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>  Labels: test-stability
>
> Instances of the failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140554510/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558082/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558119/log.txt





[jira] [Assigned] (FLINK-4124) Unstable test WrapperSetupHelperTest.testCreateTopologyContext

2016-06-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4124:
---

Assignee: Chesnay Schepler  (was: Aljoscha Krettek)

> Unstable test WrapperSetupHelperTest.testCreateTopologyContext
> --
>
> Key: FLINK-4124
> URL: https://issues.apache.org/jira/browse/FLINK-4124
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>  Labels: test-stability
>
> Instances of the failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140554510/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558082/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558119/log.txt





[jira] [Updated] (FLINK-4124) Unstable test WrapperSetupHelperTest.testCreateTopologyContext

2016-06-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-4124:

Assignee: Aljoscha Krettek  (was: Chesnay Schepler)

> Unstable test WrapperSetupHelperTest.testCreateTopologyContext
> --
>
> Key: FLINK-4124
> URL: https://issues.apache.org/jira/browse/FLINK-4124
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>  Labels: test-stability
>
> Instances of the failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140554510/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558082/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558119/log.txt





[GitHub] flink issue #2187: [FLINK-3675][yarn] improvements to library shipping

2016-06-30 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2187
  
Change looks good! +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3675) YARN ship folder incosistent behavior

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356849#comment-15356849
 ] 

ASF GitHub Bot commented on FLINK-3675:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2187
  
Change looks good! +1 to merge


> YARN ship folder incosistent behavior
> -
>
> Key: FLINK-3675
> URL: https://issues.apache.org/jira/browse/FLINK-3675
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.0
>Reporter: Stefano Baghino
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.1.0
>
>
> After [some discussion on the user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html]
>  it came up that the {{flink/lib}} folder is always supposed to be shipped to 
> the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the 
> folder because it's explicitly specified in the {{yarn-session.sh}} script, 
> while running a single job on YARN does not automatically ship it.





[jira] [Commented] (FLINK-3294) KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()

2016-06-30 Thread Jens Kat (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356856#comment-15356856
 ] 

Jens Kat commented on FLINK-3294:
-

[~rmetzger] I've been looking into the Kafka09Fetcher. It implements the 
following method from AbstractFetcher: public void 
commitSpecificOffsetsToKafka(Map offsets)

It does this using the consumer, whereas the 08Fetcher implements the same 
method using zkHandler.writeOffsets(offsets)

It seems to me that the initial behavior is already in place? I'll look into it 
to get this working.

> KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()
> ---
>
> Key: FLINK-3294
> URL: https://issues.apache.org/jira/browse/FLINK-3294
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Currently, the 0.8 consumer for Kafka is committing the offsets manually into 
> Zookeeper so that users can track the lag using external tools.
> The 0.8 consumer has a pluggable design, and this component is easily 
> pluggable.
> Since OffsetCommitRequest version=1 (supported in 0.8.2 or later), users can 
> choose between two offset commit modes:
> a) Let the broker commit into ZK (this is what we are doing from the consumer)
> b) Let the broker commit the offset into a special topic.
> By adding a different "OffsetHandler" backend, users can commit offsets from 
> the brokers (reducing the total number of ZK connections) or into the 
> broker's offset topic.
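
The pluggable backend idea from the description can be sketched roughly as follows. This is only an illustration under stated assumptions: the `OffsetHandler` interface shape, the two handler classes, the `create` factory and the mode names "zookeeper" and "broker" are all hypothetical, and both backends just keep offsets in a map instead of talking to ZooKeeper or a Kafka broker. It is not the connector's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetHandlerSketch {

    /** Hypothetical abstraction: where do committed offsets end up? */
    public interface OffsetHandler {
        void commit(Map<String, Long> offsets);
        Long lastCommitted(String topicPartition);
        String backendName();
    }

    /** Stand-in base class that stores offsets in a map rather than in a real system. */
    abstract static class InMemoryHandler implements OffsetHandler {
        private final Map<String, Long> store = new HashMap<>();

        @Override
        public void commit(Map<String, Long> offsets) {
            store.putAll(offsets);
        }

        @Override
        public Long lastCommitted(String topicPartition) {
            return store.get(topicPartition);
        }
    }

    /** Placeholder for mode a): offsets are written into ZooKeeper. */
    public static class ZookeeperOffsetHandler extends InMemoryHandler {
        @Override
        public String backendName() {
            return "zookeeper";
        }
    }

    /** Placeholder for mode b): offsets are written into the broker's offset topic. */
    public static class BrokerTopicOffsetHandler extends InMemoryHandler {
        @Override
        public String backendName() {
            return "broker-offset-topic";
        }
    }

    /** Selects a backend by a (hypothetical) mode name. */
    public static OffsetHandler create(String mode) {
        switch (mode) {
            case "zookeeper":
                return new ZookeeperOffsetHandler();
            case "broker":
                return new BrokerTopicOffsetHandler();
            default:
                throw new IllegalArgumentException("unknown offset commit mode: " + mode);
        }
    }

    public static void main(String[] args) {
        OffsetHandler handler = create("broker");
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("my-topic-0", 42L);
        handler.commit(offsets);
        System.out.println(handler.backendName() + " committed " + handler.lastCommitted("my-topic-0"));
    }
}
```

With such a split, the consumer code only calls `commit`, and the choice between the two commit modes becomes a configuration decision.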





[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356868#comment-15356868
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69107628
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   pu

[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69107628
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+

[GitHub] flink issue #2131: [FLINK-3231][streaming-connectors] FlinkKinesisConsumer r...

2016-06-30 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger
Just pushed the changes to address comments + rebase on the exactly-once 
fix / user-agent fix.
Sorry, had some trouble with the rebasing and took more time than I 
expected.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356909#comment-15356909
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger
Just pushed the changes to address comments + rebase on the exactly-once 
fix / user-agent fix.
Sorry, had some trouble with the rebasing and took more time than I 
expected.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a 
> task to ask the coordinator for a shard reassignment when it finds 
> a closed shard at runtime (Kinesis closes a shard when it is merged or 
> split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
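
For illustration, the static round-robin-like mapping mentioned in the description could look roughly like the sketch below. It assumes every consumer task sees the same ordered shard list and knows its own index and the total parallelism. The class and method names are hypothetical and are not taken from the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StaticShardAssignment {

    /**
     * Returns the shards a given subtask should read. Each subtask can compute
     * this locally, without coordination, as long as all subtasks see the same
     * ordered shard list.
     */
    public static List<String> shardsForSubtask(List<String> allShards, int numSubtasks, int subtaskIndex) {
        if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            // Shard i goes to subtask (i mod numSubtasks).
            if (i % numSubtasks == subtaskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " -> " + shardsForSubtask(shards, 2, subtask));
        }
    }
}
```

Because the mapping depends only on the shard list known at startup, shards created by a later merge or split are never picked up by any task, which is why resharding needs the coordination described above.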





[GitHub] flink issue #2186: [licenses] Remove not included dependency from LICENSE

2016-06-30 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2186
  
Is the documentation still usable offline?




[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

2016-06-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r69121871
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
--- End diff --

The matrix can be asymmetric as in Frey and Dueck's example of city flight 
times. Their paper also discusses sparse graphs.




[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

2016-06-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r69121905
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
+ * similarities of these points among them.
+ *
+ * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
+ * the Tuples grouped by f1
+ *
+ * @see http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf
+ */
+
+@SuppressWarnings("serial")
+public class AffinityPropagation implements GraphAlgorithm>> {
+
+   private static Integer maxIterations;
+   private static float damping;
+   private static float epsilon;
+
+   /**
+* Creates a new AffinityPropagation instance algorithm instance.
+*
+* @param maxIterations The maximum number of iterations to run
+* @param damping Damping factor.
+* @param epsilon Epsilon factor. Do not send message to a neighbor if the new message
+* has not changed more than epsilon.
+*/
+   public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
+   this.maxIterations = maxIterations;
+   this.damping = damping;
+   this.epsilon = epsilon;
+   }
+
+   @Override
+   public DataSet> run(Graph input) throws Exception {
+
+   // Create E and I AP vertices
+   DataSet> verticesWithAllInNeighbors =
+   input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
+
+   List> APvertices = verticesWithAllInNeighbors.collect();
+
+   // Create E and I AP edges. Could this be done with some gelly functionality?
+   List> APedges = new ArrayList<>();
+
+   for(int i = 1; i < input.numberOfVertices() + 1; i++){
+   for(int j = 1; j < input.numberOfVertices() + 1; j++){
+   APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
+   }
+   }
+
+   DataSet> APEdgesDS = input.getContext().fromCollection(APedges);
+   DataSet> APVerticesDS = input.getContext().fromCollection(APvertices);
+
+   ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+   parameters.registerAggregator("convergedAggregator", new LongSumAggrega

[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357002#comment-15357002
 ] 

ASF GitHub Bot commented on FLINK-1707:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r69121871
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
--- End diff --

The matrix can be asymmetric as in Frey and Dueck's example of city flight 
times. Their paper also discusses sparse graphs.


> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357003#comment-15357003
 ] 

ASF GitHub Bot commented on FLINK-1707:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r69121905
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm 
using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to 
be clustered and the edge weights are the
+ * similarities of these points among them.
+ *
+ * The output is a Dataset of Tuple2, where f0 is the point id and f1 is 
the exemplar, so the clusters will be the
+ * the Tuples grouped by f1
+ *
+ * @see http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf";>
+ */
+
+@SuppressWarnings("serial")
+public class AffinityPropagation implements 
GraphAlgorithm>> {
+
+   private static Integer maxIterations;
+   private static float damping;
+   private static float epsilon;
+
+   /**
+* Creates a new AffinityPropagation instance algorithm instance.
+*
+* @param maxIterations The maximum number of iterations to run
+* @param damping Damping factor.
+* @param epsilon Epsilon factor. Do not send message to a neighbor if 
the new message
+* has not changed more than epsilon.
+*/
+   public AffinityPropagation(Integer maxIterations, float damping, float 
epsilon) {
+   this.maxIterations = maxIterations;
+   this.damping = damping;
+   this.epsilon = epsilon;
+   }
+
+   @Override
+   public DataSet> run(Graph 
input) throws Exception {
+
+   // Create E and I AP vertices
+   DataSet> verticesWithAllInNeighbors 
=
+   input.groupReduceOnEdges(new InitAPVertex(), 
EdgeDirection.IN);
+
+   List> APvertices = 
verticesWithAllInNeighbors.collect();
+
+   // Create E and I AP edges. Could this be done with some gelly 
functionality?
+   List> APedges = new ArrayList<>();
+
+   for(int i = 1; i < input.numberOfVertices() + 1; i++){
+   for(int j = 1; j < input.numberOfVertices() + 1; j++){
+   APedges.add(new Edge<>(i * 10L, j * 10L + 1, 
NullValue.getInstance()));
+   }
+   }
+
+   DataSet> APEdgesDS = 
input.getContext().fromCollection(APedges);
+   D

[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123278
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -459,4 +476,14 @@ public static InfoOptions parseInfoCommand(String[] 
args) throws CliArgsExceptio
}
}
 
+   public static MainOptions parseMainCommand(String[] args) throws 
CliArgsException {
+   try {
+   DefaultParser parser = new DefaultParser();
+   CommandLine line = parser.parse(MAIN_OPTIONS, args, 
false);
--- End diff --

You're parsing the entire arguments here. What if the user jar has 
`--configDir` as parameter?
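
To illustrate the concern: if the whole argument array is scanned, a `--configDir` meant for the user program is picked up by the front end. One possible way around it is to look only at the options that come before the first non-option token. The sketch below is plain Java and is not the PR's code; `LeadingOptions` and its method are hypothetical names. As far as I know, the third argument of Commons CLI's `parse` (`false` in the diff) is the stop-at-non-option flag, which aims at the same effect.

```java
/** Hypothetical helper, not part of Flink: reads front-end options that precede the action. */
final class LeadingOptions {

    /** Returns the value of --configDir if it appears before the first non-option token, else null. */
    static String configDir(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("-")) {
                // First non-option token (for example the action "run"): stop here,
                // everything after it belongs to the action or to the user program.
                return null;
            }
            if (arg.equals("--configDir")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("--configDir requires a value");
                }
                return args[i + 1];
            }
            // Any other leading option is unknown to this sketch.
            throw new IllegalArgumentException("Unknown option: " + arg);
        }
        return null;
    }
}
```

With this shape, `--configDir conf run job.jar` yields `conf`, while `run job.jar --configDir userValue` leaves the user's parameter untouched.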


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357013#comment-15357013
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123278
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -459,4 +476,14 @@ public static InfoOptions parseInfoCommand(String[] 
args) throws CliArgsExceptio
}
}
 
+   public static MainOptions parseMainCommand(String[] args) throws 
CliArgsException {
+   try {
+   DefaultParser parser = new DefaultParser();
+   CommandLine line = parser.parse(MAIN_OPTIONS, args, 
false);
--- End diff --

You're parsing the entire arguments here. What if the user jar has 
`--configDir` as parameter?


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123376
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
+   MainOptions options = 
CliFrontendParser.parseMainCommand(parameters);
+   assertEquals("expectedConfigDirectory", 
options.getConfigDir());
+   }
+   }
--- End diff --

We need more tests here to assure the parameter is correctly parsed.




[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123415
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
--- End diff --

This is not a valid invocation of the CliFrontend.




[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123467
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
--- End diff --

i.e. there is no "action" before the jar.
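
A small sketch of the invocation shape this comment points at: optional `--configDir <value>` pairs, then an action, then the jar. The action names listed here are an assumption for illustration, and `InvocationShape` is a hypothetical helper, not something from the PR.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: checks that an action precedes the jar. */
final class InvocationShape {

    // Assumed action names, for illustration only.
    static final List<String> ACTIONS =
            Arrays.asList("run", "info", "list", "cancel", "stop", "savepoint");

    /** Returns the index of the action token, or -1 if no action follows the leading options. */
    static int actionIndex(String[] args) {
        int i = 0;
        // Skip leading "--configDir <value>" pairs.
        while (i + 1 < args.length && args[i].equals("--configDir")) {
            i += 2;
        }
        return (i < args.length && ACTIONS.contains(args[i])) ? i : -1;
    }
}
```

The test's argument array (`--configDir`, a directory, then the jar path) would give -1 here, whereas inserting `run` before the jar gives index 2.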




[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357017#comment-15357017
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123415
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
--- End diff --

This is not a valid invocation of the CliFrontend.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357014#comment-15357014
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123376
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
+   MainOptions options = 
CliFrontendParser.parseMainCommand(parameters);
+   assertEquals("expectedConfigDirectory", 
options.getConfigDir());
+   }
+   }
--- End diff --

We need more tests here to assure the parameter is correctly parsed.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357019#comment-15357019
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r69123467
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendMainTest.java ---
@@ -0,0 +1,35 @@
+package org.apache.flink.client;
+
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.MainOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+
+public class CliFrontendMainTest {
+
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testMain() throws CliArgsException, FileNotFoundException, 
MalformedURLException {
+   // test configure configDir
+   {
+   String[] parameters = {"--configDir", 
"expectedConfigDirectory", getTestJarPath()};
--- End diff --

i.e. there is no "action" before the jar.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r69123978
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
--- End diff --

I'm ok with that.




[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357022#comment-15357022
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r69123978
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
--- End diff --

I'm ok with that.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.





[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r69124072
  
--- Diff: docs/setup/config.md ---
@@ -230,8 +240,8 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 
 ## YARN
 
-- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to 
remove from containers started by YARN. When a user requests a certain amount 
of memory for each TaskManager container (for example 4 GB), we can not pass 
this amount as the maximum heap space for the JVM (`-Xmx` argument) because the 
JVM is also allocating memory outside the heap. YARN is very strict with 
killing containers which are using more memory than requested. Therefore, we 
remove a 15% of the memory from the requested heap as a safety margin.
-- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut 
off the requested heap size.
+- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space 
to remove from containers started by YARN. When a user requests a certain 
amount of memory for each TaskManager container (for example 4 GB), we can not 
pass this amount as the maximum heap space for the JVM (`-Xmx` argument) 
because the JVM is also allocating memory outside the heap. YARN is very strict 
with killing containers which are using more memory than requested. Therefore, 
we remove a 15% of the memory from the requested heap as a safety margin.
--- End diff --

+1
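
For readers following the config text in this diff, here is a rough worked example of the cutoff arithmetic it describes: the larger of the ratio-based cutoff and the minimum cutoff is subtracted from the requested container memory. This is a sketch under that reading, not Flink's actual code; `HeapCutoff` and its parameter names are hypothetical. Note that the quoted text gives a default of 0.25 but also mentions 15%; the sketch takes the ratio as a parameter and does not settle which is intended.

```java
/** Hypothetical helper, not part of Flink: computes the JVM heap left after the cutoff. */
final class HeapCutoff {

    /**
     * @param containerMb memory requested for the container, in MB
     * @param cutoffRatio fraction of the container memory to remove (between 0 and 1)
     * @param cutoffMinMb minimum amount to remove, in MB
     * @return heap size in MB to pass as -Xmx
     */
    static long heapSizeMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        if (cutoffRatio <= 0 || cutoffRatio >= 1) {
            throw new IllegalArgumentException("cutoff ratio must be between 0 and 1");
        }
        long cutoff = (long) (containerMb * cutoffRatio);
        if (cutoff < cutoffMinMb) {
            // Small containers: the minimum cutoff wins over the ratio.
            cutoff = cutoffMinMb;
        }
        if (cutoff >= containerMb) {
            throw new IllegalArgumentException("container too small for the configured cutoff");
        }
        return containerMb - cutoff;
    }
}
```

With the 4 GB example from the text, a ratio of 0.25 and a minimum of 384 MB, the cutoff is 1024 MB and the heap is 3072 MB. For a 1024 MB container the ratio gives only 256 MB, so the 384 MB minimum applies and 640 MB remain.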




[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357024#comment-15357024
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r69124072
  
--- Diff: docs/setup/config.md ---
@@ -230,8 +240,8 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 
 ## YARN
 
-- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to 
remove from containers started by YARN. When a user requests a certain amount 
of memory for each TaskManager container (for example 4 GB), we can not pass 
this amount as the maximum heap space for the JVM (`-Xmx` argument) because the 
JVM is also allocating memory outside the heap. YARN is very strict with 
killing containers which are using more memory than requested. Therefore, we 
remove a 15% of the memory from the requested heap as a safety margin.
-- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut 
off the requested heap size.
+- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space 
to remove from containers started by YARN. When a user requests a certain 
amount of memory for each TaskManager container (for example 4 GB), we can not 
pass this amount as the maximum heap space for the JVM (`-Xmx` argument) 
because the JVM is also allocating memory outside the heap. YARN is very strict 
with killing containers which are using more memory than requested. Therefore, 
we remove a 15% of the memory from the requested heap as a safety margin.
--- End diff --

+1


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.





[jira] [Created] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events

2016-06-30 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4134:
-

 Summary: EventTimeSessionWindows trigger for empty windows when 
dropping late events
 Key: FLINK-4134
 URL: https://issues.apache.org/jira/browse/FLINK-4134
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.3
Reporter: Stefan Richter


It seems like EventTimeSessionWindows sometimes trigger for empty windows. The 
behavior is observed in connection with dropping late events:

{code}
stream
 .keyBy("sessionKey")
 .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
 .allowedLateness(Time.milliseconds(0))
 .apply(new ValidatingWindowFunction())
 .print();
{code}


I wrote a generator that produces events for several parallel sessions and 
that makes it possible to reproduce the error. For now, I can share this generator 
privately for debugging purposes, but my plan is to use the generator as the basis 
for an integration test.
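
As background for the expected behaviour, here is a minimal, Flink-independent sketch of gap-based sessions: sorted timestamps are split wherever two neighbours are at least the gap apart. `SessionSketch` is a hypothetical name, and how events exactly one gap apart are treated is a boundary detail this sketch does not try to match to Flink. The point is only that a session built this way always contains at least one event, which is why a trigger for an empty window looks like a bug.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration, not Flink code: groups event timestamps into gap-based sessions. */
final class SessionSketch {

    /** Splits the timestamps into sessions; a new session starts when the gap is reached. */
    static List<List<Long>> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long last = 0;
        for (long t : sorted) {
            if (current == null || t - last >= gapMs) {
                // Either the first event or the gap has elapsed: open a new session.
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(t);
            last = t;
        }
        return result;
    }
}
```

For timestamps 0, 50, 120, 400, 450 and a gap of 100 ms this gives two sessions, one with the first three events and one with the last two.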





[jira] [Assigned] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events

2016-06-30 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4134:
-

Assignee: Stefan Richter

> EventTimeSessionWindows trigger for empty windows when dropping late events
> ---
>
> Key: FLINK-4134
> URL: https://issues.apache.org/jira/browse/FLINK-4134
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> It seems like EventTimeSessionWindows sometimes trigger for empty windows. 
> The behavior is observed in connection with dropping late events:
> {code}
> stream
>  .keyBy("sessionKey")
>  .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
>  .allowedLateness(Time.milliseconds(0))
>  .apply(new ValidatingWindowFunction())
>  .print();
> {code}
> I wrote a generator that produces events for several parallel sessions and 
> that makes it possible to reproduce the error. For now, I can share this generator 
> privately for debugging purposes, but my plan is to use the generator as 
> the basis for an integration test.





[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Sure, I'll give a full review now.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357030#comment-15357030
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Sure, I'll give a full review now.


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[jira] [Created] (FLINK-4135) Replace ChecksumHashCode as GraphAnalytic

2016-06-30 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4135:
-

 Summary: Replace ChecksumHashCode as GraphAnalytic
 Key: FLINK-4135
 URL: https://issues.apache.org/jira/browse/FLINK-4135
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial
 Fix For: 1.1.0


Create a {{GraphAnalytic}} to replace {{GraphUtils.checksumHashCode}} as there 
is nothing special about this computation and we can remove this function from 
the API.





[jira] [Closed] (FLINK-3822) Document checksum functions

2016-06-30 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3822.
-
Resolution: Won't Fix

This utility function will be replaced by a {{GraphAnalytic}}.

> Document checksum functions
> ---
>
> Key: FLINK-3822
> URL: https://issues.apache.org/jira/browse/FLINK-3822
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Add site documentation for {{GraphUtils.checksumHashCode}}. Note potential 
> issues with {{hashCode}} methods which are linearly combinable.
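
The concern about linearly combinable `hashCode` methods can be illustrated with a small sketch. It assumes a checksum made of an element count plus the sum of the elements' hash codes; that is an assumption about the general approach, not the actual `GraphUtils.checksumHashCode` implementation, and `ChecksumSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: a count-plus-sum checksum over element hash codes. */
final class ChecksumSketch {

    /** Returns {count, sum of hashCode()} for the given elements. */
    static long[] checksum(List<?> elements) {
        long count = 0;
        long sum = 0;
        for (Object element : elements) {
            count++;
            sum += element.hashCode();
        }
        return new long[] {count, sum};
    }

    public static void main(String[] args) {
        // Integer.hashCode() is the int value itself, so hash codes add up linearly:
        // the two different data sets below have the same count and the same sum.
        long[] a = checksum(Arrays.asList(1, 4));
        long[] b = checksum(Arrays.asList(2, 3));
        System.out.println("same checksum: " + Arrays.equals(a, b));
    }
}
```

Under these assumptions a matching checksum only suggests that two results are equal; it does not prove it.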





[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax 
I think the failing `JMXReporterTest.testJMXAvailability` was just hotfixed 
with this commit yesterday:

https://github.com/apache/flink/commit/53630da01bcbfe05eda90869b1198b4e1c554a86




[jira] [Commented] (FLINK-3034) Redis Sink Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357039#comment-15357039
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax 
I think the failing `JMXReporterTest.testJMXAvailability` was just hotfixed 
with this commit yesterday:

https://github.com/apache/flink/commit/53630da01bcbfe05eda90869b1198b4e1c554a86


> Redis Sink Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
Alright. I just reported my observation while trying this PR out. 

You're right that it would blow up the scope of this PR. I guess the 
easiest solution would be to properly document the behaviour when a user 
submits jobs with the same name and to tell them to use the job id 
instead in this case.
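
A minimal sketch of why the job id is the safer identifier, assuming metrics are registered under a scope string. The `MetricScopeSketch` class, the scope layout and the `restarts` metric name are hypothetical and do not reflect Flink's actual metric registry.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry that keeps one value per scope string. */
final class MetricScopeSketch {

    private final Map<String, Long> metrics = new HashMap<>();

    /** Registers a value; returns false if the scope is already taken. */
    boolean register(String scope, long value) {
        return metrics.putIfAbsent(scope, value) == null;
    }

    /** Scope built from the user-chosen job name, which need not be unique. */
    static String scopeByName(String jobName) {
        return "jobmanager." + jobName + ".restarts";
    }

    /** Scope built from the job id, which is unique per submission. */
    static String scopeById(String jobId) {
        return "jobmanager." + jobId + ".restarts";
    }
}
```

With two jobs that share a name, the second registration under `scopeByName` is rejected, while `scopeById` keeps both apart.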




[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357040#comment-15357040
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
Alright. I just reported my observation while trying this PR out. 

You're right that it would blow up the scope of this PR. I guess the 
easiest solution would be to properly document the behaviour when a user 
submits jobs with the same name and to tell them to use the job id 
instead in this case.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>






[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69127489
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }

[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357042#comment-15357042
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r69127489
  

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

2016-06-30 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2108
  
Thank you for your review @tillrohrmann and @zentol . I tried addressing 
all your concerns.
Please let me know what you think about it.




[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357044#comment-15357044
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2108
  
Thank you for your review @tillrohrmann and @zentol . I tried addressing 
all your concerns.
Please let me know what you think about it.


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.
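
The missing barrier described above can be sketched as a counter of in-flight sends that the checkpoint path waits on. This is a minimal illustration under stated assumptions, not the code of the actual fix: `PendingRecordsTracker` and its methods are hypothetical names, and the wiring into the Kafka producer callback is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper that tracks asynchronous sends so a snapshot can wait for them. */
final class PendingRecordsTracker {

    private final Object lock = new Object();
    private long pending = 0;
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    /** Call before handing a record to the asynchronous producer. */
    void recordSent() {
        synchronized (lock) {
            pending++;
        }
    }

    /** Call from the producer's completion callback; error is null on success. */
    void recordAcknowledged(Exception error) {
        if (error != null) {
            asyncError.compareAndSet(null, error); // remember the first failure
        }
        synchronized (lock) {
            pending--;
            lock.notifyAll();
        }
    }

    /** Call when a checkpoint is taken: blocks until nothing is in flight, then surfaces errors. */
    void awaitPendingAndCheckErrors() throws Exception {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
        Exception error = asyncError.get();
        if (error != null) {
            throw new Exception("Asynchronous send failed before the checkpoint completed", error);
        }
    }

    long pendingCount() {
        synchronized (lock) {
            return pending;
        }
    }
}
```

Because the snapshot does not return until every send has been acknowledged, a late failure makes the checkpoint fail instead of being silently counted as processed.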





[jira] [Commented] (FLINK-4135) Replace ChecksumHashCode as GraphAnalytic

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357051#comment-15357051
 ] 

ASF GitHub Bot commented on FLINK-4135:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2188

[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala utility 
functions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4135_replace_checksumhashcode_as_graphanalytic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2188


commit 7c7fa84259b846e8a5df7e54ae497995fac34ab0
Author: Greg Hogan 
Date:   2016-06-30T13:02:47Z

[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala
utility functions.




> Replace ChecksumHashCode as GraphAnalytic
> -
>
> Key: FLINK-4135
> URL: https://issues.apache.org/jira/browse/FLINK-4135
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.1.0
>
>
> Create a {{GraphAnalytic}} to replace {{GraphUtils.checksumHashCode}} as 
> there is nothing special about this computation and we can remove this 
> function from the API.





[GitHub] flink pull request #2188: [FLINK-4135] [gelly] Replace ChecksumHashCode as G...

2016-06-30 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2188

[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala utility 
functions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4135_replace_checksumhashcode_as_graphanalytic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2188


commit 7c7fa84259b846e8a5df7e54ae497995fac34ab0
Author: Greg Hogan 
Date:   2016-06-30T13:02:47Z

[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala
utility functions.






[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357067#comment-15357067
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69129641
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,157 +17,553 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher accomplishes the following:
+ * 
+ * 1. continuously poll Kinesis to discover shards that the 
subtask should subscribe to. The subscribed subset
+ *   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
+ *   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
+ *   to the same subset of shards even after 
restoring)
+ * 2. decide where in each discovered shard should the fetcher 
start subscribing to
+ * 3. subscribe to shards by creating a single thread for each 
shard
+ * 
+ *
+ * The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the 
handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // 

+   //  Consumer-wide settings
+   // 

+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, {@link 
ShardConsumer}s using t

[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69129641
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,157 +17,553 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher accomplishes the following:
+ * 
+ * 1. continuously poll Kinesis to discover shards that the 
subtask should subscribe to. The subscribed subset
+ *   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
+ *   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
+ *   to the same subset of shards even after 
restoring)
+ * 2. decide where in each discovered shard should the fetcher 
start subscribing to
+ * 3. subscribe to shards by creating a single thread for each 
shard
+ * 
+ *
+ * The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the 
handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // 

+   //  Consumer-wide settings
+   // 

+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, {@link 
ShardConsumer}s using this must
+* clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+*/
+   private final KinesisDeserializationSchema deserializationSchema;
+
+   // 
---
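
The Javadoc in the diff above requires the shard-to-subtask assignment to be non-overlapping across subtasks and stable across restores. One way to get both properties is a deterministic hash of the shard id modulo the parallelism. The sketch below assumes that approach; `ShardAssignmentSketch` and `shouldSubscribe` are hypothetical names, not the method used in the pull request.

```java
/** Hypothetical sketch of a deterministic, non-overlapping shard assignment. */
final class ShardAssignmentSketch {

    /**
     * Returns true if the subtask with the given index should subscribe to the shard.
     * String.hashCode() is specified by the Java API, so the result is the same on
     * every JVM and after every restore; floorMod keeps the result in [0, totalSubtasks).
     */
    static boolean shouldSubscribe(String shardId, int totalSubtasks, int subtaskIndex) {
        return Math.floorMod(shardId.hashCode(), totalSubtasks) == subtaskIndex;
    }
}
```

Each shard id maps to exactly one subtask index, so no two subtasks subscribe to the same shard, and a newly discovered shard is claimed without any coordination between subtasks.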

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69130106
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -107,15 +115,16 @@ public void run() {
threadB.start();
// this should block:
producer.snapshotState(0, 0);
-   // once all pending callbacks are confirmed, we can set this 
marker to true
-   markOne.set(true);
-   for(int i = 0; i < 99; i++) {
-   producer.invoke("msg-" + i);
+   synchronized (threadA) {
+   threadA.notifyAll(); // just in case, to let the test 
fail faster
}
-   // wait at most one second
-   threadB.join(800L);
-   Assert.assertFalse("Thread A reached this point too fast", 
threadB.isAlive());
-   if(runnableError.f0 != null) {
+
+   Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+   while (deadline.hasTimeLeft() && threadB.isAlive()) {
+   threadB.join(500);
+   }
+   Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
+   if (runnableError.f0 != null) {
runnableError.f0.printStackTrace();
Assert.fail("Error from thread B: " + runnableError.f0 
);
--- End diff --

Printing the stack trace to stderr is imo not so good. The problem is that 
the stack trace will be intermingled with the rest of the testing log output. I 
think it's better to simply rethrow the `Throwable` here.
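
The suggestion to rethrow could look roughly like the sketch below. It is a standalone illustration, not the test code from the pull request: `RethrowFromThreadSketch` and `runAndRethrow` are hypothetical names, and the error holder is an `AtomicReference` rather than the tuple used in the diff.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper that surfaces a worker thread's failure on the calling thread. */
final class RethrowFromThreadSketch {

    /** Runs the task on a separate thread and rethrows whatever it threw. */
    static void runAndRethrow(Runnable task, long timeoutMillis) throws Throwable {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                error.set(t); // capture instead of printing
            }
        });
        worker.start();
        worker.join(timeoutMillis);
        if (worker.isAlive()) {
            worker.interrupt();
            throw new AssertionError("Worker thread did not finish within " + timeoutMillis + " ms");
        }
        Throwable t = error.get();
        if (t != null) {
            throw t; // the test runner reports the original stack trace
        }
    }
}
```

A test method declared with `throws Throwable` can call this directly, and the failure then appears in the test report rather than in the log output.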




[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357068#comment-15357068
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69130106
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -107,15 +115,16 @@ public void run() {
threadB.start();
// this should block:
producer.snapshotState(0, 0);
-   // once all pending callbacks are confirmed, we can set this 
marker to true
-   markOne.set(true);
-   for(int i = 0; i < 99; i++) {
-   producer.invoke("msg-" + i);
+   synchronized (threadA) {
+   threadA.notifyAll(); // just in case, to let the test 
fail faster
}
-   // wait at most one second
-   threadB.join(800L);
-   Assert.assertFalse("Thread A reached this point too fast", 
threadB.isAlive());
-   if(runnableError.f0 != null) {
+
+   Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+   while (deadline.hasTimeLeft() && threadB.isAlive()) {
+   threadB.join(500);
+   }
+   Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
+   if (runnableError.f0 != null) {
runnableError.f0.printStackTrace();
Assert.fail("Error from thread B: " + runnableError.f0 
);
--- End diff --

Printing the stack trace to stderr is imo not so good. The problem is that 
the stack trace will be intermingled with the rest of the testing log output. I 
think it's better to simply rethrow the `Throwable` here.


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.
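
One way to close the gap described above is to count in-flight records and block the checkpoint until every one of them has been acknowledged. The following is a minimal sketch under assumptions: it uses `CompletableFuture` as a stand-in for the Kafka client's asynchronous send callback, and `PendingRecordsSketch`, `send`, and `flushOnCheckpoint` are hypothetical names, not the connector's API.

```java
import java.util.concurrent.CompletableFuture;

/** Illustrative only: make a checkpoint wait for all asynchronous sends. */
final class PendingRecordsSketch {
    private final Object lock = new Object();
    private long pendingRecords;   // guarded by lock
    private Throwable asyncError;  // guarded by lock

    /** Called per record; 'ack' stands in for the client's async send callback. */
    void send(CompletableFuture<Void> ack) {
        synchronized (lock) {
            pendingRecords++;
        }
        ack.whenComplete((ignored, error) -> {
            synchronized (lock) {
                if (error != null && asyncError == null) {
                    asyncError = error; // keep the first failure
                }
                pendingRecords--;
                lock.notifyAll(); // wake up a waiting checkpoint
            }
        });
    }

    /** Called at checkpoint time: block until every in-flight record is acknowledged. */
    void flushOnCheckpoint() throws Exception {
        synchronized (lock) {
            while (pendingRecords > 0) {
                lock.wait();
            }
            if (asyncError != null) {
                throw new Exception("async send failed, checkpoint must not complete", asyncError);
            }
        }
    }

    long pending() {
        synchronized (lock) {
            return pendingRecords;
        }
    }
}
```

Because the checkpoint only completes once the counter is back at zero and no error was recorded, a failed send causes the checkpoint to fail and the records to be replayed, which is what at-least-once requires.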





[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69130874
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -68,10 +97,30 @@
//  Default configuration values
// 

 
-   public static final int DEFAULT_STREAM_DESCRIBE_RETRY_TIMES = 3;
+   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+   public static final double 
DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+   public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+   public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+   public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+   public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+   public static final double 
DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+   public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+   public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+   public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
 
-   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF = 1000L;
+   public static final double 
DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-   public static final int DEFAULT_SHARD_RECORDS_PER_GET = 100;
+   public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 
1L;
--- End diff --

I'm using 10s as the default discovery interval here. I tested it, and the 
originally suggested 30s seemed a bit too long as a default, IMHO. I can change 
it back to 30s if you think it's more appropriate.
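
For readers unfamiliar with how a base, a maximum, and an exponential constant usually combine, here is a minimal sketch of a capped exponential backoff. It is an assumption about how such constants are typically used, not the code from this pull request (which may, for example, add random jitter); `BackoffSketch` and `backoffMillis` are hypothetical names.

```java
/** Illustrative only: capped exponential backoff from base, max and constant. */
final class BackoffSketch {

    /**
     * Delay before retry number 'attempt' (0-based):
     * min(maxMillis, baseMillis * exponentialConstant^attempt).
     */
    static long backoffMillis(long baseMillis, long maxMillis,
                              double exponentialConstant, int attempt) {
        double delay = baseMillis * Math.pow(exponentialConstant, attempt);
        return (long) Math.min(delay, (double) maxMillis);
    }

    public static void main(String[] args) {
        // Values taken from the DEFAULT_STREAM_DESCRIBE_* constants in the diff.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + backoffMillis(1000L, 5000L, 1.5, attempt) + " ms");
        }
    }
}
```

With base 1000 ms, maximum 5000 ms and constant 1.5, the delays grow as 1000, 1500, 2250, 3375 ms and are capped at 5000 ms from the fifth attempt on.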


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357072#comment-15357072
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69130874
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -68,10 +97,30 @@
//  Default configuration values
// 

 
-   public static final int DEFAULT_STREAM_DESCRIBE_RETRY_TIMES = 3;
+   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+   public static final double 
DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+   public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+   public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+   public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+   public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+   public static final double 
DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+   public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+   public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+   public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
 
-   public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF = 1000L;
+   public static final double 
DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-   public static final int DEFAULT_SHARD_RECORDS_PER_GET = 100;
+   public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 
1L;
--- End diff --

I'm using 10s as the default discovery interval here. I tested it, and the 
originally suggested 30s seemed a bit too long as a default, IMHO. I can change 
it back to 30s if you think it's more appropriate.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a task 
> to ask the coordinator for a shard reassignment when it encounters a closed 
> shard at runtime (Kinesis closes a shard when it is merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
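
The static, locally determined mapping mentioned in the description can be sketched as follows. This is an assumption about what "round-robin-like" means here (shard position modulo parallelism), not the consumer's actual code; `ShardAssignmentSketch` and `shardsForSubtask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: static round-robin shard-to-task mapping. */
final class ShardAssignmentSketch {

    /**
     * Each subtask can compute its own shards without coordination, as long as
     * all subtasks see the same ordered shard list.
     */
    static List<String> shardsForSubtask(List<String> allShards,
                                         int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allShards.get(i));
            }
        }
        return mine;
    }
}
```

The sketch also shows why resharding breaks this scheme: once Kinesis closes a shard and opens new ones, the shard list changes, and subtasks that computed their assignment from the old list never pick up the new shards.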





[jira] [Commented] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357081#comment-15357081
 ] 

ASF GitHub Bot commented on FLINK-4075:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2174


> ContinuousFileProcessingCheckpointITCase failed on Travis
> -
>
> Key: FLINK-4075
> URL: https://issues.apache.org/jira/browse/FLINK-4075
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The test case {{ContinuousFileProcessingCheckpointITCase}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748004/log.txt





[jira] [Resolved] (FLINK-4124) Unstable test WrapperSetupHelperTest.testCreateTopologyContext

2016-06-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-4124.
-
Resolution: Fixed

Fixed in 
https://github.com/apache/flink/commit/a9733a9ae0b063ab51b5683f1d80c6e10ae2aefe

> Unstable test WrapperSetupHelperTest.testCreateTopologyContext
> --
>
> Key: FLINK-4124
> URL: https://issues.apache.org/jira/browse/FLINK-4124
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>  Labels: test-stability
>
> Instances of the failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140554510/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558082/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558119/log.txt





[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

2016-06-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2174




[jira] [Resolved] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis

2016-06-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-4075.
-
Resolution: Fixed

Fixed in 
https://github.com/apache/flink/commit/bd273a8f435b222eb67840fb39b854ec9ef8602f

> ContinuousFileProcessingCheckpointITCase failed on Travis
> -
>
> Key: FLINK-4075
> URL: https://issues.apache.org/jira/browse/FLINK-4075
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The test case {{ContinuousFileProcessingCheckpointITCase}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748004/log.txt





[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69132723
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig implements Serializable {
--- End diff --

The 3 configuration classes seem to have quite a bit of duplicate code, as 
well as inconsistencies between them.
E.g., all 3 have getters / setters for `timeout`, `maxTotal`, `maxIdle`, 
etc., while the getter / setter method for `timeout` is sometimes called 
`getSoTimeout` and sometimes `getTimeout`.

Can we implement the shared settings in a base class, and let the 3 types 
of configuration extend from that?
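
A minimal sketch of what such a base class could look like, assuming only the three shared settings named above. The class names `JedisConfigBaseSketch` and `JedisPoolConfigSketch` are hypothetical and are not the classes in the pull request.

```java
import java.io.Serializable;

/** Illustrative only: settings shared by all three configuration types. */
abstract class JedisConfigBaseSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int timeout;
    private final int maxTotal;
    private final int maxIdle;

    protected JedisConfigBaseSketch(int timeout, int maxTotal, int maxIdle) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        this.timeout = timeout;
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
    }

    // One consistently named getter per shared setting.
    public int getTimeout() { return timeout; }
    public int getMaxTotal() { return maxTotal; }
    public int getMaxIdle() { return maxIdle; }
}

/** Illustrative pool variant: adds only what is specific to a single host. */
final class JedisPoolConfigSketch extends JedisConfigBaseSketch {
    private static final long serialVersionUID = 1L;

    private final String host;
    private final int port;

    JedisPoolConfigSketch(String host, int port, int timeout, int maxTotal, int maxIdle) {
        super(timeout, maxTotal, maxIdle);
        if (host == null) {
            throw new NullPointerException("host must not be null");
        }
        this.host = host;
        this.port = port;
    }

    public String getHost() { return host; }
    public int getPort() { return port; }
}
```

The sentinel and cluster variants would extend the same base and add only their own fields, so the shared getters are defined, named, and validated in one place.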




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357089#comment-15357089
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69132723
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig implements Serializable {
--- End diff --

The 3 configuration classes seem to have quite a bit of duplicate code, as 
well as inconsistencies between them.
E.g., all 3 have getters / setters for `timeout`, `maxTotal`, `maxIdle`, 
etc., while the getter / setter method for `timeout` is sometimes called 
`getSoTimeout` and sometimes `getTimeout`.

Can we implement the shared settings in a base class, and let the 3 types 
of configuration extend from that?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134099
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* {@code additionalKey} used as hash name for {@link 
RedisDataType#HASH}
+* For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+* {@code additionalKey} used as set name for {@link 
RedisDataType#SO

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357099#comment-15357099
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134099
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134232
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* {@code additionalKey} used as hash name for {@link 
RedisDataType#HASH}
+* For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+* {@code additionalKey} used as set name for {@link 
RedisDataType#SO

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357102#comment-15357102
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134232
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2 data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2 data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.

[GitHub] flink issue #2124: [FLINK-3647] Change StreamSource to use Processing-Time C...

2016-06-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2124
  
@kl0u could you please close this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357103#comment-15357103
 ] 

ASF GitHub Bot commented on FLINK-3647:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2124
  
@kl0u could you please close this?


> Change StreamSource to use Processing-Time Clock Service
> 
>
> Key: FLINK-3647
> URL: https://issues.apache.org/jira/browse/FLINK-3647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> Currently, the {{StreamSource.AutomaticWatermarkContext}} has its own timer 
> service. This should be changed to use the Clock service introduced in 
> FLINK-3646 to make watermark emission testable by providing a custom Clock.
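The testability argument in the issue description can be illustrated with a small sketch. It assumes nothing about Flink's actual clock service: `Clock`, `ManualClock` and `PeriodicWatermarkEmitter` are hypothetical names made up for this example. The point is only that code which reads time through an injected interface can be driven deterministically from a test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not Flink's StreamSource or its clock service.
interface Clock {
    long currentTimeMillis();
}

// Test clock whose time only moves when the test advances it.
final class ManualClock implements Clock {
    private long now;

    ManualClock(long start) {
        this.now = start;
    }

    void advance(long millis) {
        now += millis;
    }

    @Override
    public long currentTimeMillis() {
        return now;
    }
}

// Records a watermark whenever at least intervalMillis have passed on the injected clock.
final class PeriodicWatermarkEmitter {
    private final Clock clock;
    private final long intervalMillis;
    private long lastEmission;
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkEmitter(Clock clock, long intervalMillis) {
        this.clock = clock;
        this.intervalMillis = intervalMillis;
        this.lastEmission = clock.currentTimeMillis();
    }

    // Called on every record or timer tick; emits only when the interval has elapsed.
    void maybeEmit() {
        long now = clock.currentTimeMillis();
        if (now - lastEmission >= intervalMillis) {
            emitted.add(now);
            lastEmission = now;
        }
    }
}
```

A test would construct the emitter with a `ManualClock`, call `advance(...)`, and assert on `emitted` without sleeping or depending on wall-clock time.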





[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69135245
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be 
mapped to redis type.
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *}
+ *public String getKeyFromData(Tuple2<String, String> data) {
+ *return data.f0;
+ *}
+ *public String getValueFromData(Tuple2<String, String> data) {
+ *return data.f1;
+ *}
+ *}
+ *}
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+   /**
+* Returns descriptor which defines data type.
+*
+* @return data type descriptor
+*/
+   RedisCommandDescription getDataTypeDescription();
--- End diff --

Now that this is renamed to `RedisCommandDescription`, should 
`getDataTypeDescription` be renamed accordingly too?
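
To make the suggestion concrete, here is a rough sketch of what the renamed method could look like. The name `getCommandDescription` is only an assumption for illustration (the comment above does not propose a specific name), and the types below are cut-down stand-ins, not the classes from the PR.

```java
import java.io.Serializable;

// Cut-down stand-ins for the PR's types, reduced to what the sketch needs.
enum RedisCommand { LPUSH, PUBLISH }

final class RedisCommandDescription implements Serializable {
    private static final long serialVersionUID = 1L;
    private final RedisCommand command;

    RedisCommandDescription(RedisCommand command) {
        this.command = command;
    }

    RedisCommand getCommand() {
        return command;
    }
}

interface RedisMapper<T> extends Serializable {
    // Assumed new name: matches the return type instead of the old "data type" wording.
    RedisCommandDescription getCommandDescription();

    String getKeyFromData(T data);

    String getValueFromData(T data);
}

// Example mapper over a {key, value} string pair.
final class PublishMapper implements RedisMapper<String[]> {
    private static final long serialVersionUID = 1L;

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.PUBLISH);
    }

    @Override
    public String getKeyFromData(String[] data) {
        return data[0];
    }

    @Override
    public String getValueFromData(String[] data) {
        return data[1];
    }
}
```

The Javadoc examples would need the same rename so that they still compile against the interface.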




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357105#comment-15357105
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69135245
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *}
+ *public String getKeyFromData(Tuple2<String, String> data) {
+ *return data.f0;
+ *}
+ *public String getValueFromData(Tuple2<String, String> data) {
+ *return data.f1;
+ *}
+ *}
+ *}
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+   /**
+* Returns descriptor which defines data type.
+*
+* @return data type descriptor
+*/
+   RedisCommandDescription getDataTypeDescription();
--- End diff --

Now that this is renamed to `RedisCommandDescription`, should 
`getDataTypeDescription` be renamed accordingly too?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...

2016-06-30 Thread iemejia
Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/2176
  
Nice, I just fixed it as you suggested. I have three questions:

1. This container is based on the Java JRE (to keep it small). Does any part 
of Flink do some magic that requires a full JDK (like live recompiles)? If 
not, I think this is almost perfect now.

2. Are you aware of any other Flink dependency that uses a native OS 
library? I ask because any such library must be added to the container. I did 
this for snappy because I found the issue while testing a Beam pipeline, but 
I don't know if there are others.

3. In the Docker image I left supervisor because I didn't find an easy way 
to start Flink in normal (non-daemon) mode: the scripts that start the 
taskmanager and the jobmanager go into daemon mode immediately. Is there 
something that can be done to change this? It would reduce the image by 
another 40 MB, but I can work on that in a different PR.





[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357106#comment-15357106
 ] 

ASF GitHub Bot commented on FLINK-4118:
---

Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/2176
  
Nice, I just fixed it as you suggested. I have three questions:

1. This container is based on the Java JRE (to keep it small). Does any part 
of Flink do some magic that requires a full JDK (like live recompiles)? If 
not, I think this is almost perfect now.

2. Are you aware of any other Flink dependency that uses a native OS 
library? I ask because any such library must be added to the container. I did 
this for snappy because I found the issue while testing a Beam pipeline, but 
I don't know if there are others.

3. In the Docker image I left supervisor because I didn't find an easy way 
to start Flink in normal (non-daemon) mode: the scripts that start the 
taskmanager and the jobmanager go into daemon mode immediately. Is there 
something that can be done to change this? It would reduce the image by 
another 40 MB, but I can work on that in a different PR.



> The docker-flink image is outdated (1.0.2) and can be slimmed down
> --
>
> Key: FLINK-4118
> URL: https://issues.apache.org/jira/browse/FLINK-4118
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> This issue is to upgrade the docker image and polish some details in it (e.g. 
> it can be slimmed down if we remove some unneeded dependencies, and the code 
> can be polished).





[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69136271
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -35,69 +35,77 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.Assert;
 import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test ensuring that the producer is not dropping buffered records
  */
 @SuppressWarnings("unchecked")
 public class AtLeastOnceProducerTest {
 
-   @Test
+   // we set a timeout because the test will not finish if the logic is 
broken
+   @Test(timeout=5000)
public void testAtLeastOnceProducer() throws Exception {
runTest(true);
}
 
// This test ensures that the actual test fails if the flushing is 
disabled
-   @Test(expected = AssertionError.class)
+   @Test(expected = AssertionError.class, timeout=5000)
public void ensureTestFails() throws Exception {
runTest(false);
}
 
private void runTest(boolean flushOnCheckpoint) throws Exception {
Properties props = new Properties();
-   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props);
+   final OneShotLatch snapshottingFinished = new OneShotLatch();
+   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props,
+   snapshottingFinished);
producer.setFlushOnCheckpoint(flushOnCheckpoint);
producer.setRuntimeContext(new MockRuntimeContext(0, 1));
 
producer.open(new Configuration());
 
-   for(int i = 0; i < 100; i++) {
+   for (int i = 0; i < 100; i++) {
producer.invoke("msg-" + i);
}
// start a thread confirming all pending records
final Tuple1 runnableError = new Tuple1<>(null);
-   final AtomicBoolean markOne = new AtomicBoolean(false);
+   final Thread threadA = Thread.currentThread();
+
Runnable confirmer = new Runnable() {
@Override
public void run() {
try {
MockProducer mp = 
producer.getProducerInstance();
List pending = 
mp.getPending();
 
-   // we ensure thread A is locked and 
didn't reach markOne
-   // give thread A some time to really 
reach the snapshot state
-   Thread.sleep(500);
-   if(markOne.get()) {
-   Assert.fail("Snapshot was 
confirmed even though messages " +
-   "were still in 
the buffer");
+   // we need to find out if the 
snapshot() method blocks forever
+   // this is not possible. If snapshot() 
is running, it will
+   // start removing elements from the 
pending list.
+   synchronized (threadA) {
+   threadA.wait(500L);
}
+   // we now check that no records have 
been confirmed yet
Assert.assertEquals(100, 
pending.size());
+   Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
+   
snapshottingFinished.hasTriggered());
 
// now confirm all checkpoints
-   for(Callback c: pending) {
+   for (Callback c: pending) {
c.onCompletion(null, null);
}
pending.clear();
-   // wait for the snapshotState() method 
to return
-

[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357110#comment-15357110
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69136271
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -35,69 +35,77 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.Assert;
 import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test ensuring that the producer is not dropping buffered records
  */
 @SuppressWarnings("unchecked")
 public class AtLeastOnceProducerTest {
 
-   @Test
+   // we set a timeout because the test will not finish if the logic is 
broken
+   @Test(timeout=5000)
public void testAtLeastOnceProducer() throws Exception {
runTest(true);
}
 
// This test ensures that the actual test fails if the flushing is 
disabled
-   @Test(expected = AssertionError.class)
+   @Test(expected = AssertionError.class, timeout=5000)
public void ensureTestFails() throws Exception {
runTest(false);
}
 
private void runTest(boolean flushOnCheckpoint) throws Exception {
Properties props = new Properties();
-   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props);
+   final OneShotLatch snapshottingFinished = new OneShotLatch();
+   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props,
+   snapshottingFinished);
producer.setFlushOnCheckpoint(flushOnCheckpoint);
producer.setRuntimeContext(new MockRuntimeContext(0, 1));
 
producer.open(new Configuration());
 
-   for(int i = 0; i < 100; i++) {
+   for (int i = 0; i < 100; i++) {
producer.invoke("msg-" + i);
}
// start a thread confirming all pending records
final Tuple1 runnableError = new Tuple1<>(null);
-   final AtomicBoolean markOne = new AtomicBoolean(false);
+   final Thread threadA = Thread.currentThread();
+
Runnable confirmer = new Runnable() {
@Override
public void run() {
try {
MockProducer mp = 
producer.getProducerInstance();
List pending = 
mp.getPending();
 
-   // we ensure thread A is locked and 
didn't reach markOne
-   // give thread A some time to really 
reach the snapshot state
-   Thread.sleep(500);
-   if(markOne.get()) {
-   Assert.fail("Snapshot was 
confirmed even though messages " +
-   "were still in 
the buffer");
+   // we need to find out if the 
snapshot() method blocks forever
+   // this is not possible. If snapshot() 
is running, it will
+   // start removing elements from the 
pending list.
+   synchronized (threadA) {
+   threadA.wait(500L);
}
+   // we now check that no records have 
been confirmed yet
Assert.assertEquals(100, 
pending.size());
+   Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
+   
snapshottingFinished.hasTriggered());
 
// now confirm all checkpoints
-   for(Callback c: pending) {
+   for (Callback c: pending) {
  

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69136462
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -107,15 +115,16 @@ public void run() {
threadB.start();
// this should block:
producer.snapshotState(0, 0);
-   // once all pending callbacks are confirmed, we can set this 
marker to true
-   markOne.set(true);
-   for(int i = 0; i < 99; i++) {
-   producer.invoke("msg-" + i);
+   synchronized (threadA) {
+   threadA.notifyAll(); // just in case, to let the test 
fail faster
}
-   // wait at most one second
-   threadB.join(800L);
-   Assert.assertFalse("Thread A reached this point too fast", 
threadB.isAlive());
-   if(runnableError.f0 != null) {
+
--- End diff --

I would insert here an assertion which checks that the number of 
pendingRecords is `0`.
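
Something along these lines is what I have in mind. This is only a single-threaded sketch: `CountingProducer` and `getPendingRecords()` are hypothetical stand-ins for the test producer and whatever accessor exposes the pending counter, not the classes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the test producer; not the Flink class from the PR.
final class CountingProducer {
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    private long pendingRecords;

    // Buffers a record and registers the callback that confirms it later.
    void invoke(String record) {
        pendingRecords++;
        pendingCallbacks.add(() -> pendingRecords--);
    }

    // Simulates the broker confirming every buffered record.
    void confirmAll() {
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    long getPendingRecords() {
        return pendingRecords;
    }
}

final class PendingRecordsCheck {
    // The assertion to place right after snapshotState(...) returns.
    static void assertNothingPending(CountingProducer producer) {
        long pending = producer.getPendingRecords();
        if (pending != 0) {
            throw new AssertionError(
                "Expected 0 pending records after snapshot, found " + pending);
        }
    }
}
```

In the real test the confirmation happens on thread B, so the check would go after `snapshotState(0, 0)` has returned on thread A.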



