I think there might be a bug in
`.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
 (unless I'm just not using it correctly) because I'm able to get output
when I use the simpler window 
`.timeWindow(Time.seconds(5))`
However, I don't get any output when I used the session-based window. 


devinbost wrote
> I added logging statements everywhere in my code, and I'm able to see my
> message reach the `add` method in the AggregateFunction that I
> implemented,
> but the getResult method is never called. 
> 
> In the code below, I also never see the:
>  "Ran dataStream. Adding sink next"
> line appear in my log, and the only log statements from the
> JsonConcatenator
> class come from the `add` method, as shown below. 
> 
> 
> DataStream
> <String>
>  combinedEnvelopes = dataStream
>     .map(new MapFunction&lt;String, Tuple2&amp;lt;String, String&gt;>() {
>         @Override
>         public Tuple2 map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .aggregate(new JsonConcatenator());
> 
> Logger logger = LoggerFactory.getLogger(StreamJob.class);
> logger.info("Ran dataStream. Adding sink next")
> 
> -------------
> 
> private static class JsonConcatenator
>         implements AggregateFunction&lt;Tuple2&amp;lt;String, String&gt;,
> Tuple2&lt;String, String&gt;, String> {
>     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>     @Override
>     public Tuple2&lt;String, String&gt; createAccumulator() {
>         return new Tuple2&lt;String, String&gt;("","");
>     }
> 
>     @Override
>     public Tuple2&lt;String, String&gt; add(Tuple2&lt;String, String&gt;
> value,
> Tuple2&lt;String, String&gt; accumulator) {
>         logger.info("Running Add on value.f0: " + value.f0 + " and
> value.f1:
> " + value.f1);
>         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>     }
> 
>     @Override
>     public String getResult(Tuple2&lt;String, String&gt; accumulator) {
>         logger.info("Running getResult on accumulator.f1: " +
> accumulator.f1);
>         return "[" + accumulator.f1 + "]";
>     }
> 
>     @Override
>     public Tuple2&lt;String, String&gt; merge(Tuple2&lt;String, String&gt;
> a,
> Tuple2&lt;String, String&gt; b) {
>         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " +
> a.f1
> + " and b.f1: " + b.f1);
>         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>     }
> }
> 
> 
> 
> 
> Any ideas? 
> 
> 
> Chris Miller-2 wrote
>> I hit the same problem, as far as I can tell it should be fixed in 
>> Pulsar 2.4.2. The release of this has already passed voting so I hope it 
>> should be available in a day or two.
>> 
>> https://github.com/apache/pulsar/pull/5068
> 
> 
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to