[jira] [Created] (FLINK-4691) Add group-windows for streaming tables
Timo Walther created FLINK-4691: --- Summary: Add group-windows for streaming tables Key: FLINK-4691 URL: https://issues.apache.org/jira/browse/FLINK-4691 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Add Tumble, Slide, Session group-windows for streaming tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. Implementation of group-windows on streaming tables. This includes implementing the API of group-windows, the logical validation for group-windows, and the definition of the “rowtime” and “systemtime” keywords. Group-windows on batch tables won’t be initially supported and will throw an exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
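For orientation, this is roughly what such a group-window query looks like in the Java-style Table API that later grew out of FLIP-11. Treat it as a sketch only: the Tumble.over/on/as names follow the FLIP and later releases, the table and field names are made up, imports are omitted, and the exact syntax is defined by the FLIP rather than by this example.

// Sketch of a tumbling event-time group-window on a streaming table,
// grouping by a 10-minute window on the proposed "rowtime" attribute.
Table result = sensors                                           // a streaming Table (example name)
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))     // FLIP-11 style window clause
    .groupBy("w, sensorId")                                      // group by window + key
    .select("sensorId, temperature.avg");                        // windowed aggregation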
[jira] [Created] (FLINK-4692) Add tumbling and sliding group-windows for batch tables
Timo Walther created FLINK-4692: --- Summary: Add tumbling and sliding group-windows for batch tables Key: FLINK-4692 URL: https://issues.apache.org/jira/browse/FLINK-4692 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Add Tumble and Slide group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4693) Add session group-windows for batch tables
Timo Walther created FLINK-4693: --- Summary: Add session group-windows for batch tables Key: FLINK-4693 URL: https://issues.apache.org/jira/browse/FLINK-4693 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Add Session group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Regarding `flink-streaming` project dependencies
Hi folks, There are comments like this in `StreamExecutionEnvironment.getExecutionEnvironment()`:

// because the streaming project depends on "flink-clients" (and not the other way around)
// we currently need to intercept the data set environment and create a dependent stream env.
// this should be fixed once we rework the project dependencies

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1592-L1594

I wonder if there is any plan to rework the project dependencies? In the long run, are we planning to have `flink-clients` depend on `flink-streaming` or to have `flink-java` depend on `flink-clients`? Thanks! Liwei
Re: Regarding `flink-streaming` project dependencies
Hi! Yes, there are definitely plans and desires to do that, definitely. May be breaking some API / dependency structure, so probably a candidate for Flink 2.0 Greetings, Stephan On Tue, Sep 27, 2016 at 10:45 AM, Liwei Lin wrote: > Hi folks, > > There are comments like this in > `StreamExecutionEnvironment.getExecutionEnvironment()`: > > // because the streaming project depends on "flink-clients" (and not the > > other way around) > > // we currently need to intercept the data set environment and create a > > dependent stream env. > > // this should be fixed once we rework the project dependencies > > > https://github.com/apache/flink/blob/master/flink- > streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ > StreamExecutionEnvironment.java#L1592-L1594 > > I wonder if there is any plan to rework the project dependencies? In the > long run, are we planning to have `flink-clients` depend on > `flink-streaming` or to have `flink-java` depend on `flink-clients`? > > Thanks! > > Liwei >
[jira] [Created] (FLINK-4694) Add wait for termination function to RpcEndpoints
Till Rohrmann created FLINK-4694: Summary: Add wait for termination function to RpcEndpoints Key: FLINK-4694 URL: https://issues.apache.org/jira/browse/FLINK-4694 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.2.0 Reporter: Till Rohrmann Assignee: Till Rohrmann In order to run a {{RpcEndpoint}}, it would be useful to have a function which allows waiting for the termination of the {{RpcEndpoint}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
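A minimal sketch of what such a wait-for-termination hook could look like. The class, method, and field names below are illustrative assumptions, not the actual Flink API:

import java.util.concurrent.CompletableFuture;

// Sketch: the endpoint exposes a future that completes once it has fully shut down.
public abstract class RpcEndpointSketch {

    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // Called by the RPC service once the endpoint has stopped processing messages.
    protected void markTerminated() {
        terminationFuture.complete(null);
    }

    // Callers block on (or compose with) this future to wait for termination.
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}

A caller would then do endpointSketch.getTerminationFuture().get() (optionally with a timeout) to wait for the endpoint to terminate.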
Re: [DISCUSS] add netty tcp/restful pushed source support
I think that could be an interesting source. Two quick questions to move forward - To keep the Flink code base from becoming too big (hard to maintain and test) we started working with Apache Bahir as a project dedicated to streaming connectors. Would that be a good target for the connector? - What are your thoughts on fault tolerance for that connector? On Mon, Sep 26, 2016 at 3:01 PM, shijinkui wrote: > Hi, all > > 1.In order to support end-to-end pushed source, I create FLINK-4630< > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know whether > is this idea worth? > > --- > When source stream get start, listen a provided tcp port, receive stream > data from user data source. > This netty tcp source is keepping alive and end-to-end, that is from > business system to flink worker directly. > > user app push -> netty server source of Flink > > describe the source in detail below: > > 1.source run as a netty tcp server > 2.user provide a tcp port, if the port is in used, increace the port > number between 1024 to 65535. Source can parallel. > 3.callback the provided url to report the real port to listen > 4.user push streaming data to netty server, then collect the data to > flink > > > Thanks > > Jinkui Shi > >
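For context, a rough sketch of the kind of push-based source being discussed: a socket server embedded in a Flink SourceFunction that accepts client connections and forwards pushed lines. This is only an illustration — it uses java.net instead of netty to stay short, skips the port-probing and callback-URL details from the proposal, and has no fault tolerance yet:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch: clients connect to the given port and push newline-delimited records.
public class TcpPushSource extends RichSourceFunction<String> {

    private final int port;
    private volatile boolean running = true;

    public TcpPushSource(int port) {
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (ServerSocket server = new ServerSocket(port)) {
            while (running) {
                try (Socket client = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(client.getInputStream()))) {
                    String line;
                    while (running && (line = in.readLine()) != null) {
                        // Emit under the checkpoint lock so records and checkpoints do not interleave.
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(line);
                        }
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        // A real implementation would also close the server socket to unblock accept().
        running = false;
    }
}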
Re: No support for request PutMappingRequest
Hi, the mapping should not be updated in the Flink sink. According to the documentation the mapping is a setting on an index that should not be changed after an index was created and some documents were added to that index: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html I think you have to set your mappings/settings on the index before you start your Flink job. The Flink sink is only meant for adding elements to an index. Cheers, Aljoscha On Mon, 26 Sep 2016 at 15:39 Ozan DENİZ wrote: > Hi Aljoscha, > > > We are trying to add sortable feature for elasticsearch. To do this, we > need to add mapping to index. > > > We try to sort some fields in elasticsearch. To make it our json format > should like this; > > > "tweet": { > "type": "string", > "analyzer": "english", > "fields": { > "raw": { > "type": "string", > "index": "not_analyzed" > } > } > } > > > > We should add "not_analyzed" to map. > > > private XContentBuilder buildMapping( String typeName ) > { > > XContentBuilder mapping = null; > try > { > mapping = jsonBuilder() > .startObject() > .startObject(typeName) > .startObject("properties") > > > .startObject(LogFields.MESSAGE) > .field("type","string") > .field("analyzer", "standard") > .startObject("fields") > .startObject("raw") > .field("type","string") > .field("index", "not_analyzed") > .endObject() > .endObject() > .endObject() > > .endObject() > .endObject() > .endObject(); > } > catch ( IOException e ) > { > e.printStackTrace(); > } > > return mapping; > } > > > > > > Gönderen: Aljoscha Krettek > Gönderildi: 26 Eylül 2016 Pazartesi 16:27:12 > Kime: dev@flink.apache.org > Konu: Re: No support for request PutMappingRequest > > Hi, > I think PutMappingRequest is a request that can only be sent using > IndicesAdminClient. In my understanding this is an administrative command > that isn't related to actually storing data in an index. > > What are you trying to store with the PutMappingRequest? > > Cheers, > Aljoscha > > On Mon, 26 Sep 2016 at 15:16 Ozan DENİZ wrote: > > > Hi, > > > > > > I am sending the error message below; > > > > > > java.lang.IllegalArgumentException: No support for request > > > [org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest@3ed05d8b > > ] > > at org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:107) > > ~[elasticsearch-2.3.5.jar:2.3.5] > > at > > > org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:284) > > ~[elasticsearch-2.3.5.jar:2.3.5] > > at > org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:268) > > ~[elasticsearch-2.3.5.jar:2.3.5] > > at > org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264) > > ~[elasticsearch-2.3.5.jar:2.3.5] > > at > > > org.apache.flink.streaming.connectors.elasticsearch2.BulkProcessorIndexer.add(BulkProcessorIndexer.java:32) > > > ~[flink-connector-elasticsearch2_2.10-1.2-20160926.041955-45.jar:1.2-SNAPSHOT] > > at sink.ESSink.process(ESSink.java:77) ~[classes/:?] > > at sink.ESSink.process(ESSink.java:25) ~[classes/:?] 
> > at > > > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.invoke(ElasticsearchSink.java:232) > > > ~[flink-connector-elasticsearch2_2.10-1.2-20160926.041955-45.jar:1.2-SNAPSHOT] > > at > > > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1] > > at org.apache.flink.streaming.runtime.io > .StreamInputProcessor.processInput(StreamInputProcessor.java:176) > > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1] > > at > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1] > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > > ~[flink-streaming-java_2.10-1.1.1.jar:1.1.1] > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > > [flink-runtime_2.10-1.1.1.jar:1.1.1] > > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101] > > 09/26/2016 16:14:05 Sink: Unnamed(1/1) switched to FAILED > > > > > > We try to add mapping request to elastic search. We cannot access to > > client attribute (it is private) in elasticsearch class. > > > > > > Is there any way to overcome this problem. > > > > > > Thanks, > > > > > > Ozan > > > > > > > > > > Gönderen: Till Rohrmann > > Gönderildi: 26 Eylül 2016 Pazartesi 13:30:34 > > Kime: dev@flink.apache.org > > Bilgi: Aljoscha Krettek > > Konu: Re: No su
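To illustrate Aljoscha's suggestion of setting the mapping up before the job runs, here is a sketch using the Elasticsearch 2.x client (the version that appears in the stack trace). The index name "logs", type name "log", and field name "message" are placeholders for your own schema:

import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class PrepareIndex {

    // Create the index with a "not_analyzed" raw sub-field before any Flink job writes to it.
    public static void createIndexWithMapping(Client client) throws Exception {
        XContentBuilder mapping = jsonBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("message")
                        .field("type", "string")
                        .field("analyzer", "standard")
                        .startObject("fields")
                            .startObject("raw")
                                .field("type", "string")
                                .field("index", "not_analyzed")
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject()
            .endObject();

        client.admin().indices()
            .prepareCreate("logs")       // placeholder index name
            .addMapping("log", mapping)  // placeholder type name
            .get();
    }
}

Once the index exists with this mapping, the Flink ElasticsearchSink only needs to issue IndexRequests and the "raw" sub-field stays sortable.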
Re: Regarding `flink-streaming` project dependencies
Thanks Stephan for the prompt response! Glad to know it's targeted for Flink 2.0. Is there any JIRA tracking this? I couldn't find such one, :-) Thanks! Liwei On Tue, Sep 27, 2016 at 4:47 PM, Stephan Ewen wrote: > Hi! > > Yes, there are definitely plans and desires to do that, definitely. May be > breaking some API / dependency structure, so probably a candidate for Flink > 2.0 > > Greetings, > Stephan > > > On Tue, Sep 27, 2016 at 10:45 AM, Liwei Lin wrote: > > > Hi folks, > > > > There are comments like this in > > `StreamExecutionEnvironment.getExecutionEnvironment()`: > > > > // because the streaming project depends on "flink-clients" (and not the > > > other way around) > > > // we currently need to intercept the data set environment and create a > > > dependent stream env. > > > // this should be fixed once we rework the project dependencies > > > > > > https://github.com/apache/flink/blob/master/flink- > > streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ > > StreamExecutionEnvironment.java#L1592-L1594 > > > > I wonder if there is any plan to rework the project dependencies? In the > > long run, are we planning to have `flink-clients` depend on > > `flink-streaming` or to have `flink-java` depend on `flink-clients`? > > > > Thanks! > > > > Liwei > > >
[DISCUSS] Timely User Functions and Watermarks
Hi Folks, I'm in the process of implementing https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a problem deciding how watermarks should be treated for operators that have more than one input. The problem is deciding when to fire event-time timers. For one-input operators it's pretty straightforward: fire once the watermark surpasses a given timer. For two-input operators we allow the operator implementer to observe the watermarks from the two inputs separately, react to them, and also decide what watermark to forward. With this it becomes hard to figure out when to fire timers. My thinking is that we should not allow operators to observe the watermark anymore but route it past the operator and deal with watermarks and timers outside of the operator. A timer for operators with more than one input (TwoInputOperator) would fire once the watermarks from both inputs advance sufficiently far. Alternatively, we could still let operators observe watermarks but grab the watermark before it enters the operator and still deal with timers in the same way as proposed above. Any feedback on this is very welcome! What would you expect to happen for timers of operators with more than one input? Cheers, Aljoscha

P.S. One reason why I want to deal with watermarks outside of operators is that otherwise every operator has to implement the functionality to update the current watermark at the timer service, i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // <--- that's the thing I don't want
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two-input operators, which currently also do the merging of the watermarks from the two inputs.
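To make the proposed rule concrete, here is a tiny standalone sketch (plain Java, not Flink code) of how timers for a two-input operator would be gated on the minimum of the two input watermarks:

// Sketch: track the current watermark of each input; an event-time timer with
// timestamp t may fire only once min(watermark1, watermark2) >= t.
public class TwoInputWatermarkTracker {

    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    public long advanceInput1(long watermark) {
        watermark1 = Math.max(watermark1, watermark);
        return combined();
    }

    public long advanceInput2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        return combined();
    }

    // The combined watermark that would be used for firing timers and forwarding downstream.
    public long combined() {
        return Math.min(watermark1, watermark2);
    }
}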
RE: [DISCUSS] Timely User Functions and Watermarks
Hi Aljoscha, My 2 cents on this would be that it is worth maintaining access to the watermarks. I think having the option to customize this is a strong point of Flink. Regarding the solution you proposed for two-input timers ("would fire if the watermark from both inputs advances sufficiently far"), I would propose having the option to set a strategy for the timer. We could have:
- EagerTrigger - fires when the watermark from either input has advanced sufficiently far
- LeftEagerTrigger - fires when the watermark from the left input has advanced sufficiently far
- RightEagerTrigger - fires when the watermark from the right input has advanced sufficiently far
- SyncTrigger - fires only when the watermarks from both inputs have advanced sufficiently far
We could potentially also cover custom handling of the watermarks under a CustomTrigger strategy, implemented as an operator that can be provided.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks, I'm in the process of implementing https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a problem deciding how watermarks should be treated for operators that have more than one input. The problem is deciding when to fire event-time timers. For one-input operators it's pretty straightforward: fire once the watermark surpasses a given timer. For two-input operators we allow the operator implementer to observe the watermarks from the two inputs separately, react to them, and also decide what watermark to forward. With this it becomes hard to figure out when to fire timers. My thinking is that we should not allow operators to observe the watermark anymore but route it past the operator and deal with watermarks and timers outside of the operator. A timer for operators with more than one input (TwoInputOperator) would fire once the watermarks from both inputs advance sufficiently far. Alternatively, we could still let operators observe watermarks but grab the watermark before it enters the operator and still deal with timers in the same way as proposed above. Any feedback on this is very welcome! What would you expect to happen for timers of operators with more than one input? Cheers, Aljoscha

P.S. One reason why I want to deal with watermarks outside of operators is that otherwise every operator has to implement the functionality to update the current watermark at the timer service, i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // <--- that's the thing I don't want
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two-input operators, which currently also do the merging of the watermarks from the two inputs.
JDBC connection in Flink using Scala
Hi Team, I am wondering whether it is possible to add a JDBC connection as a source or sink in Flink using Scala. Could someone kindly help with this? My DB write/sink code is not working. If you have any sample code, please share it here. *Thanks* *Sunny*
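For reference, a minimal sketch of writing a DataSet to a database with the batch flink-jdbc module. The builder methods below exist in flink-jdbc, but the driver, URL, credentials, and table are placeholders, and the exact generic/row types accepted by JDBCOutputFormat vary between Flink versions, so treat this as a starting point rather than a drop-in answer. The same Java builders can be called from Scala:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class JdbcSinkSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> people = env.fromElements(
            Tuple2.of(1, "Alice"),
            Tuple2.of(2, "Bob"));

        // Each tuple field is bound to one '?' parameter of the insert statement.
        people.output(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.postgresql.Driver")             // placeholder driver
            .setDBUrl("jdbc:postgresql://localhost:5432/mydb")  // placeholder URL
            .setUsername("user")
            .setPassword("secret")
            .setQuery("INSERT INTO people (id, name) VALUES (?, ?)")
            .finish());

        env.execute("jdbc sink sketch");
    }
}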
Re: [DISCUSS] add netty tcp/restful pushed source support
Hey, Stephan Ewen

1. Bahir's target is Spark. The contributors are rxin, srowen, tdas, mateiz and so on. If we want Bahir to be usable by Flink, we could suggest that Bahir provide a streaming connector interface, such as store(), start(), stop(), restart(), receiving(Any)... Then the same streaming connector could be implemented for both Spark and Flink. But I think this is impossible, as Bahir depends on spark-streaming and Spark SQL.
2. About connector fault tolerance: Bahir's MQTT and Akka connectors have their own storage, but netty has no feature for persisting data. I think we can append the data to a ring buffer. When SourceContext.collect() throws an error, we write a message telling the client to stop sending; when the Flink SourceContext is healthy again, we write the ring buffer data to Flink and tell the client to go on. In push mode it is hard for us to control the throughput, but the upstream client can. This netty connector's purpose is end-to-end streaming with minimum delay.
3. Later on, we can provide HTTP support (PUT, POST, GET), and even WebSocket or a Jersey-based REST interface, on top of netty.

-----Original Message-----
From: Stephan Ewen [mailto:se...@apache.org]
Sent: September 27, 2016 16:54
To: dev@flink.apache.org
Subject: Re: [DISCUSS] add netty tcp/restful pushed source support

I think that could be an interesting source. Two quick questions to move forward - To keep the Flink code base from becoming too big (hard to maintain and test) we started working with Apache Bahir as a project dedicated to streaming connectors. Would that be a good target for the connector? - What are your thoughts on fault tolerance for that connector? On Mon, Sep 26, 2016 at 3:01 PM, shijinkui wrote: > Hi, all > > 1.In order to support end-to-end pushed source, I create FLINK-4630< > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know > whether is this idea worth? > > --- > When source stream get start, listen a provided tcp port, receive > stream data from user data source. > This netty tcp source is keepping alive and end-to-end, that is from > business system to flink worker directly. > > user app push -> netty server source of Flink > > describe the source in detail below: > > 1.source run as a netty tcp server > 2.user provide a tcp port, if the port is in used, increace the port > number between 1024 to 65535. Source can parallel. > 3.callback the provided url to report the real port to listen > 4.user push streaming data to netty server, then collect the data to > flink > > > Thanks > > Jinkui Shi > >
[jira] [Created] (FLINK-4695) Separate configuration parsing from MetricRegistry
Till Rohrmann created FLINK-4695: Summary: Separate configuration parsing from MetricRegistry Key: FLINK-4695 URL: https://issues.apache.org/jira/browse/FLINK-4695 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.2.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor In order to decouple the {{MetricRegistry}} object instantiation from the global configuration, we could introduce a {{MetricRegistryConfiguration}} object which encapsulates all necessary information for the {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static method to be generated from a {{Configuration}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
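A sketch of the proposed split. The class name and the fromConfiguration factory follow the issue text, while the concrete settings parsed here are only examples of what the MetricRegistry might need:

import org.apache.flink.configuration.Configuration;

// Sketch: everything the MetricRegistry needs, parsed once from the global Configuration.
public final class MetricRegistryConfiguration {

    private final String scopeDelimiter;
    private final String reporterList;

    private MetricRegistryConfiguration(String scopeDelimiter, String reporterList) {
        this.scopeDelimiter = scopeDelimiter;
        this.reporterList = reporterList;
    }

    // Static factory proposed in the issue: derive the registry configuration from a Configuration.
    public static MetricRegistryConfiguration fromConfiguration(Configuration config) {
        String delimiter = config.getString("metrics.scope.delimiter", ".");  // example key
        String reporters = config.getString("metrics.reporters", "");         // example key
        return new MetricRegistryConfiguration(delimiter, reporters);
    }

    public String getScopeDelimiter() {
        return scopeDelimiter;
    }

    public String getReporterList() {
        return reporterList;
    }
}

The MetricRegistry constructor would then take a MetricRegistryConfiguration instead of the raw Configuration.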
Re: Re: [DISCUSS] add netty tcp/restful pushed source support
Apache Bahir's website only suggests support for additional frameworks, but there is a Flink repository at https://github.com/apache/bahir-flink On Tue, Sep 27, 2016 at 8:38 AM, shijinkui wrote: > Hey, Stephan Ewen > > 1. bahir's target is spark. The contributer are rxin, srowen, tdas, > mateiz and so on. > If we want bahir used by flink, we can suggest bahir provide > streaming connecter interface, such as store(), start(), stop(), restart(), > receiving(Any)... > Then same streaming connector can be implemented by spark and > flink. But I think this is impossible, as bahir depend spark-streaming and > spark sql. > 2. About connector fault tolerance. Bahir's mqtt and akka connector > are themselves' storage. But netty have no persist data feature. > I think we can append data to a ringbuffer. When SourceContext > collect() throw error, then write message to inform client to stop send > message. When flink SourceContext is normal, then write the ringbuffer data > to flink, inform client to go on. > Because pushing mode is hard to control the flow throughput, the > upstream client can. > This netty connector's purpose is end-to-end streaming, minimum > time delay. > 3. Later on, We can provide http protocol ability, put, post, get, > even websocket or jersey restful based on netty. > > -邮件原件- > 发件人: Stephan Ewen [mailto:se...@apache.org] > 发送时间: 2016年9月27日 16:54 > 收件人: dev@flink.apache.org > 主题: Re: [DISCUSS] add netty tcp/restful pushed source support > > I think that could be an interesting source. Two quick questions to move > forward > > - To keep the Flink code base from becoming too big (hard to maintain and > test) we started working with Apache Bahir as a project dedicated to > streaming connectors. Would that be a good target for the connector? > > - What are your thoughts on fault tolerance for that connector? > > On Mon, Sep 26, 2016 at 3:01 PM, shijinkui wrote: > > > Hi, all > > > > 1.In order to support end-to-end pushed source, I create FLINK-4630< > > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know > > whether is this idea worth? > > > > --- > > When source stream get start, listen a provided tcp port, receive > > stream data from user data source. > > This netty tcp source is keepping alive and end-to-end, that is from > > business system to flink worker directly. > > > > user app push -> netty server source of Flink > > > > describe the source in detail below: > > > > 1.source run as a netty tcp server > > 2.user provide a tcp port, if the port is in used, increace the port > > number between 1024 to 65535. Source can parallel. > > 3.callback the provided url to report the real port to listen > > 4.user push streaming data to netty server, then collect the data to > > flink > > > > > > Thanks > > > > Jinkui Shi > > > > >
[jira] [Created] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster
Stephan Ewen created FLINK-4696: --- Summary: Limit the number of Akka Dispatcher Threads in LocalMiniCluster Key: FLINK-4696 URL: https://issues.apache.org/jira/browse/FLINK-4696 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.2.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.2.0 By default, Akka spawns 2x to 3x as many threads as there are cores. For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with separate actor systems for the jobmanager and multiple taskmanagers, this frequently means >600 Akka threads, while Flink uses only about 4 actors. This simply eats unnecessary resources. I suggest having at most 4 threads per actor system in test setups. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
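For illustration, the Akka knob this boils down to is the dispatcher's fork-join executor parallelism. A sketch of creating a test actor system with a small, fixed thread cap follows; the exact settings Flink ends up using in the mini cluster may differ:

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SmallActorSystem {

    // Creates an actor system whose default dispatcher is capped at four threads.
    public static ActorSystem create(String name) {
        Config config = ConfigFactory.parseString(
                "akka.actor.default-dispatcher.fork-join-executor {\n" +
                "  parallelism-min = 2\n" +
                "  parallelism-factor = 1.0\n" +
                "  parallelism-max = 4\n" +
                "}\n")
            .withFallback(ConfigFactory.load());
        return ActorSystem.create(name, config);
    }
}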
Re: Re: [DISCUSS] add netty tcp/restful pushed source support
It's nice. Will the existing Flink source connectors be moved to bahir-flink? I can add a netty source to bahir-flink. The Maven repository has no bahir-flink artifacts yet: https://mvnrepository.com/artifact/org.apache.bahir

-----Original Message-----
From: Greg Hogan [mailto:c...@greghogan.com]
Sent: September 27, 2016 20:58
To: dev@flink.apache.org
Subject: Re: Re: [DISCUSS] add netty tcp/restful pushed source support

Apache Bahir's website only suggests support for additional frameworks, but there is a Flink repository at https://github.com/apache/bahir-flink On Tue, Sep 27, 2016 at 8:38 AM, shijinkui wrote: > Hey, Stephan Ewen > > 1. bahir's target is spark. The contributer are rxin, srowen, tdas, > mateiz and so on. > If we want bahir used by flink, we can suggest bahir provide > streaming connecter interface, such as store(), start(), stop(), > restart(), receiving(Any)... > Then same streaming connector can be implemented by spark and > flink. But I think this is impossible, as bahir depend spark-streaming > and spark sql. > 2. About connector fault tolerance. Bahir's mqtt and akka connector > are themselves' storage. But netty have no persist data feature. > I think we can append data to a ringbuffer. When SourceContext > collect() throw error, then write message to inform client to stop > send message. When flink SourceContext is normal, then write the > ringbuffer data to flink, inform client to go on. > Because pushing mode is hard to control the flow throughput, > the upstream client can. > This netty connector's purpose is end-to-end streaming, > minimum time delay. > 3. Later on, We can provide http protocol ability, put, post, get, > even websocket or jersey restful based on netty. > > -----Original Message----- > From: Stephan Ewen [mailto:se...@apache.org] > Sent: September 27, 2016 16:54 > To: dev@flink.apache.org > Subject: Re: [DISCUSS] add netty tcp/restful pushed source support > > I think that could be an interesting source. Two quick questions to > move forward > > - To keep the Flink code base from becoming too big (hard to > maintain and > test) we started working with Apache Bahir as a project dedicated to > streaming connectors. Would that be a good target for the connector? > > - What are your thoughts on fault tolerance for that connector? > > On Mon, Sep 26, 2016 at 3:01 PM, shijinkui wrote: > > > Hi, all > > > > 1.In order to support end-to-end pushed source, I create FLINK-4630< > > https://issues.apache.org/jira/browse/FLINK-4630>. I want to know > > whether is this idea worth? > > > > --- > > When source stream get start, listen a provided tcp port, receive > > stream data from user data source. > > This netty tcp source is keepping alive and end-to-end, that is from > > business system to flink worker directly. > > > > user app push -> netty server source of Flink > > > > describe the source in detail below: > > > > 1.source run as a netty tcp server > > 2.user provide a tcp port, if the port is in used, increace the port > > number between 1024 to 65535. Source can parallel. > > 3.callback the provided url to report the real port to listen > > 4.user push streaming data to netty server, then collect the data to > > flink > > > > > > Thanks > > > > Jinkui Shi > > > > >
[DISCUSS] Removing delete*Timer from the WindowOperator.Context
Hi all, As the title of this email suggests, I am proposing to remove the methods deleteProcessingTimeTimer(long time) and deleteEventTimeTimer(long time) from the WindowOperator.Context. With this change, registered timers that have nothing to do (e.g. because their state has already been cleaned up) will simply be ignored by the WindowOperator when their time comes. The reason for the change is that by allowing custom user code, e.g. a custom Trigger, to delete timers we may get unpredictable behavior. As an example, one can imagine the case where we have allowed_lateness = 0 and the cleanup timer for a window collides with the end_of_window one. In this case, by deleting the end_of_window timer from the trigger (possibly a custom one), we end up also deleting the cleanup one, which in turn can lead to the window state never being garbage collected. Beyond memory leaks, this can easily lead to wrong session windows, as a session that should have been garbage collected will still be around and ready to accept new data. With this change, timers that previously would have been deleted will remain in the queue of pending timers but do nothing, while cleanup timers will clean up the state of their corresponding window. Other possible solutions, like keeping a separate list of cleanup timers, would complicate the codebase and also introduce memory overheads, which can be avoided with the solution above (i.e. just ignoring timers that have nothing to do anymore). What do you think? Kostas
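A toy sketch of the "ignore timers that have nothing to do" behaviour (plain Java, deliberately far simpler than the real WindowOperator; the point is only that a firing timer checks for remaining state instead of relying on having been deleted):

import java.util.HashMap;
import java.util.Map;

public class StaleTimerSketch {

    // Window contents keyed by window end timestamp (stand-in for real window state).
    private final Map<Long, String> windowState = new HashMap<>();

    public void addToWindow(long windowEnd, String value) {
        windowState.merge(windowEnd, value, (a, b) -> a + "," + b);
    }

    // Called when an event-time timer registered for windowEnd fires.
    public void onEventTime(long windowEnd) {
        String contents = windowState.get(windowEnd);
        if (contents == null) {
            // State already cleaned up: the timer is stale and is simply ignored,
            // instead of requiring that it was explicitly deleted earlier.
            return;
        }
        System.out.println("firing window " + windowEnd + " with " + contents);
        windowState.remove(windowEnd); // cleanup (allowed lateness = 0 collapses both timers)
    }
}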
[jira] [Created] (FLINK-4697) Gather more detailed checkpoint stats in CheckpointStatsTracker
Stephan Ewen created FLINK-4697: --- Summary: Gather more detailed checkpoint stats in CheckpointStatsTracker Key: FLINK-4697 URL: https://issues.apache.org/jira/browse/FLINK-4697 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stephan Ewen Fix For: 1.2.0 The additional information attached to the {{AcknowledgeCheckpoint}} method must be gathered in the {{CheckpointStatsTracker}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4698) Visualize additional checkpoint information
Stephan Ewen created FLINK-4698: --- Summary: Visualize additional checkpoint information Key: FLINK-4698 URL: https://issues.apache.org/jira/browse/FLINK-4698 Project: Flink Issue Type: Sub-task Components: Webfrontend Reporter: Stephan Ewen Fix For: 1.2.0 Display the additional information gathered in the {{CheckpointStatsTracker}} in the "Checkpoint" tab. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests
Timo Walther created FLINK-4699: --- Summary: Convert Kafka TableSource/TableSink tests to unit tests Key: FLINK-4699 URL: https://issues.apache.org/jira/browse/FLINK-4699 Project: Flink Issue Type: Test Components: Table API & SQL Reporter: Timo Walther The Kafka tests are extremely heavy, and the Table Sources and Sinks are only thin wrappers on top of the Kafka Sources / Sinks. Testing them should not require bringing up Kafka clusters. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4700) Harden the TimeProvider test
Kostas Kloudas created FLINK-4700: - Summary: Harden the TimeProvider test Key: FLINK-4700 URL: https://issues.apache.org/jira/browse/FLINK-4700 Project: Flink Issue Type: Bug Components: Tests Reporter: Kostas Kloudas Assignee: Kostas Kloudas Currently the TimeProvider test fails due to a race condition. This task aims at fixing it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
flink 1.1.2 RichFunction not working
Hi, Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression in my code. In order to Isolate the issue I have written a small flink job that demonstrates that. The job does some time based window operations with an input csv file (in the example below - count the number of events on sliding window of 3600 seconds every 10 seconds) using event time instead of wall clock time, and writes results down to an output csv file. When I upload the jar via flink admin UI (local cluster) and submit the job, it returns as finished after a couple of seconds but the RichFunction is not applied (I do not see any log messages and the output csv is empty) same results when running in local mode (through the IDE). Here is sample code (fully working example) to demonstrate what I'm trying to achieve. Please note - it works on flink 1.0.2 but doesn't work on flink 1.1.2. No error thrown. The output CSV file is simply empty. *Job class:* import com.firelayers.spike.functions.sink.FileSinkFunction import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import org.slf4j.{Logger, LoggerFactory} import play.api.libs.json.Json import scala.language.postfixOps object JobTest { implicit val Logger: Logger = LoggerFactory.getLogger(JobTest.getClass) var parameterTool: ParameterTool = null var env: StreamExecutionEnvironment = null def main(args: Array[String]): Unit = { implicit val ev = TypeInformation.of(classOf[MyEvent]) parameterTool = ParameterTool.fromArgs(args) val input_path = parameterTool.getRequired("in.path") val out_path = parameterTool.getRequired("out.path") env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val events_stream = env.readTextFile(s"file://$input_path").map(new ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp) implicit val sti = TypeInformation.of(classOf[String]) events_stream.keyBy(_.guid) .timeWindow(Time.seconds(3600), Time.seconds(10)) .apply[String](new TFunction[MyEvent]).name("test") .addSink(new FileSinkFunction[String](out_path)) env.execute("test") } } class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] { val logger: Logger = LoggerFactory.getLogger(this.getClass) override def apply( cell: String, window: TimeWindow, events: Iterable[T], out: Collector[String]): Unit = { val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]] val data = s"${events_as_seq.head.guid}" logger.info(events_as_seq.size.toString) if(events_as_seq.size > 2) out.collect(s"${events_as_seq.size.toString} $data") } } case class MyEvent(guid: String, request_date_time_epoch: String) {} object MyEvent { implicit val jsonWriter = Json.writes[MyEvent] implicit val jsonReader = Json.reads[MyEvent] def parseFromString(record: String): MyEvent = { val data = record.split(",") new MyEvent(data.apply(1), data.apply(0)) } } class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] 
{ private var currentTimestamp = Long.MinValue override def extractTimestamp(element: MyEvent, previous_timestamp: Long): Long = { this.currentTimestamp = element.request_date_time_epoch.toLong this.currentTimestamp } override def getCurrentWatermark: Watermark = { val watermark_value = if(currentTimestamp == Long.MinValue) Long.MinValue else currentTimestamp - 1 new Watermark(watermark_value) } } class ParseMyEventFunction extends RichMapFunction[String, MyEvent] { override def map(record: String): MyEvent = { MyEvent.parseFromString(record) } } To run - use this parameters: --in.path /in.csv --out.path /out.csv This is a sample input csv file (in.csv): 1446651603000,ae741eb9efa47d14a2b43c07b0286306 1446651617000,ae741eb9efa47d14a2b43c07b0286306 1446651762000,321bfb334d878bd321d2fd38eebfbb3d 1446651846000,ae741eb9efa47d14a2b43c07b0286306 1446651861000,321bfb334d878bd321d2fd38eebfbb3d 1446651885000,321bfb334d878bd321d2fd38eebfbb3d 1446652058000,873cd5958ec5e8b6f555ee95abc6abb4 1446652303000,dc668e71e7cfe2f
[jira] [Created] (FLINK-4701) Unprotected access to cancelables in StreamTask
Ted Yu created FLINK-4701: - Summary: Unprotected access to cancelables in StreamTask Key: FLINK-4701 URL: https://issues.apache.org/jira/browse/FLINK-4701 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor In performCheckpoint(): {code} AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( "checkpoint-" + checkpointId + "-" + timestamp, this, cancelables, chainedStateHandles, keyGroupsStateHandleFuture, checkpointId, bytesBufferedAlignment, alignmentDurationNanos, syncDurationMillis, endOfSyncPart); synchronized (cancelables) { cancelables.add(asyncCheckpointRunnable); } {code} Construction of AsyncCheckpointRunnable should be put under the synchronized block of cancelables. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
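Concretely, the suggested fix is a small reordering so that the runnable is both constructed and registered while holding the lock (argument list as in the snippet above; shown here only as a sketch of the change):

// Sketch of the fix: construction and registration both happen under the lock on cancelables.
synchronized (cancelables) {
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            "checkpoint-" + checkpointId + "-" + timestamp,
            this,
            cancelables,
            chainedStateHandles,
            keyGroupsStateHandleFuture,
            checkpointId,
            bytesBufferedAlignment,
            alignmentDurationNanos,
            syncDurationMillis,
            endOfSyncPart);

    cancelables.add(asyncCheckpointRunnable);
}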
Re: flink 1.1.2 RichFunction not working
Hi Chen, Please upload your Flink scala library dependencies. Regards Sunny. On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor wrote: > Hi, > > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression in > my code. In order to Isolate the issue I have written a small flink job > that demonstrates that. > > The job does some time based window operations with an input csv file (in > the example below - count the number of events on sliding window of 3600 > seconds every 10 seconds) using event time instead of wall clock time, and > writes results down to an output csv file. > > When I upload the jar via flink admin UI (local cluster) and submit the > job, it returns as finished after a couple of seconds but the RichFunction > is not applied (I do not see any log messages and the output csv is empty) > > same results when running in local mode (through the IDE). > > Here is sample code (fully working example) to demonstrate what I'm trying > to achieve. Please note - it works on flink 1.0.2 but doesn't work on flink > 1.1.2. No error thrown. The output CSV file is simply empty. > > > *Job class:* > > import com.firelayers.spike.functions.sink.FileSinkFunction > import org.apache.flink.api.common.functions.RichMapFunction > import org.apache.flink.api.common.typeinfo.TypeInformation > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.TimeCharacteristic > import org.apache.flink.streaming.api.functions. > AssignerWithPeriodicWatermarks > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > import org.apache.flink.streaming.api.scala.function.WindowFunction > import org.apache.flink.streaming.api.watermark.Watermark > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > import org.apache.flink.util.Collector > import org.slf4j.{Logger, LoggerFactory} > import play.api.libs.json.Json > > import scala.language.postfixOps > > object JobTest { > > implicit val Logger: Logger = LoggerFactory.getLogger( > JobTest.getClass) > > var parameterTool: ParameterTool = null > var env: StreamExecutionEnvironment = null > > def main(args: Array[String]): Unit = { > implicit val ev = TypeInformation.of(classOf[MyEvent]) > parameterTool = ParameterTool.fromArgs(args) > val input_path = parameterTool.getRequired("in.path") > val out_path = parameterTool.getRequired("out.path") > env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val events_stream = env.readTextFile(s"file://$input_path").map(new > ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp) > implicit val sti = TypeInformation.of(classOf[String]) > events_stream.keyBy(_.guid) > .timeWindow(Time.seconds(3600), Time.seconds(10)) > .apply[String](new TFunction[MyEvent]).name("test") > .addSink(new FileSinkFunction[String](out_path)) > env.execute("test") > } > } > > class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] { > val logger: Logger = LoggerFactory.getLogger(this.getClass) > > override def apply( > cell: String, > window: TimeWindow, > events: Iterable[T], > out: Collector[String]): Unit = { > > val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]] > val data = s"${events_as_seq.head.guid}" > logger.info(events_as_seq.size.toString) > if(events_as_seq.size > 2) > out.collect(s"${events_as_seq.size.toString} $data") > } > } > > case class MyEvent(guid: String, request_date_time_epoch: String) 
{} > > object MyEvent { > > implicit val jsonWriter = Json.writes[MyEvent] > implicit val jsonReader = Json.reads[MyEvent] > > def parseFromString(record: String): MyEvent = { > val data = record.split(",") > new MyEvent(data.apply(1), data.apply(0)) > } > } > > class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] { > private var currentTimestamp = Long.MinValue > > override def extractTimestamp(element: MyEvent, previous_timestamp: > Long): Long = { > this.currentTimestamp = element.request_date_time_epoch.toLong > this.currentTimestamp > } > > override def getCurrentWatermark: Watermark = { > val watermark_value = if(currentTimestamp == Long.MinValue) > Long.MinValue else currentTimestamp - 1 > new Watermark(watermark_value) > } > } > > class ParseMyEventFunction extends RichMapFunction[String, MyEvent] { > override def map(record: String): MyEvent = { > MyEvent.parseFromString(record) > } > } > > > To run - use this parameters: --in.path /in.csv --out.path /out.csv > > This is a sample input csv file (in.csv): > >
Re: flink 1.1.2 RichFunction not working
Sorry for the inconvenience. This is a known issue and being fixed for Flink 1.1.3 - the problem is that the streaming File sources were reworked to continuously monitor the File System, but the watermarks are not handled correctly. https://issues.apache.org/jira/browse/FLINK-4329 So far, 2/3 parts of the fix are in, the last part is an open pull request. Once that is in, we can look into getting Flink 1.1.3 out. Best, Stephan On Tue, Sep 27, 2016 at 8:17 PM, sunny patel wrote: > Hi Chen, > > Please upload your Flink scala library dependencies. > > Regards > Sunny. > > On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor wrote: > > > Hi, > > > > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression > in > > my code. In order to Isolate the issue I have written a small flink job > > that demonstrates that. > > > > The job does some time based window operations with an input csv file (in > > the example below - count the number of events on sliding window of 3600 > > seconds every 10 seconds) using event time instead of wall clock time, > and > > writes results down to an output csv file. > > > > When I upload the jar via flink admin UI (local cluster) and submit the > > job, it returns as finished after a couple of seconds but the > RichFunction > > is not applied (I do not see any log messages and the output csv is > empty) > > > > same results when running in local mode (through the IDE). > > > > Here is sample code (fully working example) to demonstrate what I'm > trying > > to achieve. Please note - it works on flink 1.0.2 but doesn't work on > flink > > 1.1.2. No error thrown. The output CSV file is simply empty. > > > > > > *Job class:* > > > > import com.firelayers.spike.functions.sink.FileSinkFunction > > import org.apache.flink.api.common.functions.RichMapFunction > > import org.apache.flink.api.common.typeinfo.TypeInformation > > import org.apache.flink.api.java.utils.ParameterTool > > import org.apache.flink.streaming.api.TimeCharacteristic > > import org.apache.flink.streaming.api.functions. 
> > AssignerWithPeriodicWatermarks > > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > > import org.apache.flink.streaming.api.scala.function.WindowFunction > > import org.apache.flink.streaming.api.watermark.Watermark > > import org.apache.flink.streaming.api.windowing.time.Time > > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > > import org.apache.flink.util.Collector > > import org.slf4j.{Logger, LoggerFactory} > > import play.api.libs.json.Json > > > > import scala.language.postfixOps > > > > object JobTest { > > > > implicit val Logger: Logger = LoggerFactory.getLogger( > > JobTest.getClass) > > > > var parameterTool: ParameterTool = null > > var env: StreamExecutionEnvironment = null > > > > def main(args: Array[String]): Unit = { > > implicit val ev = TypeInformation.of(classOf[MyEvent]) > > parameterTool = ParameterTool.fromArgs(args) > > val input_path = parameterTool.getRequired("in.path") > > val out_path = parameterTool.getRequired("out.path") > > env = StreamExecutionEnvironment.getExecutionEnvironment > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > val events_stream = env.readTextFile(s"file://$ > input_path").map(new > > ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp) > > implicit val sti = TypeInformation.of(classOf[String]) > > events_stream.keyBy(_.guid) > > .timeWindow(Time.seconds(3600), Time.seconds(10)) > > .apply[String](new TFunction[MyEvent]).name("test") > > .addSink(new FileSinkFunction[String](out_path)) > > env.execute("test") > > } > > } > > > > class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] > { > > val logger: Logger = LoggerFactory.getLogger(this.getClass) > > > > override def apply( > > cell: String, > > window: TimeWindow, > > events: Iterable[T], > > out: Collector[String]): Unit = { > > > > val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]] > > val data = s"${events_as_seq.head.guid}" > > logger.info(events_as_seq.size.toString) > > if(events_as_seq.size > 2) > > out.collect(s"${events_as_seq.size.toString} $data") > > } > > } > > > > case class MyEvent(guid: String, request_date_time_epoch: String) {} > > > > object MyEvent { > > > > implicit val jsonWriter = Json.writes[MyEvent] > > implicit val jsonReader = Json.reads[MyEvent] > > > > def parseFromString(record: String): MyEvent = { > > val data = record.split(",") > > new MyEvent(data.apply(1), data.apply(0)) > > } > > } > > > > class MyTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] { > > private var currentTimestamp = Long.MinValue > > > >
[jira] [Created] (FLINK-4702) Kafka consumer must commit offsets asynchronously
Stephan Ewen created FLINK-4702: --- Summary: Kafka consumer must commit offsets asynchronously Key: FLINK-4702 URL: https://issues.apache.org/jira/browse/FLINK-4702 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.1.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Priority: Blocker Fix For: 1.2.0, 1.1.3 The offset commit calls to Kafka may occasionally take very long. In that case, the {{notifyCheckpointComplete()}} method blocks for a long time and the KafkaConsumer cannot make progress and cannot perform checkpoints. Kafka 0.9+ has methods to commit asynchronously. We should use those and make sure no more than one commit is concurrently in progress, so that commit requests do not pile up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
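For illustration, a sketch of the async-commit pattern the issue describes, using the Kafka 0.9+ consumer API. The flag handling is simplified and the real fix lives inside the Flink Kafka connector; also note that KafkaConsumer is not thread-safe, so these calls must happen on the thread that also calls poll():

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: commit offsets asynchronously and never have more than one commit in flight.
public class AsyncOffsetCommitter {

    private final KafkaConsumer<?, ?> consumer;
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    public AsyncOffsetCommitter(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Skip if a commit is already running, so that commit requests do not pile up.
        if (!commitInProgress.compareAndSet(false, true)) {
            return;
        }
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception e) {
                commitInProgress.set(false);
                // On failure, a later checkpoint will commit newer offsets anyway.
            }
        });
    }
}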
Re: flink 1.1.2 RichFunction not working
thanks. worth mentioning in the release notes of 1.1.2 that file source is broken. we spent a substantial time on trying to figure out what's the root cause. On Sep 27, 2016 9:40 PM, "Stephan Ewen" wrote: > Sorry for the inconvenience. This is a known issue and being fixed for > Flink 1.1.3 - the problem is that the streaming File sources were reworked > to continuously monitor the File System, but the watermarks are not handled > correctly. > > https://issues.apache.org/jira/browse/FLINK-4329 > > So far, 2/3 parts of the fix are in, the last part is an open pull request. > Once that is in, we can look into getting Flink 1.1.3 out. > > Best, > Stephan > > > > On Tue, Sep 27, 2016 at 8:17 PM, sunny patel wrote: > >> Hi Chen, >> >> Please upload your Flink scala library dependencies. >> >> Regards >> Sunny. >> >> On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor wrote: >> >> > Hi, >> > >> > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression >> in >> > my code. In order to Isolate the issue I have written a small flink job >> > that demonstrates that. >> > >> > The job does some time based window operations with an input csv file >> (in >> > the example below - count the number of events on sliding window of 3600 >> > seconds every 10 seconds) using event time instead of wall clock time, >> and >> > writes results down to an output csv file. >> > >> > When I upload the jar via flink admin UI (local cluster) and submit the >> > job, it returns as finished after a couple of seconds but the >> RichFunction >> > is not applied (I do not see any log messages and the output csv is >> empty) >> > >> > same results when running in local mode (through the IDE). >> > >> > Here is sample code (fully working example) to demonstrate what I'm >> trying >> > to achieve. Please note - it works on flink 1.0.2 but doesn't work on >> flink >> > 1.1.2. No error thrown. The output CSV file is simply empty. >> > >> > >> > *Job class:* >> > >> > import com.firelayers.spike.functions.sink.FileSinkFunction >> > import org.apache.flink.api.common.functions.RichMapFunction >> > import org.apache.flink.api.common.typeinfo.TypeInformation >> > import org.apache.flink.api.java.utils.ParameterTool >> > import org.apache.flink.streaming.api.TimeCharacteristic >> > import org.apache.flink.streaming.api.functions. 
>> > AssignerWithPeriodicWatermarks >> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment >> > import org.apache.flink.streaming.api.scala.function.WindowFunction >> > import org.apache.flink.streaming.api.watermark.Watermark >> > import org.apache.flink.streaming.api.windowing.time.Time >> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow >> > import org.apache.flink.util.Collector >> > import org.slf4j.{Logger, LoggerFactory} >> > import play.api.libs.json.Json >> > >> > import scala.language.postfixOps >> > >> > object JobTest { >> > >> > implicit val Logger: Logger = LoggerFactory.getLogger( >> > JobTest.getClass) >> > >> > var parameterTool: ParameterTool = null >> > var env: StreamExecutionEnvironment = null >> > >> > def main(args: Array[String]): Unit = { >> > implicit val ev = TypeInformation.of(classOf[MyEvent]) >> > parameterTool = ParameterTool.fromArgs(args) >> > val input_path = parameterTool.getRequired("in.path") >> > val out_path = parameterTool.getRequired("out.path") >> > env = StreamExecutionEnvironment.getExecutionEnvironment >> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> > val events_stream = env.readTextFile(s"file://$inp >> ut_path").map(new >> > ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp) >> > implicit val sti = TypeInformation.of(classOf[String]) >> > events_stream.keyBy(_.guid) >> > .timeWindow(Time.seconds(3600), Time.seconds(10)) >> > .apply[String](new TFunction[MyEvent]).name("test") >> > .addSink(new FileSinkFunction[String](out_path)) >> > env.execute("test") >> > } >> > } >> > >> > class TFunction[T] extends WindowFunction[T, String, String, >> TimeWindow] { >> > val logger: Logger = LoggerFactory.getLogger(this.getClass) >> > >> > override def apply( >> > cell: String, >> > window: TimeWindow, >> > events: Iterable[T], >> > out: Collector[String]): Unit = { >> > >> > val events_as_seq = events.toSeq.asInstanceOf[Seq[MyEvent]] >> > val data = s"${events_as_seq.head.guid}" >> > logger.info(events_as_seq.size.toString) >> > if(events_as_seq.size > 2) >> > out.collect(s"${events_as_seq.size.toString} $data") >> > } >> > } >> > >> > case class MyEvent(guid: String, request_date_time_epoch: String) {} >> > >> > object MyEvent { >> > >> > implicit val jsonWriter = Json.writes[MyEvent] >> > implicit
scala api createLocalEnvironment() function add default Configuration parameter
Hi all, A Scala program can't directly use createLocalEnvironment with a custom Configuration object. For example, to start the web server with the Flink UI in local mode, I currently have to do:
```
// set up execution environment
val conf = new Configuration
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, conf)
)
```
For convenience and a complete API, the Scala createLocalEnvironment function should accept a Configuration parameter; details are in FLINK-4669. -Jinkui Shi
Re: Exception from in-progress implementation of Python API bulk iterations
Hello Chesnay, Thank you for your help. After receiving your message I recompiled my version of Flink completely, and both the NullPointerException listed in the TODO and the ClassCastException with the join operation went away. Previously, I had been only recompiling the modules of Flink that had been changed to save time using "mvn clean install -pl :module" and apparently that may have been causing some of my issues. Now, the problem is more clear: when a specific group reduce function in my research project plan file is used within an iteration, I get a ClassCastException exception: Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31) at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96) at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272) at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54) at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591) at java.lang.Thread.run(Thread.java:745) I'm not sure why this is causing an exception, and I would greatly appreciate any assistance. I've revised the barebones error-causing plan file to focus on this new error source: https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a The group reduce function in question seems to work just fine outside of iterations. I have organized the commits and pushed to a new branch to make it easier to test and hopefully review soon: https://github.com/GEOFBOT/flink/tree/new-iterations Cheers, Geoffrey On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler wrote: > Hello Geoffrey, > > i could not reproduce this issue with the commits and plan you provided. > > I tried out both the FLINK-4098 and bulk-iterations branches (and > reverted back to the specified commits) and built Flink from scratch. > > Could you double check that the code you provided produces the error? > Also, which OS/python version are you using? > > Regards, > Chesnay > > On 20.09.2016 11:13, Chesnay Schepler wrote: > > Hello, > > > > I'll try to take a look this week. > > > > Regards, > > Chesnay > > > > On 20.09.2016 02:38, Geoffrey Mon wrote: > >> Hello all, > >> > >> I have recently been working on adding bulk iterations to the Python > >> API of > >> Flink in order to facilitate a research project I am working on. The > >> current changes can be seen in this GitHub diff: > >> > https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0 > >> > >> > >> This implementation seems to work for, at least, simple examples, > >> such as > >> incrementing numbers in a data set. 
However, with the transformations > >> required for my project, I get an exception > >> "java.lang.ClassCastException: > >> [B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown > >> from the > >> deserializers called by > >> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer. > >> I've created the following simplified Python plan by stripping down my > >> research project code to the problem-causing parts: > >> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a > >> > >> I have been working on this issue but I don't have any ideas on what > >> might > >> be the problem. Perhaps someone more knowledgeable about the interior of > >> the Python API could kindly help? > >> > >> Thank you very much. > >> > >> Geoffrey Mon > >> > > > > > >