Re: periodic trigger

2017-12-22 Thread Plamen Paskov
conds user windows (emitted every 10 seconds from the first aggregation). Piotrek On 21 Dec 2017, at 15:18, Plamen Paskov wrote: Imagine a case where I want to run a computation every X seconds for a 1-day window. I want to calculate the average sess

Re: state.checkpoints.dir not configured

2017-12-21 Thread Plamen Paskov
annot bind to the port it should simply die and not complain about checkpoint configuration. – Ufuk On Thu, Dec 21, 2017 at 1:21 PM, Plamen Paskov wrote: I inspected the log as you suggested and found that port 6123 was in use by another process. I freed the port and restarted the job manager

Re: periodic trigger

2017-12-21 Thread Plamen Paskov
either tumbling window with default trigger (triggering at the end of the window), or use session windows. Piotrek On 21 Dec 2017, at 13:29, Plamen Paskov wrote: Hi guys, I have the following code: SingleOutputStreamOperator lastUserSess

periodic trigger

2017-12-21 Thread Plamen Paskov
Hi guys, I have the following code: SingleOutputStreamOperator lastUserSession = env .socketTextStream("localhost", 9000, "\n") .map(new MapFunction() { @Override public Event map(String value) throws Exception { String[] row = value.split(",");

Re: state.checkpoints.dir not configured

2017-12-21 Thread Plamen Paskov
ue, Dec 19, 2017 at 3:45 PM, Plamen Paskov wrote: Hi, I'm trying to enable externalized checkpoints like this: env.enableCheckpointing(1000); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternal

state.checkpoints.dir not configured

2017-12-19 Thread Plamen Paskov
Hi, I'm trying to enable externalized checkpoints like this: env.enableCheckpointing(1000); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); checkpointConfig.setChe

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
regate the length per user and emit it downstream. Then you do the all window and average all lengths. Does that make sense? On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov wrote: I think I got your point. What happens now: in order to use aggregate() I need a window, but the window requires keyBy() if
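
The two-step approach described in the reply above (aggregate the session length per user, then average over all users) can be sketched in plain Java without any Flink dependency. This is only an illustration of the arithmetic: the class `TwoStageAverage`, the `Session` type and both method names are hypothetical and do not come from the thread. In a Flink job the first step would presumably be a keyed window aggregation and the second an all-window over its output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoStageAverage {

    /** Hypothetical event type: one session of a given length for one user. */
    static final class Session {
        final String userId;
        final long lengthSeconds;

        Session(String userId, long lengthSeconds) {
            this.userId = userId;
            this.lengthSeconds = lengthSeconds;
        }
    }

    /** Step 1: sum the session length per user (the "keyed" aggregation). */
    static Map<String, Long> totalLengthPerUser(List<Session> sessions) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Session s : sessions) {
            totals.merge(s.userId, s.lengthSeconds, Long::sum);
        }
        return totals;
    }

    /** Step 2: average the per-user totals (the "all window" aggregation). */
    static double averageLength(Map<String, Long> totals) {
        if (totals.isEmpty()) {
            return 0.0; // assumption: an empty input averages to zero
        }
        long sum = 0;
        for (long value : totals.values()) {
            sum += value;
        }
        return (double) sum / totals.size();
    }

    public static void main(String[] args) {
        List<Session> sessions = Arrays.asList(
                new Session("a", 10),
                new Session("a", 20),
                new Session("b", 20));
        Map<String, Long> totals = totalLengthPerUser(sessions);
        System.out.println(totals);
        System.out.println(averageLength(totals));
    }
}
```

Keeping the two steps separate mirrors the suggestion in the reply: the per-user totals are emitted first, and only the second step looks at all users together.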

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov wrote: Hi, I'm trying to calculate the running average of session length and I want to trigger the computati

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
/functions/AggregateFunction.java On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov wrote: Hi, I'm trying to calculate the running average of session length and I want to trigger the computation at a regular interval, say every 2 minutes. I'm trying to do it like this: package flink; import lombok.A

consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
Hi, I'm trying to calculate the running average of session length and I want to trigger the computation at a regular interval, say every 2 minutes. I'm trying to do it like this: package flink; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.ToString; import o

Re: streamin Table API - strange behavior

2017-12-14 Thread Plamen Paskov
ogress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled). Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L); Best, Fabian 2017-12-14 16:30 GMT+01:00 Plamen Paskov m

streamin Table API - strange behavior

2017-12-14 Thread Plamen Paskov
Hi, I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program I receive all aggregated data. Any ideas how to make it output regularly or when new