[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2016-08-15 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420674#comment-15420674
 ] 

Ufuk Celebi commented on FLINK-4387:


Temporarily ignored the test in 79cc30 (master).

> Instability in KvStateClientTest.testClientServerIntegration()
> --
>
> Key: FLINK-4387
> URL: https://issues.apache.org/jira/browse/FLINK-4387
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>  Labels: test-stability
>
> According to this log: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt
> the {{KvStateClientTest}} didn't complete.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() 
> [0x7fb2bcb3b000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
>   - locked <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32)
>   at 
> org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185)
>   at 
> org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}
> and
> {code}
> Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError
>   at 
> io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
>   at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2362: [FLINK-4385] [table] Union on Timestamp fields does not w...

2016-08-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2362
  
@wuchong Thanks for the fix! Will merge...



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4385) Union on Timestamp fields does not work

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420682#comment-15420682
 ] 

ASF GitHub Bot commented on FLINK-4385:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2362
  
@wuchong Thanks for the fix! Will merge...



> Union on Timestamp fields does not work
> ---
>
> Key: FLINK-4385
> URL: https://issues.apache.org/jira/browse/FLINK-4385
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The following does not work:
> {code}
> public static class SDF {
>   public Timestamp t = Timestamp.valueOf("1990-10-10 12:10:10");
> }
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<SDF> dataSet1 = env.fromElements(new SDF());
> DataSet<SDF> dataSet2 = env.fromElements(new SDF());
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> tableEnv.registerDataSet("table0", dataSet1);
> tableEnv.registerDataSet("table1", dataSet2);
> Table table = tableEnv.sql("select t from table0 union select t from table1");
> DataSet<Row> d = tableEnv.toDataSet(table, Row.class);
> d.print();
> {code}





[GitHub] flink pull request #2362: [FLINK-4385] [table] Union on Timestamp fields doe...

2016-08-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2362




[jira] [Resolved] (FLINK-4385) Union on Timestamp fields does not work

2016-08-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4385.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 83c4b9707dd24425391bd5759f12878ad2f19175.

> Union on Timestamp fields does not work
> ---
>
> Key: FLINK-4385
> URL: https://issues.apache.org/jira/browse/FLINK-4385
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>





[jira] [Commented] (FLINK-4385) Union on Timestamp fields does not work

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420693#comment-15420693
 ] 

ASF GitHub Bot commented on FLINK-4385:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2362







[jira] [Assigned] (FLINK-4348) implement communication from ResourceManager to TaskManager

2016-08-15 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing reassigned FLINK-4348:


Assignee: zhangjing

> implement communication from ResourceManager to TaskManager
> ---
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: zhangjing
>
> There are mainly 3 interactions initiated from the RM to the TM:
> * Heartbeat: the RM uses heartbeats to sync with the TM's slot status.
> * SlotRequest: when the RM decides to assign a slot to a JM, it should first 
> send a request to the TM for the slot. The TM can either accept or reject this request.
> * FailureNotify: in some corner cases the TM will be marked as invalid by the 
> cluster manager master (e.g., the YARN master), but the TM itself does not realize it. 
> The RM should send a failure notification to the TM so that the TM can terminate itself.





[jira] [Created] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread Dominik Bruhn (JIRA)
Dominik Bruhn created FLINK-4394:


 Summary: RMQSource: The QueueName is not accessible to subclasses
 Key: FLINK-4394
 URL: https://issues.apache.org/jira/browse/FLINK-4394
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.1.1
Reporter: Dominik Bruhn


In version 1.1.0 we made the RMQSource extensible so that subclasses can 
configure how they want their RabbitMQ/AMQP queue created. Subclasses 
can override 

{code}
protected void setupQueue() throws IOException {
channel.queueDeclare(queueName, true, false, false, null);
}
{code}


The problem is that the queueName property is private, so when overriding the 
setupQueue method you don't know which queueName was provided. A simple 
change of the queueName property to protected fixes this.

A PR will follow.
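The proposed visibility change can be sketched with a minimal stand-in hierarchy. Only the names `setupQueue` and `queueName` come from the issue; `BaseSource`, `LazyQueueSource`, and the `String` return value are hypothetical, introduced here to make the fix observable:

```java
import java.io.IOException;
import java.util.Map;

// Stand-in for RMQSource illustrating the proposed fix: queueName becomes
// protected so a subclass can use it when overriding setupQueue().
class BaseSource {
    protected final String queueName;   // was private before the proposed fix

    BaseSource(String queueName) {
        this.queueName = queueName;
    }

    // Default queue declaration, mirroring the snippet in the issue.
    protected String setupQueue() throws IOException {
        return declare(queueName, true, false, false, null);
    }

    // Stand-in for channel.queueDeclare(name, durable, exclusive, autoDelete, args).
    protected String declare(String name, boolean durable, boolean exclusive,
                             boolean autoDelete, Map<String, Object> args) {
        return "declared " + name + " durable=" + durable + " autoDelete=" + autoDelete;
    }
}

class LazyQueueSource extends BaseSource {
    LazyQueueSource(String queueName) {
        super(queueName);
    }

    @Override
    protected String setupQueue() throws IOException {
        // With queueName protected, the subclass can declare a non-durable,
        // auto-deleting queue under the configured name.
        return declare(queueName, false, false, true, null);
    }
}
```

With the field private, the subclass override would have no way to pass the configured name to its own declaration call.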





[GitHub] flink pull request #2373: [FLINK-4394] RMQSource: QueueName accessible for s...

2016-08-15 Thread theomega
GitHub user theomega opened a pull request:

https://github.com/apache/flink/pull/2373

[FLINK-4394] RMQSource: QueueName accessible for subclasses

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

The queueName is needed if the subclasses override `setupQueue`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/theomega/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2373


commit 04d6bf46e8d8684494522cd0c92ad462922d0af4
Author: Dominik 
Date:   2016-08-15T07:41:57Z

RMQSource: QueueName accessible for subclasses

The queueName is needed if the subclasses override `setupQueue`.






[jira] [Commented] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420704#comment-15420704
 ] 

ASF GitHub Bot commented on FLINK-4394:
---

GitHub user theomega opened a pull request:

https://github.com/apache/flink/pull/2373

[FLINK-4394] RMQSource: QueueName accessible for subclasses


> RMQSource: The QueueName is not accessible to subclasses
> 
>
> Key: FLINK-4394
> URL: https://issues.apache.org/jira/browse/FLINK-4394
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.1
>Reporter: Dominik Bruhn
>
> In version 1.1.0 we made the RMQSource extensible so that subclasses can 
> configure how they want their RabbitMQ/AMQP queue created. Subclasses 
> can override 
> {code}
>   protected void setupQueue() throws IOException {
>   channel.queueDeclare(queueName, true, false, false, null);
>   }
> {code}
> The problem is that the queueName property is private, so when overriding the 
> setupQueue method you don't know which queueName was provided. A simple 
> change of the queueName property to protected fixes this.
> PR will follow





[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2016-08-15 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420708#comment-15420708
 ] 

Robert Metzger commented on FLINK-4387:
---

I also saw this assertion error in another test:
{code}
Exception in thread "globalEventExecutor-1-1" java.lang.AssertionError
at 
io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
at 
io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
at 
io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
at 
io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
{code}
https://s3.amazonaws.com/archive.travis-ci.org/jobs/152241331/log.txt

But I wonder why this error occurs now. Did we update the netty version 
recently?






[jira] [Created] (FLINK-4395) Eager processing of late arrivals in CEP operator

2016-08-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4395:


 Summary: Eager processing of late arrivals in CEP operator
 Key: FLINK-4395
 URL: https://issues.apache.org/jira/browse/FLINK-4395
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Reporter: Till Rohrmann
Priority: Minor


At the moment, elements are only processed after the CEP operator has received a 
watermark larger than the elements' timestamps (in event-time mode). In case of late 
arrivals this means that the late elements are not processed until the next 
watermark has arrived.

In order to decrease the latency for this scenario, I propose to eagerly 
process late arrivals in the CEP operator.





[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-15 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420743#comment-15420743
 ] 

Chesnay Schepler commented on FLINK-3950:
-

The idea behind the Meter view is to reduce the overhead on the processing 
thread.

For example, when using a DropWizard meter, every event updates 4 LongAdders, 
and the 3 different rates are regularly recalculated, all with thread-safe 
classes. This is all done by the same thread that does the actual processing. 
The overhead of doing this thousands of times per second adds up.

A meter view, on the other hand, works behind the scenes. A user would register a 
counter and a view, but would only use the counter (== very cheap). The view is 
updated by a background thread at regular intervals, calculating rates based on 
the counter's values.
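The counter-plus-view design described above can be sketched as follows. This is a hypothetical illustration of the idea, not Flink's actual classes; `SimpleCounter` and `MeterView` are names invented for the sketch:

```java
import java.util.concurrent.atomic.LongAdder;

// Cheap counter: the only thing the processing thread ever touches.
class SimpleCounter {
    private final LongAdder count = new LongAdder();

    void inc() { count.increment(); }       // sole call on the hot path

    long getCount() { return count.sum(); }
}

// View that derives a rate from the counter; updated by a shared
// background thread, so the processing thread never pays for it.
class MeterView {
    private final SimpleCounter counter;
    private final int updateIntervalSeconds;
    private long lastCount;
    private volatile double rate;           // events per second

    MeterView(SimpleCounter counter, int updateIntervalSeconds) {
        this.counter = counter;
        this.updateIntervalSeconds = updateIntervalSeconds;
    }

    // Called once per interval by a background timer thread.
    void update() {
        long current = counter.getCount();
        rate = (current - lastCount) / (double) updateIntervalSeconds;
        lastCount = current;
    }

    double getRate() { return rate; }
}
```

A metric registry would tick all registered views from a single scheduled background thread; 50 increments observed over a 5-second interval would yield a rate of 10.0 events/s.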

However, having a standalone meter type is quite useful, for example in regard 
to compatibility; there will be users who already use DropWizard meters and 
can't just replace them with our metrics.

As such, I would propose the following:

[~ivan.mushketyk] implements the standalone Meter. This includes the Meter 
interface (I'm not quite sure which rates it should calculate right now), a 
wrapper for DropWizard meters (similar to histograms), proper propagation 
through the metric system, and handling by reporters.

The View implementation will be done by me. I'm starting to think that Views 
and Meters will be separate things in the end, so there shouldn't be much 
overlap.

Would that be alright with you [~ivan.mushketyk]?

> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[jira] [Commented] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420750#comment-15420750
 ] 

Robert Metzger commented on FLINK-4394:
---

Thank you for opening a pull request. I gave you contributor permissions in our 
JIRA so that you can assign JIRAs to yourself in the future.
I assigned this one to you.






[jira] [Updated] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4394:
--
Assignee: Dominik Bruhn






[GitHub] flink issue #2373: [FLINK-4394] RMQSource: QueueName accessible for subclass...

2016-08-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2373
  
I'm going to merge the change once travis gives green light.




[jira] [Commented] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420752#comment-15420752
 ] 

ASF GitHub Bot commented on FLINK-4394:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2373
  
I'm going to merge the change once travis gives green light.







[GitHub] flink issue #2297: [FLINK-4081] [core] [table] FieldParsers should support e...

2016-08-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2297
  
@StephanEwen I renamed `EMPTY_STRING` to `EMPTY_COLUMN`. All parsers that 
have a "format" (like Double, Boolean, Integer etc.) return `-1` and set 
`EMPTY_COLUMN`. The `StringParser` returns the String but sets `EMPTY_COLUMN` 
in case no quoting could be found.

So in `..,12,,xyz,..` the column `,,` will always be set as `EMPTY_COLUMN` 
consistently across all parsers.
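The convention described above can be sketched with a simplified stand-in parser. Only the `EMPTY_COLUMN` idea and the `-1` return convention come from the comment; `SimpleIntParser` and its method shapes are hypothetical, not Flink's actual `FieldParser` API:

```java
// Error states a parser can report; EMPTY_COLUMN marks an empty field
// between two delimiters.
enum ParseErrorState { NONE, EMPTY_COLUMN }

// Simplified integer field parser following the convention: parsers with
// a "format" return -1 and set EMPTY_COLUMN when the column is empty.
class SimpleIntParser {
    private ParseErrorState errorState = ParseErrorState.NONE;
    private int result;

    // Parses digits from bytes[start] up to the delimiter or limit.
    // Returns the offset just past the delimiter, or -1 with
    // EMPTY_COLUMN set when the column is empty.
    int parseField(byte[] bytes, int start, int limit, byte delimiter) {
        errorState = ParseErrorState.NONE;
        if (start >= limit || bytes[start] == delimiter) {
            errorState = ParseErrorState.EMPTY_COLUMN;
            return -1;
        }
        int i = start;
        int value = 0;
        while (i < limit && bytes[i] != delimiter) {
            value = value * 10 + (bytes[i] - '0');
            i++;
        }
        result = value;
        return i < limit ? i + 1 : limit;
    }

    ParseErrorState getErrorState() { return errorState; }
    int getLastResult() { return result; }
}
```

On the input `12,,xyz`, the first call parses `12` and returns the offset past the delimiter; the second call immediately hits the delimiter, returns `-1`, and sets `EMPTY_COLUMN`.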




[jira] [Commented] (FLINK-4081) FieldParsers should support empty strings

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420758#comment-15420758
 ] 

ASF GitHub Bot commented on FLINK-4081:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2297
  
@StephanEwen I renamed `EMPTY_STRING` to `EMPTY_COLUMN`. All parsers that 
have a "format" (like Double, Boolean, Integer etc.) return `-1` and set 
`EMPTY_COLUMN`. The `StringParser` returns the String but sets `EMPTY_COLUMN` 
in case no quoting could be found.

So in `..,12,,xyz,..` the column `,,` will always be set as `EMPTY_COLUMN` 
consistently across all parsers.


> FieldParsers should support empty strings
> -
>
> Key: FLINK-4081
> URL: https://issues.apache.org/jira/browse/FLINK-4081
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>  Labels: csvparser, table-api
>
> In order to parse CSV files using the new Table API that converts rows to Row 
> objects (which support null values), FieldParser implementations should 
> support empty strings by setting the parser state to 
> ParseErrorState.EMPTY_STRING (for example, FloatParser and DoubleParser 
> don't respect this constraint).





[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737756
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
--- End diff --

Good point, will change it.




[jira] [Commented] (FLINK-4383) Check parameters for serializability before sending a remote RpcInvocation message

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420766#comment-15420766
 ] 

ASF GitHub Bot commented on FLINK-4383:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Remote rpc invocation message which is used when the actor 
communication is remote and, thus, the
+ * message has to be serialized.
+ * 
+ * In order to fail fast and report an appropriate error message to the 
user, the method name, the
+ * parameter types and the arguments are eagerly serialized. In case the 
invocation call
+ * contains a non-serializable object, then an {@link IOException} is 
thrown.
+ */
+public class RemoteRpcInvocation implements RpcInvocation, Serializable {
+   private static final long serialVersionUID = 6179354390913843809L;
+
+   // Serialized invocation data
+   private final SerializedValue<RemoteRpcInvocation.MethodInvocation> 
serializedMethodInvocation;
+
+   // Transient field which is lazily initialized upon first access to the 
invocation data
+   private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
+
+   public  RemoteRpcInvocation(
+   final String methodName,
+   final Class<?>[] parameterTypes,
+   final Object[] args) throws IOException {
+
+   serializedMethodInvocation = new SerializedValue<>(new 
RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
--- End diff --

You're right. Will add it.
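For context, the eager-serialize / lazy-deserialize pattern in the quoted constructor can be sketched minimally as below. The class and field names are hypothetical stand-ins, not the actual Flink `SerializedValue`/`MethodInvocation` types:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: serialize eagerly in the constructor (fail fast), deserialize lazily on access.
class EagerEnvelope implements Serializable {
    private final byte[] payload;          // serialized eagerly in the constructor
    private transient Object deserialized; // lazily re-created on first access

    EagerEnvelope(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value); // throws IOException here if value is not serializable
        }
        payload = bos.toByteArray();
    }

    Object get() throws IOException, ClassNotFoundException {
        if (deserialized == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                deserialized = in.readObject();
            }
        }
        return deserialized;
    }
}
```

The transient field mirrors the quoted `methodInvocation` member: it is skipped by Java serialization and rebuilt from the byte payload on the receiving side.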


> Check parameters for serializability before sending a remote RpcInvocation 
> message
> --
>
> Key: FLINK-4383
> URL: https://issues.apache.org/jira/browse/FLINK-4383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Before sending a remote {{RpcInvocation}} message we should check that the 
> rpc arguments are serializable. If not we should eagerly fail with an 
> appropriate exception message.
> If we don't do this, then Akka will silently fail serializing the message 
> without telling the user.
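The fail-fast check described in the issue can be illustrated roughly as follows. This is a hypothetical helper, not Flink's actual implementation (the real code serializes the whole invocation into a `SerializedValue`):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Sketch: verify each rpc argument serializes before sending the message,
// so the caller gets a descriptive IOException instead of a silent Akka failure.
class RpcArgumentCheck {
    static void checkSerializable(Object[] args) throws IOException {
        for (int i = 0; i < args.length; i++) {
            Object arg = args[i];
            if (arg == null) {
                continue; // null serializes fine
            }
            try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
                // throws NotSerializableException (an IOException) on failure
                out.writeObject(arg);
            } catch (IOException e) {
                throw new IOException("Could not serialize rpc argument " + i
                    + " of type " + arg.getClass().getName(), e);
            }
        }
    }
}
```

The point is the error message: it names the offending argument and its type, which Akka's silent serialization failure would not.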



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Remote rpc invocation message which is used when the actor 
communication is remote and, thus, the
+ * message has to be serialized.
+ * 
+ * In order to fail fast and report an appropriate error message to the 
user, the method name, the
+ * parameter types and the arguments are eagerly serialized. In case the 
the invocation call
+ * contains a non-serializable object, then an {@link IOException} is 
thrown.
+ */
+public class RemoteRpcInvocation implements RpcInvocation, Serializable {
+   private static final long serialVersionUID = 6179354390913843809L;
+
+   // Serialized invocation data
+   private final SerializedValue<RemoteRpcInvocation.MethodInvocation> 
serializedMethodInvocation;
+
+   // Transient field which is lazily initialized upon first access to the 
invocation data
+   private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
+
+   public  RemoteRpcInvocation(
+   final String methodName,
+   final Class<?>[] parameterTypes,
+   final Object[] args) throws IOException {
+
+   serializedMethodInvocation = new SerializedValue<>(new 
RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
--- End diff --

You're right. Will add it.




[jira] [Commented] (FLINK-4383) Check parameters for serializability before sending a remote RpcInvocation message

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420763#comment-15420763
 ] 

ASF GitHub Bot commented on FLINK-4383:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737756
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
--- End diff --

Good point, will change it.


> Check parameters for serializability before sending a remote RpcInvocation 
> message
> --
>
> Key: FLINK-4383
> URL: https://issues.apache.org/jira/browse/FLINK-4383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Before sending a remote {{RpcInvocation}} message we should check that the 
> rpc arguments are serializable. If not we should eagerly fail with an 
> appropriate exception message.
> If we don't do this, then Akka will silently fail serializing the message 
> without telling the user.





[jira] [Commented] (FLINK-4383) Check parameters for serializability before sending a remote RpcInvocation message

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420770#comment-15420770
 ] 

ASF GitHub Bot commented on FLINK-4383:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
 
// default timeout for asks
private final Timeout timeout;
 
-   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
+   private final long maximumFramesize;
+
+   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long 
maximumFramesize) {
--- End diff --

`isLocalActorRef` should actually be quite cheap since it simply checks 
whether the host option of the actor's address is defined or not. But, anyway, 
it makes sense to check it once and store the result. Will add the change.
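The check-once-and-cache design agreed on above can be sketched like this (hypothetical, heavily simplified; the real handler wraps actual invocation messages rather than strings):

```java
// Sketch: compute the locality of the target actor once in the constructor
// and use the cached flag to pick the message type on every call.
class InvocationDispatcher {
    private final boolean isLocal; // computed once, as suggested in the review

    InvocationDispatcher(boolean actorIsLocal) {
        this.isLocal = actorIsLocal;
    }

    // Local calls skip serialization; remote calls serialize eagerly to fail fast.
    String wrap(String methodName) {
        return isLocal
            ? "LocalRpcInvocation(" + methodName + ")"
            : "RemoteRpcInvocation(" + methodName + ")";
    }
}
```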


> Check parameters for serializability before sending a remote RpcInvocation 
> message
> --
>
> Key: FLINK-4383
> URL: https://issues.apache.org/jira/browse/FLINK-4383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Before sending a remote {{RpcInvocation}} message we should check that the 
> rpc arguments are serializable. If not we should eagerly fail with an 
> appropriate exception message.
> If we don't do this, then Akka will silently fail serializing the message 
> without telling the user.





[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
 
// default timeout for asks
private final Timeout timeout;
 
-   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
+   private final long maximumFramesize;
+
+   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long 
maximumFramesize) {
--- End diff --

`isLocalActorRef` should actually be quite cheap since it simply checks 
whether the host option of the actor's address is defined or not. But, anyway, 
it makes sense to check it once and store the result. Will add the change.




[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2365
  
Thanks for the review @StephanEwen. I've addressed your comments. Once 
Travis gives green light, I'll merge it.




[jira] [Commented] (FLINK-4383) Check parameters for serializability before sending a remote RpcInvocation message

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420798#comment-15420798
 ] 

ASF GitHub Bot commented on FLINK-4383:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2365
  
Thanks for the review @StephanEwen. I've addressed your comments. Once 
Travis gives green light, I'll merge it.


> Check parameters for serializability before sending a remote RpcInvocation 
> message
> --
>
> Key: FLINK-4383
> URL: https://issues.apache.org/jira/browse/FLINK-4383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Before sending a remote {{RpcInvocation}} message we should check that the 
> rpc arguments are serializable. If not we should eagerly fail with an 
> appropriate exception message.
> If we don't do this, then Akka will silently fail serializing the message 
> without telling the user.





[GitHub] flink issue #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2288
  
The associated issue for this can be found here: 
https://issues.apache.org/jira/browse/FLINK-4228.

+1 to the points Stephan raised in his earlier comment.






[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420812#comment-15420812
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2288
  
The associated issue for this can be found here: 
https://issues.apache.org/jira/browse/FLINK-4228.

+1 to the points Stephan raised in his earlier comment.




> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
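The manual workaround described above (create directories explicitly, copy only regular files) can be sketched as follows. This is a hypothetical helper using plain `java.nio.file` rather than Hadoop's `FileSystem` API, so it only illustrates the shape of the fix:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Sketch of the workaround: recurse manually so directories are created
// explicitly and only actual files are ever handed to the copy call.
class RecursiveCopy {
    static void copyRecursive(Path src, Path dst) throws IOException {
        if (Files.isDirectory(src)) {
            Files.createDirectories(dst); // create the folder instead of "copying" it
            try (Stream<Path> children = Files.list(src)) {
                for (Path child : (Iterable<Path>) children::iterator) {
                    copyRecursive(child, dst.resolve(child.getFileName().toString()));
                }
            }
        } else {
            // only regular files reach the underlying copy
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

This mirrors the structure of the `copyFromLocalFile` helper proposed in the associated pull request, where `fs.copyFromLocalFile` is only invoked on leaf files.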





[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74742684
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -46,12 +47,13 @@ public void run() {
 
FileSystem fs = 
FileSystem.get(remotePath, hadoopConf);
 
-   fs.copyFromLocalFile(new 
Path(localPath.getAbsolutePath()),
-   new Path(remotePath));
+   copyFromLocalFile(fs, localPath, 
checkInitialDirectory(fs,localPath,remotePath));
--- End diff --

Whitespace between arguments missing, use `fs, localPath, remotePath`, 
please.




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74743009
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+   /**
+* Ensure that target path terminates with a new directory to be 
created by fs. If remoteURI does not specify a new
+* directory, append local directory name.
+* @param fs
+* @param localPath
+* @param remoteURI
+* @return
+* @throws IOException
+*/
+   protected static URI checkInitialDirectory(final FileSystem fs,final 
File localPath, final URI remoteURI) throws IOException {
--- End diff --

- Can you make this private or move the code to the `run` method?
- Can you add whitespace between the arguments, `FileSystem fs, File 
localPath, URI remoteURI`?
- Can you make the arguments non-final?
- Can you complete the Javadocs?

Why do we need this check?




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74743234
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+   /**
+* Ensure that target path terminates with a new directory to be 
created by fs. If remoteURI does not specify a new
+* directory, append local directory name.
+* @param fs
+* @param localPath
+* @param remoteURI
+* @return
+* @throws IOException
+*/
+   protected static URI checkInitialDirectory(final FileSystem fs,final 
File localPath, final URI remoteURI) throws IOException {
+   if (localPath.isDirectory()) {
+   Path remotePath = new Path(remoteURI);
+   if (fs.exists(remotePath)) {
+   return new 
Path(remotePath,localPath.getName()).toUri();
+   }
+   }
+   return remoteURI;
+   }
+
+   protected static void copyFromLocalFile(final FileSystem fs, final File 
localPath, final URI remotePath) throws Exception {
--- End diff --

I think we should move it to a utility an `flink-core` (this is where 
`FileSystem` is defined). I would add Javadocs explaining why we don't use 
`fs.copyFromLocalFile`. That way we don't have to copy the code in 
`yarn/Utils.java`.




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74743269
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
 ---
@@ -19,18 +19,18 @@
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
+import org.codehaus.jackson.map.ser.std.CollectionSerializer;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
+import java.io.*;
--- End diff --

We don't do star imports.




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74743303
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
 ---
@@ -70,6 +70,85 @@ public void testCopyFromLocal() throws Exception {
}
}
 
+
+   /**
+* This test verifies that nested directories are properly copied.
+*/
+   @Test
+   public void testCopyFromLocalRecursive() throws Exception {
+
+   File rootDir = tempFolder.newFolder();
+   File nestedDir = new File(rootDir,"nested");
+   nestedDir.mkdir();
+
+   Map<String, File> copyFiles = new HashMap<>();
+
+   copyFiles.put("1",new File(rootDir, "1"));
+   copyFiles.put("2",new File(rootDir, "2"));
+   copyFiles.put("3",new File(nestedDir, "3"));
+
+   for (File file : copyFiles.values()) {
+   try (DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file))) {
+   out.writeUTF("Hello there, " + file.getName());
+   }
+   }
+   //add root and nested dirs to expected output
+   copyFiles.put(rootDir.getName(),rootDir);
+   copyFiles.put("nested",nestedDir);
+
+   assertEquals(5,copyFiles.size());
+
+   //Test for copy to unspecified target directory
+   File copyDirU = tempFolder.newFolder();
+   HDFSCopyFromLocal.copyFromLocal(
+   rootDir,
+   new Path("file://" + 
copyDirU.getAbsolutePath()).toUri());
+
+   //Test for copy to specified target directory
+   File copyDirQ = tempFolder.newFolder();
+   HDFSCopyFromLocal.copyFromLocal(
+   rootDir,
+   new Path("file://" + copyDirQ.getAbsolutePath() 
+ "/" + rootDir.getName()).toUri());
+
+   FilenameFilter noCrc = new FilenameFilter() {
--- End diff --

Why do we exclude this? 




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74743404
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -255,4 +257,33 @@ private Utils() {
}
return result;
}
+
+   /**
+* Recursive directory copy to work around FileSystem implementations 
that do not implement it.
+* @param fs
+* @param localPath
+* @param remotePath
+* @throws IOException
+ */
+   protected static void copyFromLocalFile(final FileSystem fs, final Path 
localPath, final Path remotePath) throws IOException {
+   File localFile = new File(localPath.toUri());
+   if (localFile.isDirectory()) {
+   for (File file : localFile.listFiles()) {
+   copyFromLocalFile(fs, new Path("file://" + 
file.getAbsolutePath()), new Path(remotePath,file.getName()));
+   }
+   } else {
+   fs.copyFromLocalFile(localPath,remotePath);
+   }
+   }
+
+   public static Path checkScheme(final Path localRsrcPath) throws 
IOException {
--- End diff --

Can you make this private and remove the `final` modifier? What is the 
current behaviour if no scheme is specified?




[GitHub] flink issue #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2288
  
Thank you for contributing. All in all, this looks good and is an important 
fix. We would have to address the inline comments I've raised before merging it 
though. Do you have time to address them?

For the commits, I would squash all to a single commit via `git rebase -i` 
and add the issue tag and a description of what the problem is and how we solve 
it.




[GitHub] flink pull request #2372: [FLINK-4281] [table] Wrap all Calcite Exceptions i...

2016-08-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2372#discussion_r74747473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 ---
@@ -68,12 +68,16 @@ class FlinkPlannerImpl(
 }
   }
 
-  @throws(classOf[SqlParseException])
+  @throws(classOf[FlinkSqlParseException])
--- End diff --

Do you mean `FlinkSqlParseException` here?  
Yeah, you are right, we should remove it from throws. 




[jira] [Commented] (FLINK-4281) Wrap all Calcite Exceptions in Flink Exceptions

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420850#comment-15420850
 ] 

ASF GitHub Bot commented on FLINK-4281:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2372#discussion_r74747473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 ---
@@ -68,12 +68,16 @@ class FlinkPlannerImpl(
 }
   }
 
-  @throws(classOf[SqlParseException])
+  @throws(classOf[FlinkSqlParseException])
--- End diff --

Do you mean `FlinkSqlParseException` here?  
Yeah, you are right, we should remove it from throws. 


> Wrap all Calcite Exceptions in Flink Exceptions
> ---
>
> Key: FLINK-4281
> URL: https://issues.apache.org/jira/browse/FLINK-4281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Some exceptions are already wrapped in Flink exceptions but there are still 
> exceptions thrown by Calcite. I would propose that all Exceptions thrown by 
> the Table API are Flink's Exceptions, esp. the FlinkPlannerImpl exceptions.
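The wrapping pattern proposed in this issue can be sketched as follows. The class names and the parse step are hypothetical stand-ins; the real code wraps Calcite's planner/parser exceptions, not `NumberFormatException`:

```java
// Hypothetical Flink-side exception that hides the third-party (Calcite) type from users.
class TableException extends RuntimeException {
    TableException(String message, Throwable cause) {
        super(message, cause);
    }
}

class Planner {
    // Wraps any planner-internal failure so callers only ever see Flink exceptions,
    // while keeping the original exception available as the cause.
    static int parse(String sql) {
        try {
            return Integer.parseInt(sql); // stand-in for the real Calcite parse step
        } catch (NumberFormatException e) {
            throw new TableException("SQL parse failed for: " + sql, e);
        }
    }
}
```

Keeping the original exception as the `cause` preserves the Calcite diagnostics for debugging while giving users a stable Flink-owned exception type to catch.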





[jira] [Updated] (FLINK-4393) Failed to serialize accumulators for task

2016-08-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4393:
--
Description: 
Dear Team,
  I am getting the below exception while trying to use the Table API by looping 
through the DataSet using the collect() method.

{code}
2016-08-15 07:18:52,503 WARN  
org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to 
serialize accumulators for task.
java.lang.OutOfMemoryError
at 
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
at 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
at 
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
at 
org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248)
at 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Suppressed: java.lang.OutOfMemoryError
at 
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at 
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at 
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:303)
... 28 more
2016-08-15 07:18:52,508 ERROR 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to 
deserialize final accumulator results.
java.lang.NullPointerException
at 

[jira] [Commented] (FLINK-4393) Failed to serialize accumulators for task

2016-08-15 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420868#comment-15420868
 ] 

Robert Metzger commented on FLINK-4393:
---

What's the DataSet size you are trying to collect()?

The problem is that the amount of data you can transfer from the cluster to 
your client is limited by a) the amount of heap space at the client and b) the 
amount of data the RPC system can transfer. IIRC, the RPC system's limit is 
10MB with the default configuration.
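
The size check behind that limit can be sketched as follows. The 10 MB default is the value recalled above; treat it as an assumption, since the actual limit is a configuration setting:

```java
public class RpcFrameCheck {

    // Assumed default RPC frame size (~10 MB, per the comment above).
    static final long DEFAULT_FRAME_SIZE_BYTES = 10L * 1024 * 1024;

    // A serialized collect() result must fit within one RPC transfer.
    static boolean fitsInRpcFrame(long serializedResultBytes) {
        return serializedResultBytes <= DEFAULT_FRAME_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsInRpcFrame(5L * 1024 * 1024));   // small result: true
        System.out.println(fitsInRpcFrame(500L * 1024 * 1024)); // huge result: false
    }
}
```

For results anywhere near this size, writing the DataSet to a distributed sink and running the job is the safer route than collect()-ing it to the client.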

> Failed to serialize accumulators for task
> -
>
> Key: FLINK-4393
> URL: https://issues.apache.org/jira/browse/FLINK-4393
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: Redhat 6
>Reporter: Sajeev Ramakrishnan
>
> Dear Team,
>   I am getting the below exception while trying to use the Table API by 
> looping through the DataSet using collect() method.
> {code}
> 2016-08-15 07:18:52,503 WARN  
> org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to 
> serialize accumulators for task.
> java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at 
> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Suppressed: java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at 
> java.io.ByteArrayOutputStr

[jira] [Created] (FLINK-4396) MetricRegistry class not found at startup of jobmanager

2016-08-15 Thread RWenden (JIRA)
RWenden created FLINK-4396:
--

 Summary: MetricRegistry class not found at startup of jobmanager
 Key: FLINK-4396
 URL: https://issues.apache.org/jira/browse/FLINK-4396
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.1.1
 Environment: Windows and Unix

Reporter: RWenden
 Fix For: 1.1.2


For Flink 1.1.1 we configured Graphite metrics settings on the flink-conf.yaml 
(for job manager (and taskmanager)).

We see the following error in the log:

2016-08-15 14:20:34,167 ERROR org.apache.flink.runtime.metrics.MetricRegistry   
- Could not instantiate metrics reporter my_reporter. Metrics might 
not be exposed/reported.
java.lang.ClassNotFoundException: 
org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:119)

We found out that this class is not packaged inside flink-dist_2.11-1.1.1.jar.
Long story short: we had to install/provide the following jars into the lib 
folder to make Graphite metrics work:
flink-metrics-graphite-1.1.1.jar
flink-metrics-dropwizard-1.1.1.jar
metrics-graphite-3.1.0.jar (from dropwizard)

We think these libraries (and the ones for Ganglia, StatsD, ...) should be 
included in flink-dist_2.11-1.1.1.jar, since they are needed at manager startup 
time.
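
For context, the reporter configuration that exercises this code path looks roughly like the following in flink-conf.yaml. The reporter name `my_reporter` is taken from the log above; the key names follow the Flink 1.1 metrics documentation and the host/port values are illustrative, so check them against your version:

```yaml
metrics.reporters: my_reporter
metrics.reporter.my_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_reporter.host: localhost
metrics.reporter.my_reporter.port: 2003
```

Without the three jars listed above on the classpath, loading the configured reporter class fails with exactly the ClassNotFoundException shown in the log.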



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420931#comment-15420931
 ] 

ASF GitHub Bot commented on FLINK-4273:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74756158
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
--- End diff --

Is there a reason not to use the default "akka.ask.timeout" here?


> Refactor JobClientActor to watch already submitted jobs 
> 
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
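
The two entry points converging on one waiting phase can be sketched generically; the names below are illustrative, not the actual JobClient API:

```java
public class JobClientEntryPoints {

    // Entry point 1: submit a new job, then wait for it.
    static String submitAndWait(String jobGraph) {
        String jobId = submit(jobGraph);
        return waitForResult(jobId);
    }

    // Entry point 2: attach to an already running job by ID, then wait.
    static String retrieveAndWait(String jobId) {
        return waitForResult(jobId);
    }

    static String submit(String jobGraph) {
        return "job-" + Integer.toHexString(jobGraph.hashCode());
    }

    // The shared waiting phase both entry points enter independently.
    static String waitForResult(String jobId) {
        return jobId + ":finished";
    }
}
```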



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74756158
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
--- End diff --

Is there a reason not to use the default "akka.ask.timeout" here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420945#comment-15420945
 ] 

ASF GitHub Bot commented on FLINK-4273:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74757666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   BlobCache blobClient = null;
+

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74757666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   BlobCache blobClient = null;
+   try {
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+   jobManager.ask(
+   new 
JobM

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74757794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
}
}
+   else if (message instanceof AttachToJobAndWait) {
--- End diff --

This branch looks like it shares a lot of code with the branch above. I 
wonder if there's a way to extract the shared code into a method?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420946#comment-15420946
 ] 

ASF GitHub Bot commented on FLINK-4273:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r74757794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
}
}
+   else if (message instanceof AttachToJobAndWait) {
--- End diff --

This branch looks like it shares a lot of code with the branch above. I 
wonder if there's a way to extract the shared code into a method?
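
One common shape for that refactoring is to pass the branch-specific part in as a parameter. This is a generic sketch under that assumption, not the actual JobClientActor code:

```java
import java.util.function.Supplier;

public class SharedBranchSketch {

    // Before: two message branches duplicating the tracking logic.
    // After: each branch supplies only its distinct "start" step.
    static String handleSubmit(String jobGraph) {
        return runAndTrack(() -> "submitted:" + jobGraph);
    }

    static String handleAttach(String jobId) {
        return runAndTrack(() -> "attached:" + jobId);
    }

    // The previously duplicated logic, extracted once.
    static String runAndTrack(Supplier<String> startAction) {
        String handle = startAction.get();
        // ...shared bookkeeping (listeners, timeouts, etc.) would live here...
        return handle + ":tracked";
    }
}
```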


> Refactor JobClientActor to watch already submitted jobs 
> 
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2288: Feature/s3 a fix

2016-08-15 Thread cresny
Github user cresny commented on the issue:

https://github.com/apache/flink/pull/2288
  
I just returned from vacation so I have some catching up to do, but I'll
take care of this today or tomorrow.

On Aug 15, 2016 6:22 AM, "Ufuk Celebi"  wrote:

> Thank you for contributing. All in all, this looks good and is an
> important fix. We would have to address the inline comments I've raised
> before merging it though. Do you have time to address them?
>
> For the commits, I would squash all to a single commit via git rebase -i
> and add the issue tag and a description of what the problem is and how we
> solve it.
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2313
  
I did a quick pass over the code. I think this change needs another review 
by our Actor expert @tillrohrmann ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420953#comment-15420953
 ] 

ASF GitHub Bot commented on FLINK-4273:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2313
  
I did a quick pass over the code. I think this change needs another review 
by our Actor expert @tillrohrmann ;)


> Refactor JobClientActor to watch already submitted jobs 
> 
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420960#comment-15420960
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2314
  
Thanks a lot for the contribution @mushketyk.
It seems like the Flink community prefers to put the majority of 
connector modules into Apache Bahir. We are currently in the process of figuring 
out the specifics.

I think it's a good idea to have an ActiveMQ connector, because it also 
covers JMS message brokers. Therefore, I'll review this PR now, but we'll 
probably merge it to Bahir :)


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2314: [FLINK-3298] Implement ActiveMQ streaming connector

2016-08-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2314
  
Thanks a lot for the contribution @mushketyk.
It seems like the Flink community prefers to put the majority of 
connector modules into Apache Bahir. We are currently in the process of figuring 
out the specifics.

I think it's a good idea to have an ActiveMQ connector, because it also 
covers JMS message brokers. Therefore, I'll review this PR now, but we'll 
probably merge it to Bahir :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74760042
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-activemq_2.10</artifactId>
+	<name>flink-connector-activemq</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<activemq.version>5.11.1</activemq.version>
--- End diff --

Is there a particular reason for picking this version?
The version seems to be from 13-Feb-2015; the latest release is from August 
2016.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2141: [FLINK-4021] Problem of setting autoread for netty...

2016-08-15 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2141#discussion_r74760105
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -292,7 +296,11 @@ else if (bufferListener.waitForBuffer(bufferProvider, 
bufferOrEvent)) {
return false;
}
else if (bufferProvider.isDestroyed()) {
-   return false;
--- End diff --

We usually have a white space between keywords like `if` or `else`:
```java
if (isStagedBuffer) {
return true;
} else {
return false;
}
```

In this case, you can simplify the return value to `return isStagedBuffer`. 
Same for the other place where you use this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74760157
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-activemq_2.10</artifactId>
+	<name>flink-connector-activemq</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<activemq.version>5.11.1</activemq.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-client</artifactId>
+			<version>${activemq.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq.tooling</groupId>
+			<artifactId>activemq-junit</artifactId>
+			<version>5.13.1</version>
--- End diff --

Why are you using a different version for the tests?
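If the intent is to keep client and test on the same release, one option (a sketch, not from the PR) is to reuse the `activemq.version` property already declared in the POM for the test dependency as well:

```xml
<!-- Hypothetical: reuse the shared property so client and test versions stay in sync. -->
<dependency>
    <groupId>org.apache.activemq.tooling</groupId>
    <artifactId>activemq-junit</artifactId>
    <version>${activemq.version}</version>
    <scope>test</scope>
</dependency>
```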


---


[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420965#comment-15420965
 ] 

ASF GitHub Bot commented on FLINK-4021:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2141#discussion_r74760105
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -292,7 +296,11 @@ else if (bufferListener.waitForBuffer(bufferProvider, 
bufferOrEvent)) {
return false;
}
else if (bufferProvider.isDestroyed()) {
-   return false;
--- End diff --

We usually have a white space between keywords like `if` or `else`:
```java
if (isStagedBuffer) {
return true;
} else {
return false;
}
```

In this case, you can simplify the return value to `return isStagedBuffer`. 
Same for the other place where you use this.


> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task may share the same TCP connection for shuffling data.
> If a downstream task "A" has no available memory segment to read a netty 
> buffer from the network, it sets autoread to false for the channel.
> When task A fails, or has available segments again, the netty handler is 
> notified to process the staged buffers first and then to reset autoread to 
> true. But in some scenarios autoread is never set back to true.
> Namely, when processing staged buffers, the handler first finds the 
> corresponding input channel for each buffer; if the task for that input 
> channel has failed, the decodeMsg method in PartitionRequestClientHandler 
> returns false, so resetting autoread to true is never performed.
> In summary: if task "A" sets autoread to false because no segments are 
> available, leaving some staged buffers behind, and another task "B" 
> corresponding to one of those staged buffers fails by accident, then task 
> A's attempt to reset autoread to true cannot complete because of task B's 
> failure.
> I have fixed this problem in our application by adding a boolean parameter 
> to the decodeBufferOrEvent method that distinguishes whether the method is 
> invoked by the netty IO thread's channel read or by the staged-message 
> handler task in PartitionRequestClientHandler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420966#comment-15420966
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74760157
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-streaming-connectors</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-activemq_2.10</artifactId>
+    <name>flink-connector-activemq</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <activemq.version>5.11.1</activemq.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>${activemq.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.10</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq.tooling</groupId>
+            <artifactId>activemq-junit</artifactId>
+            <version>5.13.1</version>
--- End diff --

Why are you using a different version for the tests?


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420964#comment-15420964
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74760042
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-streaming-connectors</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-activemq_2.10</artifactId>
+    <name>flink-connector-activemq</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <activemq.version>5.11.1</activemq.version>
--- End diff --

Is there a particular reason for picking this version?
The version seems to be from 13-Feb-2015, the last release is from August 
2016.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2141: [FLINK-4021] Problem of setting autoread for netty channe...

2016-08-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2141
  
Very impressive that you made it through that part of the system. It's very 
poorly documented and overly complex. The change looks good and ensures that a 
staged buffer that cannot be decoded because of a closed buffer pool does not 
leave the channel with auto read set to false. This is currently not a problem 
as all tasks of the job are failed, but with partial recovery this will lead to 
problems if the channel is kept alive by other consuming tasks.

We could add the following as a test for this in 
`PartitionRequestClientHandlerTest`. What do you think? This covers both 
branches that you added.

```java
/**
 * Tests that an unsuccessful message decode call for a staged message
 * does not leave the channel with auto read set to false.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
EmbeddedChannel channel = new EmbeddedChannel(handler);

final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();

BufferProvider bufferProvider = mock(BufferProvider.class);

when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
    @Override
    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) throws Throwable {
        listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
        return true;
    }
});

when(bufferProvider.requestBuffer()).thenReturn(null);

InputChannelID channelId = new InputChannelID(0, 0);
RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
when(inputChannel.getInputChannelId()).thenReturn(channelId);

handler.addInputChannel(inputChannel);

BufferResponse msg = createBufferResponse(channelId, channel);

// Write 1st buffer msg. No buffer is available, therefore the buffer
// should be staged and auto read should be set to false.
assertTrue(channel.config().isAutoRead());
channel.writeInbound(msg);

// No buffer available, auto read false
assertFalse(channel.config().isAutoRead());

// Write more buffers... all staged.
msg = createBufferResponse(channelId, channel);
channel.writeInbound(msg);

msg = createBufferResponse(channelId, channel);
channel.writeInbound(msg);

// Notify about buffer => handle 1st msg
Buffer availableBuffer = createBuffer(false);
listener.get().onEvent(availableBuffer);

// Start processing of staged buffers (in run pending tasks). Make
// sure that the buffer provider acts like it's destroyed.

when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
when(bufferProvider.isDestroyed()).thenReturn(true);

// The 3rd staged msg has a null buffer provider
when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, 
bufferProvider, null);

// Execute all tasks that are scheduled in the event loop. Further
// eventLoop().execute() calls are directly executed, if they are
// called in the scope of this call.
channel.runPendingTasks();

assertTrue(channel.config().isAutoRead());
}
```


---


[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420973#comment-15420973
 ] 

ASF GitHub Bot commented on FLINK-4021:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2141
  
Very impressive that you made it through that part of the system. It's very 
poorly documented and overly complex. The change looks good and ensures that a 
staged buffer that cannot be decoded because of a closed buffer pool does not 
leave the channel with auto read set to false. This is currently not a problem 
as all tasks of the job are failed, but with partial recovery this will lead to 
problems if the channel is kept alive by other consuming tasks.

We could add the following as a test for this in 
`PartitionRequestClientHandlerTest`. What do you think? This covers both 
branches that you added.

```java
/**
 * Tests that an unsuccessful message decode call for a staged message
 * does not leave the channel with auto read set to false.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
EmbeddedChannel channel = new EmbeddedChannel(handler);

final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();

BufferProvider bufferProvider = mock(BufferProvider.class);

when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
    @Override
    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) throws Throwable {
        listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
        return true;
    }
});

when(bufferProvider.requestBuffer()).thenReturn(null);

InputChannelID channelId = new InputChannelID(0, 0);
RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
when(inputChannel.getInputChannelId()).thenReturn(channelId);

handler.addInputChannel(inputChannel);

BufferResponse msg = createBufferResponse(channelId, channel);

// Write 1st buffer msg. No buffer is available, therefore the buffer
// should be staged and auto read should be set to false.
assertTrue(channel.config().isAutoRead());
channel.writeInbound(msg);

// No buffer available, auto read false
assertFalse(channel.config().isAutoRead());

// Write more buffers... all staged.
msg = createBufferResponse(channelId, channel);
channel.writeInbound(msg);

msg = createBufferResponse(channelId, channel);
channel.writeInbound(msg);

// Notify about buffer => handle 1st msg
Buffer availableBuffer = createBuffer(false);
listener.get().onEvent(availableBuffer);

// Start processing of staged buffers (in run pending tasks). Make
// sure that the buffer provider acts like it's destroyed.

when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
when(bufferProvider.isDestroyed()).thenReturn(true);

// The 3rd staged msg has a null buffer provider
when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, 
bufferProvider, null);

// Execute all tasks that are scheduled in the event loop. Further
// eventLoop().execute() calls are directly executed, if they are
// called in the scope of this call.
channel.runPendingTasks();

assertTrue(channel.config().isAutoRead());
}
```


> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task may share the same TCP connection for shuffling data.
> If a downstream task "A" has no available memory segment to read a netty 
> buffer from the network, it sets autoread to false for the channel.
> When task A fails, or has available segments again, the netty handler is 
> notified to process the staged buffers first and then to reset autoread to 
> true. But in some scenarios autoread is never set back to true.
> Namely, when processing staged buffers, the handler first finds the 
> corresponding input channel for each buffer; if the task for that input 
> channel has failed, t

[GitHub] flink issue #2288: Feature/s3 a fix

2016-08-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2288
  
Thank you! :+1: 


---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420981#comment-15420981
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762137
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

What if somebody wants to send data to a topic, instead of a queue?
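If topics should be supported too, one option (a sketch, not part of the PR; `DestinationType` and the helper class are made-up names, while `createQueue`/`createTopic` are standard JMS `Session` methods) is to let the caller choose the destination type:

```java
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

// Hypothetical enum, not part of the PR.
enum DestinationType { QUEUE, TOPIC }

final class Destinations {
    private Destinations() {
    }

    // A queue delivers each message to a single consumer; a topic broadcasts
    // to every active subscriber.
    static Destination create(Session session, DestinationType type, String name) throws JMSException {
        return type == DestinationType.QUEUE
                ? session.createQueue(name)
                : session.createTopic(name);
    }
}
```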


> Streaming connector for ActiveMQ
> 
>
> 

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762137
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

What if somebody wants to send data to a topic, instead of a queue?


---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420982#comment-15420982
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762300
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
--- End diff --

I assume this means that the sink doesn't provide any processing 
guarantees. Maybe it would make sense to document the guarantees in the 
javadocs.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: htt

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420983#comment-15420983
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762401
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName,
+           SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+    * Defines whether the producer should fail on errors, or only log them.
+    * If this is set to true, exceptions will only be logged; if set to false,
+    * exceptions will eventually be thrown, causing the streaming program to
+    * fail (and enter recovery).
+    *
+    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+    */
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

Do we have to create a queue / topic or can we also produce into an 
existing q / t ?


> Streaming connector for ActiveMQ
> ---
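
For context on the reviewer's question: in JMS, `session.createQueue(queueName)` does not provision anything on the broker; it only builds a local `Destination` handle, and ActiveMQ in its default configuration creates the physical queue lazily on first use, so producing into an already-existing queue works the same way. A self-contained model of that lazy-creation behaviour (an illustration under the stated assumption about default broker settings, not ActiveMQ code):

```java
import java.util.concurrent.ConcurrentHashMap;

// Models a broker that auto-creates destinations on first send,
// mirroring ActiveMQ's default behaviour. Illustration only,
// not the ActiveMQ implementation.
public class LazyBroker {
    private final ConcurrentHashMap<String, Integer> queues = new ConcurrentHashMap<>();

    /** Sending to an unknown queue creates it implicitly. */
    public void send(String queueName, String msg) {
        queues.merge(queueName, 1, Integer::sum);
    }

    public boolean exists(String queueName) {
        return queues.containsKey(queueName);
    }
}
```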

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762401
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSink class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. Every input message is converted into a byte array
+ * using a serialization schema and sent to a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName,
+           SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+    * Defines whether the producer should fail on errors, or only log them.
+    * If this is set to true, exceptions will only be logged; if set to false,
+    * exceptions will eventually be thrown, causing the streaming program to
+    * fail (and enter recovery).
+    *
+    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+    */
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

Do we have to create a queue / topic or can we also produce into an 
existing q / t ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
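
The `logFailuresOnly` flag discussed in the quoted diff is typically consumed at send time. A minimal self-contained sketch of that pattern (the class and method names below are hypothetical stand-ins, not the PR's actual `invoke()` code):

```java
// Sketch: how a logFailuresOnly flag is commonly applied when a send fails.
// ErrorPolicy is a hypothetical stand-in for the sink; it has no JMS dependency.
public class ErrorPolicy {
    private final boolean logFailuresOnly;

    public ErrorPolicy(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /** Returns a warning string when the failure is swallowed, otherwise rethrows. */
    public String handle(RuntimeException e) {
        if (logFailuresOnly) {
            // In the sink this would be LOG.error(...) instead of returning a string.
            return "WARN: failed to send message: " + e.getMessage();
        }
        throw e; // propagates, causing the streaming job to fail and recover
    }
}
```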

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762673
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSink class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. Every input message is converted into a byte array
+ * using a serialization schema and sent to a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName,
+           SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+    * Defines whether the producer should fail on errors, or only log them.
+    * If this is set to true, exceptions will only be logged; if set to false,
+    * exceptions will eventually be thrown, causing the streaming program to
+    * fail (and enter recovery).
+    *
+    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+    */
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
+
+   // Create a MessageProducer from the Session to the Topic or
+   // Queue
+   producer = session.createProducer(destination);
+   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
--- End diff --

I think we should make this configurable.


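
One shape the suggested configurability could take: a setter next to `setLogFailuresOnly`, defaulting to the non-persistent mode currently hard-coded in the diff. The constants below mirror the numeric values of `javax.jms.DeliveryMode` (NON_PERSISTENT = 1, PERSISTENT = 2) so the sketch stays self-contained; the setter name is an assumption, not part of the PR:

```java
// Sketch of a configurable delivery mode for the sink.
public class DeliveryConfig {
    // Same numeric values as javax.jms.DeliveryMode.
    public static final int NON_PERSISTENT = 1;
    public static final int PERSISTENT = 2;

    // Matches the current hard-coded default in the quoted open() method.
    private int deliveryMode = NON_PERSISTENT;

    /** Hypothetical setter the review asks for; would be called before open(). */
    public void setPersistentDelivery(boolean persistent) {
        this.deliveryMode = persistent ? PERSISTENT : NON_PERSISTENT;
    }

    public int getDeliveryMode() {
        return deliveryMode;
    }
}
```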

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420988#comment-15420988
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762673
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSink class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. Every input message is converted into a byte array
+ * using a serialization schema and sent to a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName,
+           SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+    * Defines whether the producer should fail on errors, or only log them.
+    * If this is set to true, exceptions will only be logged; if set to false,
+    * exceptions will eventually be thrown, causing the streaming program to
+    * fail (and enter recovery).
+    *
+    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+    */
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
+
+   // Create a MessageProducer from the Session to the Topic or
+   // Queue
+   producer = session.createProducer(destination);
 

[GitHub] flink issue #2075: [FLINK-3867] Provide Vagrant/Ansible based VMs to easily ...

2016-08-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2075
  
I agree. +1 to link in 3rd party packages if this is up to date and close 
this PR. What do you think @rmetzger and @jneuff?




[jira] [Commented] (FLINK-3867) Provide virtualized Flink architecture for testing purposes

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420980#comment-15420980
 ] 

ASF GitHub Bot commented on FLINK-3867:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2075
  
I agree. +1 to link in 3rd party packages if this is up to date and close 
this PR. What do you think @rmetzger and @jneuff?


> Provide virtualized Flink architecture for testing purposes
> ---
>
> Key: FLINK-3867
> URL: https://issues.apache.org/jira/browse/FLINK-3867
> Project: Flink
>  Issue Type: Test
>  Components: flink-contrib
>Reporter: Andreas Kempa-Liehr
>
> For developers interested in Apache Flink it would be very helpful to deploy 
> an Apache Flink cluster on a set of virtualized machines, in order to get 
> used to the configuration of the system and the development of basic 
> algorithms.
> This kind of setup could also be used for testing purposes.
> An example implementation on the basis of Ansible and Vagrant has been
> published under https://github.com/kempa-liehr/flinkVM/tree/master/flink-vm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4055) EventTimeWindowCheckpointingITCase.testPreAggregatedSlidingTimeWindow fails on Travis

2016-08-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421000#comment-15421000
 ] 

Till Rohrmann commented on FLINK-4055:
--

Another instance: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/152349425/log.txt

> EventTimeWindowCheckpointingITCase.testPreAggregatedSlidingTimeWindow fails 
> on Travis
> -
>
> Key: FLINK-4055
> URL: https://issues.apache.org/jira/browse/FLINK-4055
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, Windowing Operators
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The test case 
> {{EventTimeWindowCheckpointingITCase.testPreAggregatedSlidingTimeWindow}} 
> fails spuriously on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/136395806/log.txt





[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74762300
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSink class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. Every input message is converted into a byte array
+ * using a serialization schema and sent to a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema<IN> serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName,
+           SerializationSchema<IN> serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+    * Defines whether the producer should fail on errors, or only log them.
+    * If this is set to true, exceptions will only be logged; if set to false,
+    * exceptions will eventually be thrown, causing the streaming program to
+    * fail (and enter recovery).
+    *
+    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+    */
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
--- End diff --

I assume this means that the sink doesn't provide any processing 
guarantees. Maybe it would make sense to document the guarantees in the 
javadocs.
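
Concretely, with `Session.AUTO_ACKNOWLEDGE` the broker treats a message as consumed as soon as it is delivered, so a failure during or after processing yields at-most-once behaviour. A self-contained model of the difference between auto and client-style acknowledgement (an illustration of the semantics, not the JMS API):

```java
import java.util.ArrayDeque;

// Models why AUTO_ACKNOWLEDGE weakens guarantees: the message leaves the
// queue before processing succeeds. Illustration only, not JMS code.
public class AckModel {
    private final ArrayDeque<String> queue = new ArrayDeque<>();

    public void offer(String m) { queue.add(m); }

    /** AUTO_ACKNOWLEDGE: the message is removed on delivery; a failure loses it. */
    public String consumeAuto(boolean processingFails) {
        String m = queue.poll();           // acknowledged on delivery
        return processingFails ? null : m; // on failure the message is gone anyway
    }

    /** CLIENT_ACKNOWLEDGE-style: the message is removed only after success. */
    public String consumeClientAck(boolean processingFails) {
        String m = queue.peek();
        if (processingFails) return null;  // message stays queued for redelivery
        return queue.poll();               // acknowledge after successful processing
    }

    public int pending() { return queue.size(); }
}
```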




[jira] [Closed] (FLINK-4383) Check parameters for serializability before sending a remote RpcInvocation message

2016-08-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4383.

Resolution: Fixed

Fixed via e8a434647bac8009b11779a3943c5d63871fcb14

> Check parameters for serializability before sending a remote RpcInvocation 
> message
> --
>
> Key: FLINK-4383
> URL: https://issues.apache.org/jira/browse/FLINK-4383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Before sending a remote {{RpcInvocation}} message we should check that the 
> rpc arguments are serializable. If not we should eagerly fail with an 
> appropriate exception message.
> If we don't do this, then Akka will silently fail serializing the message 
> without telling the user.





[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766399
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. This source waits for incoming messages from
+ * ActiveMQ and converts them from an array of bytes into an instance of
+ * the output type. If an incoming message does not carry an array of
+ * bytes, the message is ignored and a warning message is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName,
+           DeserializationSchema<OUT> deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName,
+           DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker) {
+   super(String.class);
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.deserializationSchema = deserializationSchema;
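
The AMQSource javadoc above says non-byte-array messages are ignored with a warning; that check reduces to a plain instanceof test before deserialization. A hedged, self-contained sketch (the helper below is hypothetical, not the PR's code):

```java
// Sketch of the described behaviour: only byte-array payloads are accepted
// for deserialization; anything else is skipped. In the connector the skip
// would be accompanied by a LOG.warn(...) call.
public class PayloadFilter {
    /** Returns the byte payload, or null when the message should be skipped. */
    public static byte[] extract(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        return null; // non-bytes message: ignored (and warned about) upstream
    }
}
```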

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421004#comment-15421004
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766399
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to
+ * create a connection. This source waits for incoming messages from
+ * ActiveMQ and converts them from an array of bytes into an instance of
+ * the output type. If an incoming message does not carry an array of
+ * bytes, the message is ignored and a warning message is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deseriali

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421009#comment-15421009
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766651
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to create
+ * a connection. This source waits for incoming messages from ActiveMQ and
+ * converts them from an array of bytes into an instance of the output type.
+ * If an incoming message does not carry an array of bytes, the message is
+ * ignored and a warning is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap<String, Message> unacknowledgedMessages = new HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema<OUT> deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deseriali

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766651
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to create
+ * a connection. This source waits for incoming messages from ActiveMQ and
+ * converts them from an array of bytes into an instance of the output type.
+ * If an incoming message does not carry an array of bytes, the message is
+ * ignored and a warning is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap<String, Message> unacknowledgedMessages = new HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema<OUT> deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker) {
+   super(String.class);
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.deserializationSchema = deserializationSchema;
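The javadoc above describes converting each message's byte payload into the output type via a deserialization schema. A minimal, standalone sketch of that conversion step (the `DeserializationSchema` interface is modeled here for illustration, not Flink's actual class):

```java
import java.nio.charset.StandardCharsets;

// Models the conversion the javadoc describes: the bytes extracted from a
// BytesMessage are handed to a schema that produces the output type.
// Non-byte payloads would be skipped with a warning in the real source.
interface DeserializationSchema<T> {
    T deserialize(byte[] message);
}

public class BytesToRecordDemo {
    public static void main(String[] args) {
        // A trivial schema: interpret the bytes as a UTF-8 string.
        DeserializationSchema<String> schema =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

        byte[] payload = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(schema.deserialize(payload));
    }
}
```

In the real connector the schema also supplies the produced `TypeInformation`, which is why the source implements `ResultTypeQueryable`.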

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766852
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to create
+ * a connection. This source waits for incoming messages from ActiveMQ and
+ * converts them from an array of bytes into an instance of the output type.
+ * If an incoming message does not carry an array of bytes, the message is
+ * ignored and a warning is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
--- End diff --

Why is a simple boolean field for this not sufficient? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
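The review comment above asks why an injected `RunningChecker` is used instead of a simple boolean field. One common motivation is testability: a test can inject a checker that stops the poll loop deterministically. A standalone sketch of that idea (these class names are illustrative, not the PR's actual code):

```java
// Why inject a checker rather than hard-code a volatile boolean: a test can
// swap in an implementation that stops the source after N polls, with no
// threads or timing involved.
interface RunningChecker {
    boolean isRunning();
    void setIsRunning(boolean running);
}

/** Production-style checker: a simple mutable flag. */
class SimpleRunningChecker implements RunningChecker {
    private volatile boolean running = true;
    public boolean isRunning() { return running; }
    public void setIsRunning(boolean r) { running = r; }
}

/** Test checker: reports "running" for a fixed number of calls. */
class CountingRunningChecker implements RunningChecker {
    private int remaining;
    CountingRunningChecker(int polls) { this.remaining = polls; }
    public boolean isRunning() { return remaining-- > 0; }
    public void setIsRunning(boolean r) { if (!r) remaining = 0; }
}

public class RunningCheckerDemo {
    /** Poll loop shaped like a source's run() method. */
    static int runLoop(RunningChecker checker) {
        int polled = 0;
        while (checker.isRunning()) {
            polled++;               // stand-in for consumer.receive(...)
        }
        return polled;
    }

    public static void main(String[] args) {
        // Deterministic: exactly 3 iterations, then the loop exits.
        System.out.println(runLoop(new CountingRunningChecker(3)));
    }
}
```

With a plain boolean field, a unit test would have to cancel the source from another thread while it polls, which is exactly the kind of timing-sensitive test the injected checker avoids.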


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421011#comment-15421011
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r74766852
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of the AMQSource class one should initialize and
+ * configure an instance of a connection factory that will be used to create
+ * a connection. This source waits for incoming messages from ActiveMQ and
+ * converts them from an array of bytes into an instance of the output type.
+ * If an incoming message does not carry an array of bytes, the message is
+ * ignored and a warning is logged.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+   implements ResultTypeQueryable<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema<OUT> deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
--- End diff --

Why is a simple boolean field for this not sufficient? 


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2314: [FLINK-3298] Implement ActiveMQ streaming connector

2016-08-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2314
  
Thank you for this contribution. I did an initial check of the code.
Once all my comments are addressed, I'll do another check.

Did you test the code on a cluster? Were there any dependency issues / 
conflicts with the user job jar?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2265: [FLINK-3097] [table] Add support for custom functions in ...

2016-08-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2265
  
Merging...



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3097) Add support for custom functions in Table API

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421014#comment-15421014
 ] 

ASF GitHub Bot commented on FLINK-3097:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2265
  
Merging...



> Add support for custom functions in Table API
> -
>
> Key: FLINK-3097
> URL: https://issues.apache.org/jira/browse/FLINK-3097
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently, the Table API has a very limited set of built-in functions. 
> Support for custom functions can solve this problem. Adding of a custom row 
> function could look like:
> {code}
> TableEnvironment tableEnv = new TableEnvironment();
> RowFunction<String> rf = new RowFunction<String>() {
> @Override
> public String call(Object[] args) {
> return ((String) args[0]).trim();
> }
> };
> tableEnv.getConfig().registerRowFunction("TRIM", rf,
> BasicTypeInfo.STRING_TYPE_INFO);
> DataSource<Tuple1<String>> input = env.fromElements(
> new Tuple1<>(" 1 "));
> Table table = tableEnv.fromDataSet(input);
> Table result = table.select("TRIM(f0)");
> {code}
> This feature is also necessary as part of FLINK-2099.
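The registration-and-lookup mechanism proposed in the description can be sketched standalone. The `RowFunction` interface and registry below are modeled after the snippet above for illustration; they are not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// A custom row function takes the argument row and produces a value.
interface RowFunction<T> {
    T call(Object[] args);
}

// Minimal registry mimicking registerRowFunction(name, fn): functions are
// stored under a name and invoked by that name at expression-evaluation time.
class FunctionRegistry {
    private final Map<String, RowFunction<?>> functions = new HashMap<>();

    void registerRowFunction(String name, RowFunction<?> f) {
        functions.put(name, f);
    }

    Object invoke(String name, Object... args) {
        return functions.get(name).call(args);
    }
}

public class RowFunctionDemo {
    public static void main(String[] args) {
        FunctionRegistry registry = new FunctionRegistry();
        // Same TRIM behavior as the snippet in the issue description.
        registry.registerRowFunction("TRIM",
            (RowFunction<String>) a -> ((String) a[0]).trim());
        System.out.println("[" + registry.invoke("TRIM", " 1 ") + "]");
    }
}
```

In the real Table API the registered name is additionally paired with a `TypeInformation` so the planner knows the result type of the expression.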



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421015#comment-15421015
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2314
  
Thank you for this contribution. I did an initial check of the code.
Once all my comments are addressed, I'll do another check.

Did you test the code on a cluster? Were there any dependency issues / 
conflicts with the user job jar?


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421021#comment-15421021
 ] 

ASF GitHub Bot commented on FLINK-4394:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2373
  
Merging ...


> RMQSource: The QueueName is not accessible to subclasses
> 
>
> Key: FLINK-4394
> URL: https://issues.apache.org/jira/browse/FLINK-4394
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.1
>Reporter: Dominik Bruhn
>Assignee: Dominik Bruhn
>
> In version 1.1.0 we made the RMQSource extensible so that subclasses can 
> configure how their queue for RabbitMQ/AMQP is created. The subclasses 
> can override 
> {code}
>   protected void setupQueue() throws IOException {
>   channel.queueDeclare(queueName, true, false, false, null);
>   }
> {code}
> The problem is that the queueName property is private. So when overriding the 
> setupQueue method, you don't know which queueName was provided. A 
> simple change of the queueName property to protected fixes this.
> PR will follow
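The visibility issue can be illustrated with plain classes (names are hypothetical stand-ins, not the RMQSource code; the real hook talks to a RabbitMQ channel):

```java
// With a private field, a subclass overriding the setup hook cannot see
// which queue name was configured; making it protected fixes that.
class BaseSource {
    protected final String queueName;   // was effectively private in the bug

    BaseSource(String queueName) { this.queueName = queueName; }

    /** Hook that subclasses may override to declare their queue. */
    protected String setupQueue() {
        return "declare " + queueName + " durable=true";
    }
}

class CustomSource extends BaseSource {
    CustomSource(String queueName) { super(queueName); }

    @Override
    protected String setupQueue() {
        // Reading queueName here compiles only because it is protected.
        return "declare " + queueName + " durable=false autoDelete=true";
    }
}

public class QueueVisibilityDemo {
    public static void main(String[] args) {
        System.out.println(new CustomSource("orders").setupQueue());
    }
}
```

Had `queueName` stayed private, the subclass would have had to duplicate the name in its own field, defeating the purpose of making `setupQueue()` overridable.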



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2373: [FLINK-4394] RMQSource: QueueName accessible for subclass...

2016-08-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2373
  
Merging ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4397) Unstable test SlotCountExceedingParallelismTest.tearDown

2016-08-15 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-4397:
--
Assignee: Ufuk Celebi

> Unstable test SlotCountExceedingParallelismTest.tearDown
> 
>
> Key: FLINK-4397
> URL: https://issues.apache.org/jira/browse/FLINK-4397
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Ufuk Celebi
>
> An instance can be found here:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/152392524/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4398) Unstable test KvStateServerHandlerTest.testSimpleQuery

2016-08-15 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-4398:
-

 Summary: Unstable test KvStateServerHandlerTest.testSimpleQuery
 Key: FLINK-4398
 URL: https://issues.apache.org/jira/browse/FLINK-4398
 Project: Flink
  Issue Type: Bug
Reporter: Kostas Kloudas


An instance can be found here:

https://s3.amazonaws.com/archive.travis-ci.org/jobs/152392521/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2265: [FLINK-3097] [table] Add support for custom functi...

2016-08-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2265


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
--- End diff --

Java docs are missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses

2016-08-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4394.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Thank you for the contribution.
Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/5ccd9071

> RMQSource: The QueueName is not accessible to subclasses
> 
>
> Key: FLINK-4394
> URL: https://issues.apache.org/jira/browse/FLINK-4394
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.1
>Reporter: Dominik Bruhn
>Assignee: Dominik Bruhn
> Fix For: 1.2.0
>
>
> In version 1.1.0 we made the RMQSource extensible so that subclasses can 
> configure how their queue for RabbitMQ/AMQP is created. The subclasses 
> can override 
> {code}
>   protected void setupQueue() throws IOException {
>   channel.queueDeclare(queueName, true, false, false, null);
>   }
> {code}
> The problem is that the queueName property is private. So when overriding the 
> setupQueue method, you don't know which queueName was provided. A 
> simple change of the queueName property to protected fixes this.
> PR will follow



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4398) Unstable test KvStateServerHandlerTest.testSimpleQuery

2016-08-15 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-4398:
--
Labels: test-stability  (was: )

> Unstable test KvStateServerHandlerTest.testSimpleQuery
> --
>
> Key: FLINK-4398
> URL: https://issues.apache.org/jira/browse/FLINK-4398
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Ufuk Celebi
>  Labels: test-stability
>
> An instance can be found here:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/152392521/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3097) Add support for custom functions in Table API

2016-08-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-3097.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 90fdae452b6a03fafd4ec7827030d78ae87dbcd3.

> Add support for custom functions in Table API
> -
>
> Key: FLINK-3097
> URL: https://issues.apache.org/jira/browse/FLINK-3097
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> Currently, the Table API has a very limited set of built-in functions. 
> Support for custom functions can solve this problem. Adding of a custom row 
> function could look like:
> {code}
> TableEnvironment tableEnv = new TableEnvironment();
> RowFunction<String> rf = new RowFunction<String>() {
> @Override
> public String call(Object[] args) {
> return ((String) args[0]).trim();
> }
> };
> tableEnv.getConfig().registerRowFunction("TRIM", rf,
> BasicTypeInfo.STRING_TYPE_INFO);
> DataSource<Tuple1<String>> input = env.fromElements(
> new Tuple1<>(" 1 "));
> Table table = tableEnv.fromDataSet(input);
> Table result = table.select("TRIM(f0)");
> {code}
> This feature is also necessary as part of FLINK-2099.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4397) Unstable test SlotCountExceedingParallelismTest.tearDown

2016-08-15 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-4397:
--
Labels: test-stability  (was: )

> Unstable test SlotCountExceedingParallelismTest.tearDown
> 
>
> Key: FLINK-4397
> URL: https://issues.apache.org/jira/browse/FLINK-4397
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Ufuk Celebi
>  Labels: test-stability
>
> An instance can be found here:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/152392524/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421031#comment-15421031
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
--- End diff --

Java docs are missing


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of a single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot and constant across retries. Used to match responses from 
> the ResourceManager and to identify deployment calls towards the TaskManager 
> from which the slot was allocated.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> cpu cores and memory are supported)



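As background for the identifiers described above, here is a minimal, hypothetical sketch of an AllocationID as a UUID-backed value type: created once per slot request and stable across retries, so it can be compared via equals/hashCode. This is an illustration only, not Flink's actual class.

```java
import java.io.Serializable;
import java.util.UUID;

// Hypothetical sketch: an allocation identifier created by the JobManager
// per slot request and constant across retries. Equality is based solely
// on the wrapped UUID, so the same instance can be matched in responses.
public final class AllocationID implements Serializable {

    private static final long serialVersionUID = 1L;

    private final UUID id;

    public AllocationID() {
        this.id = UUID.randomUUID();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof AllocationID && ((AllocationID) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "AllocationID(" + id + ")";
    }
}
```

Two independently created IDs compare unequal, which is what makes the identifier usable for matching a specific request to its response.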


[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...

2016-08-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
I will shepherd this PR. I think I can review it this week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many CPU cores are needed; a double allows fractional CPUs like 0.1. */
+   private final double cpuCores;
+
+   /** How much memory in MB is needed. */
+   private final long memoryInMB;
--- End diff --

It might be helpful to have getter methods for the fields. The RM can then 
use the information from the ResourceProfile to create container requests.


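The review above suggests getters so the ResourceManager can read the profile when creating container requests. A minimal sketch of that suggestion, using the field names from the quoted diff (hypothetical, not the final Flink API):

```java
import java.io.Serializable;

// Hypothetical sketch of the getters suggested in the review.
// Field names follow the quoted diff; this is not Flink's actual class.
public class ResourceProfile implements Serializable {

    private static final long serialVersionUID = 1L;

    /** How many CPU cores are needed; a double allows fractional CPUs like 0.1. */
    private final double cpuCores;

    /** How much memory in MB is needed. */
    private final long memoryInMB;

    public ResourceProfile(double cpuCores, long memoryInMB) {
        this.cpuCores = cpuCores;
        this.memoryInMB = memoryInMB;
    }

    // Getters let the ResourceManager translate a profile into a
    // container request without exposing the fields themselves.
    public double getCpuCores() {
        return cpuCores;
    }

    public long getMemoryInMB() {
        return memoryInMB;
    }
}
```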


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421039#comment-15421039
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
I will shepherd this PR. I think I can review it this week.


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



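To make the nested-field discussion above concrete: resolving a dotted field expression like "inner.value" on a POJO can be sketched with plain reflection. This is illustrative only; Flink's actual FieldAccessors work through TypeInformation and serializers, and the POJO classes below are invented for the demo.

```java
import java.lang.reflect.Field;

// Illustrative only: walks a dotted field expression like "a.b.c" on a
// plain POJO via reflection. Note getDeclaredField only looks at the
// exact class, not superclasses; Flink's FieldAccessors do more.
public class NestedFieldReader {

    public static Object get(Object pojo, String expression) {
        try {
            Object current = pojo;
            for (String part : expression.split("\\.")) {
                Field f = current.getClass().getDeclaredField(part);
                f.setAccessible(true);
                current = f.get(current);
            }
            return current;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("No such field path: " + expression, e);
        }
    }

    // Tiny hypothetical POJOs for demonstration.
    public static class Inner {
        public int value = 42;
    }

    public static class Outer {
        public Inner inner = new Inner();
    }
}
```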


[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...

2016-08-15 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks!




[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421040#comment-15421040
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many CPU cores are needed; a double allows fractional CPUs like 0.1. */
+   private final double cpuCores;
+
+   /** How much memory in MB is needed. */
+   private final long memoryInMB;
--- End diff --

It might be helpful to have getter methods for the fields. The RM can then 
use the information from the ResourceProfile to create container requests.


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of a single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot and constant across retries. Used to match responses from 
> the ResourceManager and to identify deployment calls towards the TaskManager 
> from which the slot was allocated.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> cpu cores and memory are supported)





[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421042#comment-15421042
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks!


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.




