[jira] [Created] (FLINK-4691) Add group-windows for streaming tables

2016-09-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4691:
---

 Summary: Add group-windows for streaming tables
 Key: FLINK-4691
 URL: https://issues.apache.org/jira/browse/FLINK-4691
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Add Tumble, Slide, Session group-windows for streaming tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
 

Implementation of group-windows on streaming tables. This includes implementing 
the API of group-windows, the logical validation for group-windows, and the 
definition of the “rowtime” and “systemtime” keywords. Group-windows on batch 
tables won’t be initially supported and will throw an exception.
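To make the scope concrete, the window clause described in FLIP-11 looks roughly like the following (an illustrative sketch only; the table name, the {{ingest}} call, and the expression strings are assumptions, not the final API):

{code}
// Illustrative sketch based on the FLIP-11 design document, not the implemented API.
// "sensors", "rowtime", and the expression strings are placeholders.
Table result = tableEnv
    .ingest("sensors")                                         // hypothetical streaming table
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))   // tumbling event-time group-window
    .groupBy("w, sensorId")                                    // group by window alias and key
    .select("sensorId, w.start, temperature.avg");             // windowed aggregation
{code}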



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4692) Add tumbling and sliding group-windows for batch tables

2016-09-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4692:
---

 Summary: Add tumbling and sliding group-windows for batch tables
 Key: FLINK-4692
 URL: https://issues.apache.org/jira/browse/FLINK-4692
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther


Add Tumble and Slide group-windows for batch tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4693) Add session group-windows for batch tables

2016-09-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4693:
---

 Summary: Add session group-windows for batch tables
 Key: FLINK-4693
 URL: https://issues.apache.org/jira/browse/FLINK-4693
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther


Add Session group-windows for batch tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Regarding `flink-streaming` project dependencies

2016-09-27 Thread Liwei Lin
Hi folks,

There are comments like this in
`StreamExecutionEnvironment.getExecutionEnvironment()`:

// because the streaming project depends on "flink-clients" (and not the
> other way around)
> // we currently need to intercept the data set environment and create a
> dependent stream env.
> // this should be fixed once we rework the project dependencies


https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1592-L1594
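For readers without the code open, the interception that comment refers to looks roughly like the following (a paraphrased sketch, not the verbatim Flink method; the environment class names are approximations):

// Paraphrased sketch of StreamExecutionEnvironment.getExecutionEnvironment(),
// not the verbatim Flink code; class names are approximate.
public static StreamExecutionEnvironment getExecutionEnvironment() {
    // Ask the batch API which environment we are running in, because
    // flink-streaming-java depends on flink-clients and not the other way around.
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    if (batchEnv instanceof ContextEnvironment) {
        // submitted through a client: wrap the batch context environment
        return new StreamContextEnvironment((ContextEnvironment) batchEnv);
    } else {
        // otherwise fall back to a local streaming environment
        return createLocalEnvironment();
    }
}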

I wonder if there is any plan to rework the project dependencies? In the
long run, are we planning to have `flink-clients` depend on
`flink-streaming` or to have `flink-java` depend on `flink-clients`?

Thanks!

Liwei


Re: Regarding `flink-streaming` project dependencies

2016-09-27 Thread Stephan Ewen
Hi!

Yes, there are definitely plans and desires to do that. It may mean breaking
some API / dependency structure, so it is probably a candidate for Flink 2.0.

Greetings,
Stephan


On Tue, Sep 27, 2016 at 10:45 AM, Liwei Lin  wrote:

> Hi folks,
>
> There are comments like this in
> `StreamExecutionEnvironment.getExecutionEnvironment()`:
>
> // because the streaming project depends on "flink-clients" (and not the
> > other way around)
> > // we currently need to intercept the data set environment and create a
> > dependent stream env.
> > // this should be fixed once we rework the project dependencies
>
>
> https://github.com/apache/flink/blob/master/flink-
> streaming-java/src/main/java/org/apache/flink/streaming/api/environment/
> StreamExecutionEnvironment.java#L1592-L1594
>
> I wonder if there is any plan to rework the project dependencies? In the
> long run, are we planning to have `flink-clients` depend on
> `flink-streaming` or to have `flink-java` depend on `flink-clients`?
>
> Thanks!
>
> Liwei
>


[jira] [Created] (FLINK-4694) Add wait for termination function to RpcEndpoints

2016-09-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4694:


 Summary: Add wait for termination function to RpcEndpoints
 Key: FLINK-4694
 URL: https://issues.apache.org/jira/browse/FLINK-4694
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


When running a {{RpcEndpoint}}, it would be useful to have a function which
allows waiting for the termination of the {{RpcEndpoint}}.
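One possible shape for this, sketched here as an assumption rather than the agreed design, is a termination future on the endpoint:

{code}
// Sketch only: one possible way to expose "wait for termination" on an endpoint.
// The class and method names below are assumptions, not the agreed design.
import java.util.concurrent.CompletableFuture;

public abstract class TerminationAwareEndpoint {

    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Returns a future that completes once the endpoint has completely shut down. */
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    /** To be called at the very end of the endpoint's shutdown procedure. */
    protected void markTerminated() {
        terminationFuture.complete(null);
    }
}
{code}

A caller could then block with {{getTerminationFuture().get()}} or chain follow-up actions onto the future.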



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] add netty tcp/restful pushed source support

2016-09-27 Thread Stephan Ewen
I think that could be an interesting source. Two quick questions to move
forward:

  - To keep the Flink code base from becoming too big (hard to maintain and
test) we started working with Apache Bahir as a project dedicated to
streaming connectors. Would that be a good target for the connector?

  - What are your thoughts on fault tolerance for that connector?

On Mon, Sep 26, 2016 at 3:01 PM, shijinkui  wrote:

> Hi, all
>
> 1. In order to support an end-to-end pushed source, I created FLINK-4630<
> https://issues.apache.org/jira/browse/FLINK-4630>. I want to know whether
> this idea is worthwhile.
>
> ---
> When the source starts, it listens on a provided TCP port and receives stream
> data from the user's data source.
> This Netty TCP source keeps the connection alive end-to-end, that is, from the
> business system directly to the Flink worker.
>
> user app push ->  netty server source of Flink
>
> The source in detail:
>
> 1. The source runs as a Netty TCP server.
> 2. The user provides a TCP port; if it is already in use, the port number is
> increased (within 1024 to 65535). The source can run in parallel.
> 3. The provided URL is called back to report the actual port being listened on.
> 4. The user pushes streaming data to the Netty server, which then collects the
> data into Flink.
>
>
> Thanks
>
> Jinkui Shi
>
>


Re: No support for request PutMappingRequest

2016-09-27 Thread Aljoscha Krettek
Hi,
the mapping should not be updated in the Flink sink. According to the
documentation the mapping is a setting on an index that should not be
changed after an index was created and some documents were added to that
index:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html

I think you have to set your mappings/settings on the index before you
start your Flink job. The Flink sink is only meant for adding elements to
an index.
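A minimal sketch of doing that with the Elasticsearch 2.x client before submitting the job (index, type, and field names below are placeholders; double-check the admin API of your Elasticsearch version):

// Sketch: create the index together with its mapping up front, before the Flink
// job is started. Assumes the Elasticsearch 2.x TransportClient; all names are placeholders.
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.net.InetAddress;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;

public class CreateIndexWithMapping {
    public static void main(String[] args) throws Exception {
        Client client = TransportClient.builder().build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

        XContentBuilder mapping = jsonBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("message")
                        .field("type", "string")
                        .field("analyzer", "standard")
                        .startObject("fields")
                            .startObject("raw")
                                .field("type", "string")
                                .field("index", "not_analyzed")
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject()
            .endObject();

        // Create the index and attach the mapping for the document type in one call.
        client.admin().indices().prepareCreate("my-index")
            .addMapping("my-type", mapping)
            .get();

        client.close();
    }
}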

Cheers,
Aljoscha

On Mon, 26 Sep 2016 at 15:39 Ozan DENİZ  wrote:

> Hi Aljoscha,
>
>
> We are trying to add a sortable field for Elasticsearch. To do this, we
> need to add a mapping to the index.
>
>
> We are trying to sort some fields in Elasticsearch. To make that work, our
> JSON format should look like this:
>
>
> "tweet": {
> "type": "string",
> "analyzer": "english",
> "fields": {
> "raw": {
> "type": "string",
> "index": "not_analyzed"
> }
> }
> }
>
>
>
> We should add "not_analyzed" to the mapping.
>
>
> private XContentBuilder buildMapping( String typeName )
> {
>
> XContentBuilder mapping = null;
> try
> {
> mapping = jsonBuilder()
> .startObject()
> .startObject(typeName)
> .startObject("properties")
>
>
> .startObject(LogFields.MESSAGE)
> .field("type","string")
> .field("analyzer", "standard")
> .startObject("fields")
> .startObject("raw")
> .field("type","string")
> .field("index", "not_analyzed")
> .endObject()
> .endObject()
> .endObject()
>
> .endObject()
> .endObject()
> .endObject();
> }
> catch ( IOException e )
> {
> e.printStackTrace();
> }
>
> return mapping;
> }
>
>
>
>
> 
> From: Aljoscha Krettek 
> Sent: Monday, September 26, 2016 16:27:12
> To: dev@flink.apache.org
> Subject: Re: No support for request PutMappingRequest
>
> Hi,
> I think PutMappingRequest is a request that can only be sent using
> IndicesAdminClient. In my understanding this is an administrative command
> that isn't related to actually storing data in an index.
>
> What are you trying to store with the PutMappingRequest?
>
> Cheers,
> Aljoscha
>
> On Mon, 26 Sep 2016 at 15:16 Ozan DENİZ  wrote:
>
> > Hi,
> >
> >
> > I am sending the error message below;
> >
> >
> > java.lang.IllegalArgumentException: No support for request
> >
> [org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest@3ed05d8b
> > ]
> > at org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:107)
> > ~[elasticsearch-2.3.5.jar:2.3.5]
> > at
> >
> org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:284)
> > ~[elasticsearch-2.3.5.jar:2.3.5]
> > at
> org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:268)
> > ~[elasticsearch-2.3.5.jar:2.3.5]
> > at
> org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)
> > ~[elasticsearch-2.3.5.jar:2.3.5]
> > at
> >
> org.apache.flink.streaming.connectors.elasticsearch2.BulkProcessorIndexer.add(BulkProcessorIndexer.java:32)
> >
> ~[flink-connector-elasticsearch2_2.10-1.2-20160926.041955-45.jar:1.2-SNAPSHOT]
> > at sink.ESSink.process(ESSink.java:77) ~[classes/:?]
> > at sink.ESSink.process(ESSink.java:25) ~[classes/:?]
> > at
> >
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.invoke(ElasticsearchSink.java:232)
> >
> ~[flink-connector-elasticsearch2_2.10-1.2-20160926.041955-45.jar:1.2-SNAPSHOT]
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
> > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1]
> > at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:176)
> > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1]
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
> > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1]
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1]
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > [flink-runtime_2.10-1.1.1.jar:1.1.1]
> > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
> > 09/26/2016 16:14:05 Sink: Unnamed(1/1) switched to FAILED
> >
> >
> > We try to add mapping request to elastic search. We cannot access to
> > client attribute (it is private) in elasticsearch class.
> >
> >
> > Is there any way to overcome this problem.
> >
> >
> > Thanks,
> >
> >
> > Ozan
> >
> >
> >
> > 
> > From: Till Rohrmann 
> > Sent: Monday, September 26, 2016 13:30:34
> > To: dev@flink.apache.org
> > Cc: Aljoscha Krettek
> > Subject: Re: No su

Re: Regarding `flink-streaming` project dependencies

2016-09-27 Thread Liwei Lin
Thanks Stephan for the prompt response!

Glad to know it's targeted for Flink 2.0. Is there any JIRA tracking this?
I couldn't find one :-)

Thanks!

Liwei


On Tue, Sep 27, 2016 at 4:47 PM, Stephan Ewen  wrote:

> Hi!
>
> Yes, there are definitely plans and desires to do that, definitely. May be
> breaking some API / dependency structure, so probably a candidate for Flink
> 2.0
>
> Greetings,
> Stephan
>
>
> On Tue, Sep 27, 2016 at 10:45 AM, Liwei Lin  wrote:
>
> > Hi folks,
> >
> > There are comments like this in
> > `StreamExecutionEnvironment.getExecutionEnvironment()`:
> >
> > // because the streaming project depends on "flink-clients" (and not the
> > > other way around)
> > > // we currently need to intercept the data set environment and create a
> > > dependent stream env.
> > > // this should be fixed once we rework the project dependencies
> >
> >
> > https://github.com/apache/flink/blob/master/flink-
> > streaming-java/src/main/java/org/apache/flink/streaming/api/environment/
> > StreamExecutionEnvironment.java#L1592-L1594
> >
> > I wonder if there is any plan to rework the project dependencies? In the
> > long run, are we planning to have `flink-clients` depend on
> > `flink-streaming` or to have `flink-java` depend on `flink-clients`?
> >
> > Thanks!
> >
> > Liwei
> >
>


[DISCUSS] Timely User Functions and Watermarks

2016-09-27 Thread Aljoscha Krettek
Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now
I'm having a bit of a problem with deciding how watermarks should be
treated for operators that have more than one input.

The problem is deciding when to fire event-time timers. For one-input
operators it's pretty straightforward: fire once the watermark surpasses a
given timer. For two-input operators we allow the operator implementer to
observe the watermarks from the two inputs separately and react to that and
also to decide what watermark to forward. With this it becomes hard to
figure out when to fire timers.

My thinking is that we should not allow operators to observe the watermark
anymore but route it past the operator and deal with watermarks and timers
outside of the operator. A timer for operators with more than one inputs
(TwoInputOperator) would fire if the watermark from both inputs advances
sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the
watermark before it enters the operator and still deal with timers in the
same way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for
timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason for why I want to deal with watermark outside of operators
is that otherwise every operator has to implement the functionality to
update the current watermark at the timer service. i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I don't want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two input operators which also do the
merging of the watermarks from the two inputs right now.


RE: [DISCUSS] Timely User Functions and Watermarks

2016-09-27 Thread Radu Tudoran
Hi Aljoscha,

My 2 cents on this would be that it is worth maintaining the access to the 
watermarks. I think having the option to customize this is a strong point of 
Flink.

Regarding the solution you proposed for two-input timers ("would fire if the
watermark from both inputs advances sufficiently far"), I would propose having
the option to set a strategy for the timer. We could have (sketched in code at
the end of this message):
- EagerTrigger - the timer fires when either input's watermark has advanced
sufficiently far
- LeftEagerTrigger - the timer fires when the left input's watermark has advanced
sufficiently far
- RightEagerTrigger - the timer fires when the right input's watermark has advanced
sufficiently far
- SyncTrigger - the timer fires only when the watermarks from both inputs have
advanced sufficiently far


We could potentially include here the custom handling of the watermarks under a 
CustomTrigger strategy implemented as an operator that can be provided.
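A tiny sketch of how such a strategy could look (purely illustrative; none of these names exist in Flink):

// Illustrative only: a possible strategy enum for two-input event-time timers.
// It just encodes the four options listed above.
public enum TwoInputTimerTriggerStrategy {
    EAGER,        // fire when either input's watermark has passed the timer
    LEFT_EAGER,   // fire when the left input's watermark has passed the timer
    RIGHT_EAGER,  // fire when the right input's watermark has passed the timer
    SYNC;         // fire only when both inputs' watermarks have passed the timer

    public boolean shouldFire(long timerTimestamp, long leftWatermark, long rightWatermark) {
        switch (this) {
            case EAGER:       return leftWatermark >= timerTimestamp || rightWatermark >= timerTimestamp;
            case LEFT_EAGER:  return leftWatermark >= timerTimestamp;
            case RIGHT_EAGER: return rightWatermark >= timerTimestamp;
            case SYNC:        return leftWatermark >= timerTimestamp && rightWatermark >= timerTimestamp;
            default:          return false;
        }
    }
}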


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a 
problem with deciding how watermarks should be treated for operators that have 
more than one input.

The problem is deciding when to fire event-time timers. For one-input operators 
it's pretty straightforward: fire once the watermark surpasses a given timer. 
For two-input operators we allow the operator implementer to observe the 
watermarks from the two inputs separately and react to that and also to decide 
what watermark to forward. With this it becomes hard to figure out when to fire 
timers.

My thinking is that we should not allow operators to observe the watermark 
anymore but route it past the operator and deal with watermarks and timers 
outside of the operator. A timer for operators with more than one inputs
(TwoInputOperator) would fire if the watermark from both inputs advances 
sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the 
watermark before it enters the operator and still deal with timers in the same 
way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for 
timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason for why I want to deal with watermark outside of operators is 
that otherwise every operator has to implement the functionality to update the 
current watermark at the timer service. i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I don't want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two input operators which also do the merging 
of the watermarks from the two inputs right now.


JDBC connection in Flink using Scala

2016-09-27 Thread sunny patel
Hi Team,

I am wondering whether it is possible to add a JDBC connection as a source or
sink in Flink using Scala.
Could someone kindly help with this? My DB write/sink code is not working.
If you have any sample code, please share it here.
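Flink ships JDBC input/output formats in the flink-jdbc module that can also be used from Scala programs; here is a rough sketch of building a sink with the builder API (driver, URL, and INSERT statement are placeholders, and the record type the format works on depends on the Flink version):

// Rough sketch of building a JDBC sink with the flink-jdbc module's builder.
// Driver class, URL, and the INSERT statement are placeholders for your setup.
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class JdbcSinkSketch {
    public static JDBCOutputFormat buildJdbcSink() {
        return JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")                     // JDBC driver on the classpath
                .setDBUrl("jdbc:postgresql://localhost:5432/mydb")          // placeholder connection URL
                .setQuery("INSERT INTO persons (name, age) VALUES (?, ?)")  // parameterized insert
                .finish();
    }
}

The resulting format is attached to a DataSet with dataSet.output(...); there is a matching JDBCInputFormat.buildJDBCInputFormat() builder for reads.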


*Thanks*
*Sunny*




RE: [DISCUSS] add netty tcp/restful pushed source support

2016-09-27 Thread shijinkui
Hey, Stephan Ewen

1.  Bahir's target is Spark. The contributors are rxin, srowen, tdas, mateiz,
and so on.
If we want Bahir to be used by Flink, we can suggest that Bahir provide a
streaming connector interface, such as store(), start(), stop(), restart(),
receiving(Any)...
Then the same streaming connector could be implemented by both Spark and Flink.
But I think this is impossible, as Bahir depends on spark-streaming and Spark SQL.
2.  About connector fault tolerance: Bahir's MQTT and Akka connectors rely on the
backing systems' own storage, but Netty has no feature for persisting data.
I think we can append data to a ring buffer. When SourceContext
collect() throws an error, we write a message to tell the client to stop sending.
When the Flink SourceContext is back to normal, we write the ring buffer data to
Flink and tell the client to go on (a simplified sketch follows below).
Because push mode makes it hard to control the flow throughput on the Flink
side, the upstream client has to do it.
This Netty connector's purpose is end-to-end streaming with minimum time
delay.
3.  Later on, we can provide HTTP protocol support (PUT, POST, GET), and even
WebSocket or Jersey-based REST, on top of Netty.
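To make the push-source idea concrete, here is a deliberately simplified sketch (plain ServerSocket instead of Netty, a single client, and none of the ring buffer, port retry, or callback URL logic from the proposal):

// Simplified sketch of a push-based TCP source: a server socket accepts one client
// and forwards each received line into Flink. Netty, the ring buffer, the port retry
// and the callback URL from the proposal are intentionally left out.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TcpPushSource implements SourceFunction<String> {

    private final int port;
    private volatile boolean running = true;

    public TcpPushSource(int port) {
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()))) {

            String line;
            while (running && (line = in.readLine()) != null) {
                ctx.collect(line); // hand each pushed record to Flink
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

A client would then connect to the TaskManager host on that port and push newline-delimited records; on the Flink side the source is added with env.addSource(new TcpPushSource(9999)).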

-Original Message-
From: Stephan Ewen [mailto:se...@apache.org]
Sent: September 27, 2016 16:54
To: dev@flink.apache.org
Subject: Re: [DISCUSS] add netty tcp/restful pushed source support

I think that could be an interesting source. Two quick questions to move forward

  - To keep the Flink code base from becoming too big (hard to maintain and
test) we started working with Apache Bahir as a project dedicated to streaming 
connectors. Would that be a good target for the connector?

  - What are your thoughts on fault tolerance for that connector?

On Mon, Sep 26, 2016 at 3:01 PM, shijinkui  wrote:

> Hi, all
>
> 1.In order to support end-to-end pushed source, I create FLINK-4630<
> https://issues.apache.org/jira/browse/FLINK-4630>. I want to know 
> whether is this idea worth?
>
> ---
> When source stream get start, listen a provided tcp port, receive 
> stream data from user data source.
> This netty tcp source is keepping alive and end-to-end, that is from 
> business system to flink worker directly.
>
> user app push ->  netty server source of Flink
>
> describe the source in detail below:
>
> 1.source run as a netty tcp server
> 2.user provide a tcp port, if the port is in used, increace the port
> number between 1024 to 65535. Source can parallel.
> 3.callback the provided url to report the real port to listen
> 4.user push streaming data to netty server, then collect the data to
> flink
>
>
> Thanks
>
> Jinkui Shi
>
>


[jira] [Created] (FLINK-4695) Separate configuration parsing from MetricRegistry

2016-09-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4695:


 Summary: Separate configuration parsing from MetricRegistry
 Key: FLINK-4695
 URL: https://issues.apache.org/jira/browse/FLINK-4695
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


In order to decouple the {{MetricRegistry}} object instantiation from the 
global configuration, we could introduce a {{MetricRegistryConfiguration}} 
object which encapsulates all necessary information for the {{MetricRegistry}}. 
The {{MetricRegistryConfiguration}} could have a static method to be generated 
from a {{Configuration}}. 
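A minimal sketch of what such a configuration holder could look like (the fields and configuration keys below are illustrative, not the final design):

{code}
// Sketch only: an immutable holder for the MetricRegistry's settings, built once
// from the global Configuration. Fields and keys are illustrative.
import org.apache.flink.configuration.Configuration;

public class MetricRegistryConfiguration {

    private final String scopeDelimiter;
    private final String reporterConfigs;

    private MetricRegistryConfiguration(String scopeDelimiter, String reporterConfigs) {
        this.scopeDelimiter = scopeDelimiter;
        this.reporterConfigs = reporterConfigs;
    }

    public String getScopeDelimiter() {
        return scopeDelimiter;
    }

    public String getReporterConfigs() {
        return reporterConfigs;
    }

    /** Parses everything the MetricRegistry needs out of the global configuration. */
    public static MetricRegistryConfiguration fromConfiguration(Configuration config) {
        String delimiter = config.getString("metrics.scope.delimiter", ".");
        String reporters = config.getString("metrics.reporters", "");
        return new MetricRegistryConfiguration(delimiter, reporters);
    }
}
{code}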



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: RE: [DISCUSS] add netty tcp/restful pushed source support

2016-09-27 Thread Greg Hogan
Apache Bahir's website only suggests support for additional frameworks, but
there is a Flink repository at
  https://github.com/apache/bahir-flink

On Tue, Sep 27, 2016 at 8:38 AM, shijinkui  wrote:

> Hey, Stephan Ewen
>
> 1.  bahir's target is spark. The contributer are rxin, srowen, tdas,
> mateiz and so on.
> If we want bahir used by flink, we can suggest bahir provide
> streaming connecter interface, such as store(), start(), stop(), restart(),
> receiving(Any)...
> Then same streaming connector can be implemented by spark and
> flink. But I think this is impossible, as bahir depend spark-streaming and
> spark sql.
> 2.  About connector fault tolerance. Bahir's mqtt and akka connector
> are themselves' storage. But netty have no persist data feature.
> I think we can append data to a ringbuffer. When SourceContext
> collect() throw error, then write message to inform client to stop send
> message. When flink SourceContext is normal, then write the ringbuffer data
> to flink, inform client to go on.
> Because pushing mode is hard to control the flow throughput, the
> upstream client can.
> This netty connector's purpose is end-to-end streaming, minimum
> time delay.
> 3.  Later on, We can provide http protocol ability, put, post, get,
> even websocket or jersey restful based on netty.
>
> -Original Message-
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: September 27, 2016 16:54
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] add netty tcp/restful pushed source support
>
> I think that could be an interesting source. Two quick questions to move
> forward
>
>   - To keep the Flink code base from becoming too big (hard to maintain and
> test) we started working with Apache Bahir as a project dedicated to
> streaming connectors. Would that be a good target for the connector?
>
>   - What are your thoughts on fault tolerance for that connector?
>
> On Mon, Sep 26, 2016 at 3:01 PM, shijinkui  wrote:
>
> > Hi, all
> >
> > 1.In order to support end-to-end pushed source, I create FLINK-4630<
> > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know
> > whether is this idea worth?
> >
> > ---
> > When source stream get start, listen a provided tcp port, receive
> > stream data from user data source.
> > This netty tcp source is keepping alive and end-to-end, that is from
> > business system to flink worker directly.
> >
> > user app push ->  netty server source of Flink
> >
> > describe the source in detail below:
> >
> > 1.source run as a netty tcp server
> > 2.user provide a tcp port, if the port is in used, increace the port
> > number between 1024 to 65535. Source can parallel.
> > 3.callback the provided url to report the real port to listen
> > 4.user push streaming data to netty server, then collect the data to
> > flink
> >
> >
> > Thanks
> >
> > Jinkui Shi
> >
> >
>


[jira] [Created] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4696:
---

 Summary: Limit the number of Akka Dispatcher Threads in 
LocalMiniCluster
 Key: FLINK-4696
 URL: https://issues.apache.org/jira/browse/FLINK-4696
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.2.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0


By default, akka spawns 2x or 3x the number of cores in threads.

For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with
separate actor systems for the jobmanager and multiple taskmanagers, this
frequently means >600 Akka threads. Flink uses about 4 actors.

This simply eats unnecessary resources. I suggest having at most 4 threads per
actor system in test setups.
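For reference, the cap could be expressed through the actor system's dispatcher settings, roughly like this (values are illustrative; where exactly to apply it inside the LocalFlinkMiniCluster is the open question):

{code}
// Illustrative only: capping the default dispatcher of a test actor system to 4 threads.
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class CappedTestActorSystem {
    public static ActorSystem create() {
        Config dispatcherCap = ConfigFactory.parseString(
            "akka.actor.default-dispatcher.fork-join-executor {\n" +
            "  parallelism-min = 2\n" +
            "  parallelism-factor = 1.0\n" +
            "  parallelism-max = 4\n" +
            "}");
        return ActorSystem.create("flink-test", dispatcherCap.withFallback(ConfigFactory.load()));
    }
}
{code}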



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: RE: [DISCUSS] add netty tcp/restful pushed source support

2016-09-27 Thread shijinkui
That's nice. Will the present Flink source connectors be pushed to bahir-flink?
I can add a netty source to bahir-flink.

The Maven repository has no bahir-flink artifacts yet:
https://mvnrepository.com/artifact/org.apache.bahir

-Original Message-
From: Greg Hogan [mailto:c...@greghogan.com]
Sent: September 27, 2016 20:58
To: dev@flink.apache.org
Subject: Re: RE: [DISCUSS] add netty tcp/restful pushed source support

Apache Bahir's website only suggests support for additional frameworks, but 
there is a Flink repository at
  https://github.com/apache/bahir-flink

On Tue, Sep 27, 2016 at 8:38 AM, shijinkui  wrote:

> Hey, Stephan Ewen
>
> 1.  bahir's target is spark. The contributer are rxin, srowen, tdas,
> mateiz and so on.
> If we want bahir used by flink, we can suggest bahir provide 
> streaming connecter interface, such as store(), start(), stop(), 
> restart(), receiving(Any)...
> Then same streaming connector can be implemented by spark and 
> flink. But I think this is impossible, as bahir depend spark-streaming 
> and spark sql.
> 2.  About connector fault tolerance. Bahir's mqtt and akka connector
> are themselves' storage. But netty have no persist data feature.
> I think we can append data to a ringbuffer. When SourceContext
> collect() throw error, then write message to inform client to stop 
> send message. When flink SourceContext is normal, then write the 
> ringbuffer data to flink, inform client to go on.
> Because pushing mode is hard to control the flow throughput, 
> the upstream client can.
> This netty connector's purpose is end-to-end streaming, 
> minimum time delay.
> 3.  Later on, We can provide http protocol ability, put, post, get,
> even websocket or jersey restful based on netty.
>
> -Original Message-
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: September 27, 2016 16:54
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] add netty tcp/restful pushed source support
>
> I think that could be an interesting source. Two quick questions to 
> move forward
>
>   - To keep the Flink code base from becoming too big (hard to 
> maintain and
> test) we started working with Apache Bahir as a project dedicated to 
> streaming connectors. Would that be a good target for the connector?
>
>   - What are your thoughts on fault tolerance for that connector?
>
> On Mon, Sep 26, 2016 at 3:01 PM, shijinkui  wrote:
>
> > Hi, all
> >
> > 1.In order to support end-to-end pushed source, I create FLINK-4630<
> > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know 
> > whether is this idea worth?
> >
> > ---
> > When source stream get start, listen a provided tcp port, receive 
> > stream data from user data source.
> > This netty tcp source is keepping alive and end-to-end, that is from 
> > business system to flink worker directly.
> >
> > user app push ->  netty server source of Flink
> >
> > describe the source in detail below:
> >
> > 1.source run as a netty tcp server
> > 2.user provide a tcp port, if the port is in used, increace the port
> > number between 1024 to 65535. Source can parallel.
> > 3.callback the provided url to report the real port to listen
> > 4.user push streaming data to netty server, then collect the data to
> > flink
> >
> >
> > Thanks
> >
> > Jinkui Shi
> >
> >
>


[DISCUSS] Removing delete*Timer from the WindowOperator.Context

2016-09-27 Thread Kostas Kloudas
Hi all,

As the title of this email suggests, I am proposing to remove the methods
deleteProcessingTimeTimer(long time) and deleteEventTimeTimer(long time)
from the WindowOperator.Context. With this change, registered timers that
have nothing to do (e.g. because their state has already been cleaned up)
will simply be ignored by the WindowOperator when their time comes.

The reason for the change is that by allowing custom user code, e.g. a custom 
Trigger,
to delete timers we may have unpredictable behavior. 

As an example, one can imagine the case where we have allowed_lateness = 0 and 
the cleanup 
timer for a window collides with the end_of_window one. In this case, by 
deleting the end_of_window 
timer from the trigger (possibly a custom one), we end up also deleting the 
cleanup one, 
which in turn can lead to the window state never being garbage collected. 

Apart from memory leaks, this can easily lead to wrong session windows, as a
session that should have been garbage collected will still be around and ready
to accept new data.

With this change, timers that should correctly be deleted will now remain in 
the queue of 
pending timers, but they will do nothing, while cleanup timers will cleanup the 
state of their 
corresponding window.
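To illustrate the intended behavior with a toy example (hypothetical names, not the actual WindowOperator code):

// Toy illustration of "timers with nothing to do are simply ignored": a timer only
// does work if its window still has state; cleanup timers still clean that state up.
import java.util.HashMap;
import java.util.Map;

public class IgnoreStaleTimersSketch {

    private final Map<Long, String> windowState = new HashMap<>(); // window end -> state (toy model)

    /** Called when a timer for the given window fires. */
    public void onTimer(long windowEnd, boolean isCleanupTimer) {
        String state = windowState.get(windowEnd);
        if (state == null) {
            // State was already cleaned up: the timer has nothing to do and is ignored.
            return;
        }
        if (isCleanupTimer) {
            windowState.remove(windowEnd); // cleanup timers still garbage-collect the window
        } else {
            // fire the window: emit results based on 'state' ...
        }
    }
}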

Other possible solutions, like keeping a separate list for cleanup timers, would
complicate the codebase and also introduce memory overheads which can be avoided
using the solution above (i.e. just ignoring timers that have nothing to do anymore).

What do you think?

Kostas



[jira] [Created] (FLINK-4697) Gather more detailed checkpoint stats in CheckpointStatsTracker

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4697:
---

 Summary: Gather more detailed checkpoint stats in 
CheckpointStatsTracker
 Key: FLINK-4697
 URL: https://issues.apache.org/jira/browse/FLINK-4697
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
 Fix For: 1.2.0


The additional information attached to the {{AcknowledgeCheckpoint}} method 
must be gathered in the {{CheckpointStatsTracker}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4698) Visualize additional checkpoint information

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4698:
---

 Summary: Visualize additional checkpoint information
 Key: FLINK-4698
 URL: https://issues.apache.org/jira/browse/FLINK-4698
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: Stephan Ewen
 Fix For: 1.2.0


Display the additional information gathered in the {{CheckpointStatsTracker}} 
in the "Checkpoint" tab.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests

2016-09-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4699:
---

 Summary: Convert Kafka TableSource/TableSink tests to unit tests
 Key: FLINK-4699
 URL: https://issues.apache.org/jira/browse/FLINK-4699
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Reporter: Timo Walther


The Kafka tests are extremely heavy, and the Table Sources and Sinks are only
thin wrappers on top of the Kafka Sources / Sinks. They should not need to bring
up Kafka clusters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4700) Harden the TimeProvider test

2016-09-27 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-4700:
-

 Summary: Harden the TimeProvider test
 Key: FLINK-4700
 URL: https://issues.apache.org/jira/browse/FLINK-4700
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


Currently the TimeProvider test fails due to a race condition. This task aims 
at fixing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


flink 1.1.2 RichFunction not working

2016-09-27 Thread Chen Bekor
Hi,

Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression in
my code. In order to isolate the issue I have written a small Flink job
that demonstrates it.

The job does some time based window operations with an input csv file (in
the example below - count the number of events on sliding window of 3600
seconds every 10 seconds) using event time instead of wall clock time, and
writes results down to an output csv file.

When I upload the jar via flink admin UI (local cluster) and submit the
job, it returns as finished after a couple of seconds but the RichFunction
is not applied (I do not see any log messages and the output csv is empty)

The same happens when running in local mode (through the IDE).

Here is sample code (fully working example) to demonstrate what I'm trying
to achieve. Please note - it works on flink 1.0.2 but doesn't work on flink
1.1.2. No error thrown. The output CSV file is simply empty.


*Job class:*

import com.firelayers.spike.functions.sink.FileSinkFunction
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json

import scala.language.postfixOps

object JobTest {

implicit val Logger: Logger = LoggerFactory.getLogger(JobTest.getClass)

var parameterTool: ParameterTool = null
var env: StreamExecutionEnvironment = null

def main(args: Array[String]): Unit = {
implicit val ev = TypeInformation.of(classOf[MyEvent])
parameterTool = ParameterTool.fromArgs(args)
val input_path = parameterTool.getRequired("in.path")
val out_path = parameterTool.getRequired("out.path")
env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val events_stream = env.readTextFile(s"file://$input_path").map(new
ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp)
implicit val sti = TypeInformation.of(classOf[String])
events_stream.keyBy(_.guid)
.timeWindow(Time.seconds(3600), Time.seconds(10))
.apply[String](new TFunction[MyEvent]).name("test")
.addSink(new FileSinkFunction[String](out_path))
env.execute("test")
}
}

class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] {
val logger: Logger = LoggerFactory.getLogger(this.getClass)

override def apply(
  cell: String,
  window: TimeWindow,
  events: Iterable[T],
  out: Collector[String]): Unit = {

val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]]
val data = s"${events_as_seq.head.guid}"
logger.info(events_as_seq.size.toString)
if(events_as_seq.size > 2)
out.collect(s"${events_as_seq.size.toString} $data")
}
}

case class MyEvent(guid: String, request_date_time_epoch: String) {}

object MyEvent {

implicit val jsonWriter = Json.writes[MyEvent]
implicit val jsonReader = Json.reads[MyEvent]

def parseFromString(record: String): MyEvent = {
val data = record.split(",")
new MyEvent(data.apply(1), data.apply(0))
}
}

class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] {
private var currentTimestamp = Long.MinValue

override def extractTimestamp(element: MyEvent, previous_timestamp:
Long): Long = {
this.currentTimestamp = element.request_date_time_epoch.toLong
this.currentTimestamp
}

override def getCurrentWatermark: Watermark = {
val watermark_value = if(currentTimestamp == Long.MinValue)
Long.MinValue else currentTimestamp - 1
new Watermark(watermark_value)
}
}

class ParseMyEventFunction extends RichMapFunction[String, MyEvent] {
override def map(record: String): MyEvent = {
MyEvent.parseFromString(record)
}
}


To run - use this parameters: --in.path /in.csv --out.path /out.csv

This is a sample input csv file (in.csv):

1446651603000,ae741eb9efa47d14a2b43c07b0286306
1446651617000,ae741eb9efa47d14a2b43c07b0286306
1446651762000,321bfb334d878bd321d2fd38eebfbb3d
1446651846000,ae741eb9efa47d14a2b43c07b0286306
1446651861000,321bfb334d878bd321d2fd38eebfbb3d
1446651885000,321bfb334d878bd321d2fd38eebfbb3d
1446652058000,873cd5958ec5e8b6f555ee95abc6abb4
1446652303000,dc668e71e7cfe2f

[jira] [Created] (FLINK-4701) Unprotected access to cancelables in StreamTask

2016-09-27 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4701:
-

 Summary: Unprotected access to cancelables in StreamTask
 Key: FLINK-4701
 URL: https://issues.apache.org/jira/browse/FLINK-4701
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In performCheckpoint():
{code}
AsyncCheckpointRunnable asyncCheckpointRunnable 
= new AsyncCheckpointRunnable(
"checkpoint-" + checkpointId + 
"-" + timestamp,
this,
cancelables,
chainedStateHandles,
keyGroupsStateHandleFuture,
checkpointId,
bytesBufferedAlignment,
alignmentDurationNanos,
syncDurationMillis,
endOfSyncPart);

synchronized (cancelables) {

cancelables.add(asyncCheckpointRunnable);
}
{code}
Construction of AsyncCheckpointRunnable should be put under the synchronized 
block of cancelables.
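The suggested change would then look roughly like this:
{code}
synchronized (cancelables) {
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            "checkpoint-" + checkpointId + "-" + timestamp,
            this,
            cancelables,
            chainedStateHandles,
            keyGroupsStateHandleFuture,
            checkpointId,
            bytesBufferedAlignment,
            alignmentDurationNanos,
            syncDurationMillis,
            endOfSyncPart);

    cancelables.add(asyncCheckpointRunnable);
}
{code}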



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: flink 1.1.2 RichFunction not working

2016-09-27 Thread sunny patel
Hi Chen,

Please upload your Flink scala library dependencies.

Regards
Sunny.

On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor  wrote:

> Hi,
>
> Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression in
> my code. In order to Isolate the issue I have written a small flink job
> that demonstrates that.
>
> The job does some time based window operations with an input csv file (in
> the example below - count the number of events on sliding window of 3600
> seconds every 10 seconds) using event time instead of wall clock time, and
> writes results down to an output csv file.
>
> When I upload the jar via flink admin UI (local cluster) and submit the
> job, it returns as finished after a couple of seconds but the RichFunction
> is not applied (I do not see any log messages and the output csv is empty)
>
> same results when running in local mode (through the IDE).
>
> Here is sample code (fully working example) to demonstrate what I'm trying
> to achieve. Please note - it works on flink 1.0.2 but doesn't work on flink
> 1.1.2. No error thrown. The output CSV file is simply empty.
>
>
> *Job class:*
>
> import com.firelayers.spike.functions.sink.FileSinkFunction
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.
> AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.WindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
> import org.slf4j.{Logger, LoggerFactory}
> import play.api.libs.json.Json
>
> import scala.language.postfixOps
>
> object JobTest {
>
> implicit val Logger: Logger = LoggerFactory.getLogger(
> JobTest.getClass)
>
> var parameterTool: ParameterTool = null
> var env: StreamExecutionEnvironment = null
>
> def main(args: Array[String]): Unit = {
> implicit val ev = TypeInformation.of(classOf[MyEvent])
> parameterTool = ParameterTool.fromArgs(args)
> val input_path = parameterTool.getRequired("in.path")
> val out_path = parameterTool.getRequired("out.path")
> env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val events_stream = env.readTextFile(s"file://$input_path").map(new
> ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp)
> implicit val sti = TypeInformation.of(classOf[String])
> events_stream.keyBy(_.guid)
> .timeWindow(Time.seconds(3600), Time.seconds(10))
> .apply[String](new TFunction[MyEvent]).name("test")
> .addSink(new FileSinkFunction[String](out_path))
> env.execute("test")
> }
> }
>
> class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] {
> val logger: Logger = LoggerFactory.getLogger(this.getClass)
>
> override def apply(
>   cell: String,
>   window: TimeWindow,
>   events: Iterable[T],
>   out: Collector[String]): Unit = {
>
> val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]]
> val data = s"${events_as_seq.head.guid}"
> logger.info(events_as_seq.size.toString)
> if(events_as_seq.size > 2)
> out.collect(s"${events_as_seq.size.toString} $data")
> }
> }
>
> case class MyEvent(guid: String, request_date_time_epoch: String) {}
>
> object MyEvent {
>
> implicit val jsonWriter = Json.writes[MyEvent]
> implicit val jsonReader = Json.reads[MyEvent]
>
> def parseFromString(record: String): MyEvent = {
> val data = record.split(",")
> new MyEvent(data.apply(1), data.apply(0))
> }
> }
>
> class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] {
> private var currentTimestamp = Long.MinValue
>
> override def extractTimestamp(element: MyEvent, previous_timestamp:
> Long): Long = {
> this.currentTimestamp = element.request_date_time_epoch.toLong
> this.currentTimestamp
> }
>
> override def getCurrentWatermark: Watermark = {
> val watermark_value = if(currentTimestamp == Long.MinValue)
> Long.MinValue else currentTimestamp - 1
> new Watermark(watermark_value)
> }
> }
>
> class ParseMyEventFunction extends RichMapFunction[String, MyEvent] {
> override def map(record: String): MyEvent = {
> MyEvent.parseFromString(record)
> }
> }
>
>
> To run - use this parameters: --in.path /in.csv --out.path /out.csv
>
> This is a sample input csv file (in.csv):
>
>

Re: flink 1.1.2 RichFunction not working

2016-09-27 Thread Stephan Ewen
Sorry for the inconvenience. This is a known issue and being fixed for
Flink 1.1.3 - the problem is that the streaming File sources were reworked
to continuously monitor the File System, but the watermarks are not handled
correctly.

https://issues.apache.org/jira/browse/FLINK-4329

So far, 2/3 parts of the fix are in, the last part is an open pull request.
Once that is in, we can look into getting Flink 1.1.3 out.

Best,
Stephan



On Tue, Sep 27, 2016 at 8:17 PM, sunny patel  wrote:

> Hi Chen,
>
> Please upload your Flink scala library dependencies.
>
> Regards
> Sunny.
>
> On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor  wrote:
>
> > Hi,
> >
> > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression
> in
> > my code. In order to Isolate the issue I have written a small flink job
> > that demonstrates that.
> >
> > The job does some time based window operations with an input csv file (in
> > the example below - count the number of events on sliding window of 3600
> > seconds every 10 seconds) using event time instead of wall clock time,
> and
> > writes results down to an output csv file.
> >
> > When I upload the jar via flink admin UI (local cluster) and submit the
> > job, it returns as finished after a couple of seconds but the
> RichFunction
> > is not applied (I do not see any log messages and the output csv is
> empty)
> >
> > same results when running in local mode (through the IDE).
> >
> > Here is sample code (fully working example) to demonstrate what I'm
> trying
> > to achieve. Please note - it works on flink 1.0.2 but doesn't work on
> flink
> > 1.1.2. No error thrown. The output CSV file is simply empty.
> >
> >
> > *Job class:*
> >
> > import com.firelayers.spike.functions.sink.FileSinkFunction
> > import org.apache.flink.api.common.functions.RichMapFunction
> > import org.apache.flink.api.common.typeinfo.TypeInformation
> > import org.apache.flink.api.java.utils.ParameterTool
> > import org.apache.flink.streaming.api.TimeCharacteristic
> > import org.apache.flink.streaming.api.functions.
> > AssignerWithPeriodicWatermarks
> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> > import org.apache.flink.streaming.api.scala.function.WindowFunction
> > import org.apache.flink.streaming.api.watermark.Watermark
> > import org.apache.flink.streaming.api.windowing.time.Time
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > import org.apache.flink.util.Collector
> > import org.slf4j.{Logger, LoggerFactory}
> > import play.api.libs.json.Json
> >
> > import scala.language.postfixOps
> >
> > object JobTest {
> >
> > implicit val Logger: Logger = LoggerFactory.getLogger(
> > JobTest.getClass)
> >
> > var parameterTool: ParameterTool = null
> > var env: StreamExecutionEnvironment = null
> >
> > def main(args: Array[String]): Unit = {
> > implicit val ev = TypeInformation.of(classOf[MyEvent])
> > parameterTool = ParameterTool.fromArgs(args)
> > val input_path = parameterTool.getRequired("in.path")
> > val out_path = parameterTool.getRequired("out.path")
> > env = StreamExecutionEnvironment.getExecutionEnvironment
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > val events_stream = env.readTextFile(s"file://$
> input_path").map(new
> > ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp)
> > implicit val sti = TypeInformation.of(classOf[String])
> > events_stream.keyBy(_.guid)
> > .timeWindow(Time.seconds(3600), Time.seconds(10))
> > .apply[String](new TFunction[MyEvent]).name("test")
> > .addSink(new FileSinkFunction[String](out_path))
> > env.execute("test")
> > }
> > }
> >
> > class TFunction[T] extends WindowFunction[T, String, String, TimeWindow]
> {
> > val logger: Logger = LoggerFactory.getLogger(this.getClass)
> >
> > override def apply(
> >   cell: String,
> >   window: TimeWindow,
> >   events: Iterable[T],
> >   out: Collector[String]): Unit = {
> >
> > val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]]
> > val data = s"${events_as_seq.head.guid}"
> > logger.info(events_as_seq.size.toString)
> > if(events_as_seq.size > 2)
> > out.collect(s"${events_as_seq.size.toString} $data")
> > }
> > }
> >
> > case class MyEvent(guid: String, request_date_time_epoch: String) {}
> >
> > object MyEvent {
> >
> > implicit val jsonWriter = Json.writes[MyEvent]
> > implicit val jsonReader = Json.reads[MyEvent]
> >
> > def parseFromString(record: String): MyEvent = {
> > val data = record.split(",")
> > new MyEvent(data.apply(1), data.apply(0))
> > }
> > }
> >
> > class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] {
> > private var currentTimestamp = Long.MinValue
> >
> >  

[jira] [Created] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4702:
---

 Summary: Kafka consumer must commit offsets asynchronously
 Key: FLINK-4702
 URL: https://issues.apache.org/jira/browse/FLINK-4702
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 1.2.0, 1.1.3


The offset commit calls to Kafka may occasionally take very long.
In that case, the {{notifyCheckpointComplete()}} method blocks for long and the 
KafkaConsumer cannot make progress and cannot perform checkpoints.

Kafka 0.9+ has methods to commit asynchronously.
We should use those and make sure no more than one commit is concurrently in
progress, so that commit requests do not pile up.
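A small sketch of the intended pattern with the Kafka 0.9+ consumer API (the guard flag keeps at most one commit in flight; the surrounding Flink connector wiring is omitted):

{code}
// Sketch: at most one asynchronous offset commit in flight at a time, so that slow
// commits never block notifyCheckpointComplete(). Flink connector wiring omitted.
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class AsyncOffsetCommitter {

    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    /** Must be called from the consumer's polling thread (KafkaConsumer is not thread-safe). */
    public void commit(KafkaConsumer<byte[], byte[]> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        // If the previous commit has not returned yet, skip this one instead of queueing it;
        // a later checkpoint will commit newer offsets anyway.
        if (commitInProgress.compareAndSet(false, true)) {
            consumer.commitAsync(offsetsToCommit, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    commitInProgress.set(false);
                    // on failure: log and continue; these offsets are not used for Flink's fault tolerance
                }
            });
        }
    }
}
{code}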



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: flink 1.1.2 RichFunction not working

2016-09-27 Thread Chen Bekor
Thanks. It is worth mentioning in the release notes of 1.1.2 that the file
source is broken. We spent substantial time trying to figure out the root cause.

On Sep 27, 2016 9:40 PM, "Stephan Ewen"  wrote:

> Sorry for the inconvenience. This is a known issue and being fixed for
> Flink 1.1.3 - the problem is that the streaming File sources were reworked
> to continuously monitor the File System, but the watermarks are not handled
> correctly.
>
> https://issues.apache.org/jira/browse/FLINK-4329
>
> So far, 2/3 parts of the fix are in, the last part is an open pull request.
> Once that is in, we can look into getting Flink 1.1.3 out.
>
> Best,
> Stephan
>
>
>
> On Tue, Sep 27, 2016 at 8:17 PM, sunny patel  wrote:
>
>> Hi Chen,
>>
>> Please upload your Flink scala library dependencies.
>>
>> Regards
>> Sunny.
>>
>> On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor  wrote:
>>
>> > Hi,
>> >
>> > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression
>> in
>> > my code. In order to Isolate the issue I have written a small flink job
>> > that demonstrates that.
>> >
>> > The job does some time based window operations with an input csv file
>> (in
>> > the example below - count the number of events on sliding window of 3600
>> > seconds every 10 seconds) using event time instead of wall clock time,
>> and
>> > writes results down to an output csv file.
>> >
>> > When I upload the jar via flink admin UI (local cluster) and submit the
>> > job, it returns as finished after a couple of seconds but the
>> RichFunction
>> > is not applied (I do not see any log messages and the output csv is
>> empty)
>> >
>> > same results when running in local mode (through the IDE).
>> >
>> > Here is sample code (fully working example) to demonstrate what I'm
>> trying
>> > to achieve. Please note - it works on flink 1.0.2 but doesn't work on
>> flink
>> > 1.1.2. No error thrown. The output CSV file is simply empty.
>> >
>> >
>> > *Job class:*
>> >
>> > import com.firelayers.spike.functions.sink.FileSinkFunction
>> > import org.apache.flink.api.common.functions.RichMapFunction
>> > import org.apache.flink.api.common.typeinfo.TypeInformation
>> > import org.apache.flink.api.java.utils.ParameterTool
>> > import org.apache.flink.streaming.api.TimeCharacteristic
>> > import org.apache.flink.streaming.api.functions.
>> > AssignerWithPeriodicWatermarks
>> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> > import org.apache.flink.streaming.api.scala.function.WindowFunction
>> > import org.apache.flink.streaming.api.watermark.Watermark
>> > import org.apache.flink.streaming.api.windowing.time.Time
>> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> > import org.apache.flink.util.Collector
>> > import org.slf4j.{Logger, LoggerFactory}
>> > import play.api.libs.json.Json
>> >
>> > import scala.language.postfixOps
>> >
>> > object JobTest {
>> >
>> > implicit val Logger: Logger = LoggerFactory.getLogger(
>> > JobTest.getClass)
>> >
>> > var parameterTool: ParameterTool = null
>> > var env: StreamExecutionEnvironment = null
>> >
>> > def main(args: Array[String]): Unit = {
>> > implicit val ev = TypeInformation.of(classOf[MyEvent])
>> > parameterTool = ParameterTool.fromArgs(args)
>> > val input_path = parameterTool.getRequired("in.path")
>> > val out_path = parameterTool.getRequired("out.path")
>> > env = StreamExecutionEnvironment.getExecutionEnvironment
>> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> > val events_stream = env.readTextFile(s"file://$inp
>> ut_path").map(new
>> > ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp)
>> > implicit val sti = TypeInformation.of(classOf[String])
>> > events_stream.keyBy(_.guid)
>> > .timeWindow(Time.seconds(3600), Time.seconds(10))
>> > .apply[String](new TFunction[MyEvent]).name("test")
>> > .addSink(new FileSinkFunction[String](out_path))
>> > env.execute("test")
>> > }
>> > }
>> >
>> > class TFunction[T] extends WindowFunction[T, String, String,
>> TimeWindow] {
>> > val logger: Logger = LoggerFactory.getLogger(this.getClass)
>> >
>> > override def apply(
>> >   cell: String,
>> >   window: TimeWindow,
>> >   events: Iterable[T],
>> >   out: Collector[String]): Unit = {
>> >
>> > val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]]
>> > val data = s"${events_as_seq.head.guid}"
>> > logger.info(events_as_seq.size.toString)
>> > if(events_as_seq.size > 2)
>> > out.collect(s"${events_as_seq.size.toString} $data")
>> > }
>> > }
>> >
>> > case class MyEvent(guid: String, request_date_time_epoch: String) {}
>> >
>> > object MyEvent {
>> >
>> > implicit val jsonWriter = Json.writes[MyEvent]
>> > implicit 

scala api createLocalEnvironment() function add default Configuration parameter

2016-09-27 Thread shijinkui
Hi,all

A Scala program can't directly use createLocalEnvironment with a custom
Configuration object.

For example, if I want to start the web server in local mode with the Flink UI,
I have to do something like this:

```
// set up execution environment
val conf = new Configuration
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
  
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2,
 conf)
)
```

For convenience and a cleaner API, the createLocalEnvironment function should
take a configuration parameter.

Details are in FLINK-4669.

-Jinkui Shi


Re: Exception from in-progress implementation of Python API bulk iterations

2016-09-27 Thread Geoffrey Mon
Hello Chesnay,

Thank you for your help. After receiving your message I recompiled my
version of Flink completely, and both the NullPointerException listed in
the TODO and the ClassCastException with the join operation went away.
Previously, I had been only recompiling the modules of Flink that had been
changed to save time using "mvn clean install -pl :module" and apparently
that may have been causing some of my issues.

Now the problem is clearer: when a specific group reduce function in my
research project plan file is used within an iteration, I get a
ClassCastException:
Caused by: java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
at
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
at
org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
at
org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
at
org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
at java.lang.Thread.run(Thread.java:745)

I'm not sure why this is causing an exception, and I would greatly
appreciate any assistance. I've revised the barebones error-causing plan
file to focus on this new error source:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
The group reduce function in question seems to work just fine outside of
iterations. I have organized the commits and pushed to a new branch to make
it easier to test and hopefully review soon:
https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey

On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler  wrote:

> Hello Geoffrey,
>
> i could not reproduce this issue with the commits and plan you provided.
>
> I tried out both the FLINK-4098 and bulk-iterations branches (and
> reverted back to the specified commits) and built Flink from scratch.
>
> Could you double check that the code you provided produces the error?
> Also, which OS/python version are you using?
>
> Regards,
> Chesnay
>
> On 20.09.2016 11:13, Chesnay Schepler wrote:
> > Hello,
> >
> > I'll try to take a look this week.
> >
> > Regards,
> > Chesnay
> >
> > On 20.09.2016 02:38, Geoffrey Mon wrote:
> >> Hello all,
> >>
> >> I have recently been working on adding bulk iterations to the Python
> >> API of
> >> Flink in order to facilitate a research project I am working on. The
> >> current changes can be seen in this GitHub diff:
> >>
> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
> >>
> >>
> >> This implementation seems to work for, at least, simple examples,
> >> such as
> >> incrementing numbers in a data set. However, with the transformations
> >> required for my project, I get an exception
> >> "java.lang.ClassCastException:
> >> [B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown
> >> from the
> >> deserializers called by
> >> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
> >> I've created the following simplified Python plan by stripping down my
> >> research project code to the problem-causing parts:
> >> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>
> >> I have been working on this issue but I don't have any ideas on what
> >> might
> >> be the problem. Perhaps someone more knowledgeable about the interior of
> >> the Python API could kindly help?
> >>
> >> Thank you very much.
> >>
> >> Geoffrey Mon
> >>
> >
> >
>
>