I did confirm that I got no resulting output after 20 seconds, and none even
after sending additional data with more than a minute of waiting between
batches.

My code looks like this:

PulsarSourceBuilder<String> builder = PulsarSourceBuilder
      .builder(new SimpleStringSchema())
      .serviceUrl(SERVICE_URL)
      .topic(INPUT_TOPIC)
      .subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);

DataStream<String> combinedEnvelopes = dataStream
      .map(new MapFunction<String, Tuple2<String, String>>() {
         @Override
         public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
         }
      })
      .keyBy(0)
      //.timeWindow(Time.seconds(5))
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .aggregate(new JsonConcatenator());
//dataStream.print();

Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
logger.info("Ran dataStream. Adding sink next");
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
      SERVICE_URL,
      OUTPUT_TOPIC,
      new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
      combinedData -> combinedData.toString().getBytes(UTF_8),
      combinedData -> "test")
);
logger.info("Added sink. Executing job.");
// execute program
env.execute("Flink Streaming Java API Skeleton");
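
For what it's worth, I am not assigning timestamps or watermarks anywhere
yet. If the session window is supposed to run on event time, I understand it
would need something roughly like the sketch below (parseEventTimeMillis is a
hypothetical helper, not something I have written):

// Sketch only, not what the job currently does: event-time session windows
// need timestamps and watermarks assigned upstream, roughly like this.
// parseEventTimeMillis is a hypothetical helper that would pull an event-time
// field out of the incoming JSON message.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<String> withTimestamps = dataStream
      .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
               @Override
               public long extractTimestamp(String incomingMessage) {
                  return parseEventTimeMillis(incomingMessage);
               }
            });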


Here is the JsonConcatenator class:

private static class JsonConcatenator
      implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
   Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
   @Override
   public Tuple2<String, String> createAccumulator() {
      return new Tuple2<String, String>("","");
   }

   @Override
   public Tuple2<String, String> add(Tuple2<String, String> value,
         Tuple2<String, String> accumulator) {
      logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
      return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
   }

   @Override
   public String getResult(Tuple2<String, String> accumulator) {
      logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
      return "[" + accumulator.f1.substring(1) + "]";
   }

   @Override
   public Tuple2<String, String> merge(Tuple2<String, String> a,
         Tuple2<String, String> b) {
      // Merge is called when session windows are merged (e.g. when a new element
      // bridges the gap between two sessions), not only when lateness is allowed.
      logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
      if (!b.f1.isEmpty() && b.f1.charAt(0) == '[') {
         logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
         b.f1 = b.f1.substring(1);
      }
      return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
   }
}
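
One thing I'm considering as a sanity check (a sketch of an alternative, not
what the job currently runs): processing-time session windows close based on
wall-clock gaps, so they should fire even without event-time timestamps or
watermarks.

// Sanity-check sketch: the same pipeline, but with processing-time session
// windows, which trigger on wall-clock gaps and need no timestamps/watermarks.
DataStream<String> combinedByProcessingTime = dataStream
      .map(new MapFunction<String, Tuple2<String, String>>() {
         @Override
         public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
         }
      })
      .keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .aggregate(new JsonConcatenator());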


Devin G. Bost

On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <ar...@ververica.com> wrote:

> getResult will only be called when the window is triggered. For a
> fixed-time window, it triggers at the end of the window.
>
> However, for EventTimeSessionWindows you need to have gaps in the data.
> Can you verify that there is actually a 20sec pause in between data points
> for your keys?
> Additionally, it may also be an issue with extracting the event time from
> the sources. Could you post the relevant code as well?
>
> Best,
>
> Arvid
>
> On Mon, Dec 9, 2019 at 8:51 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi dev,
>>
>> The time of the window may have different semantics.
>> In the session window, it's only a time gap, the size of the window is
>> driven via activity events.
>> In the tumbling or sliding window, it means the size of the window.
>>
>> For more details, please see the official documentation.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>
>>
>>
>>> devinbost <devin.b...@gmail.com> wrote on Fri, Dec 6, 2019 at 10:39 PM:
>>
>>> I think there might be a bug in
>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>>  (unless I'm just not using it correctly) because I'm able to get output
>>> when I use the simpler window
>>> `.timeWindow(Time.seconds(5))`
>>> However, I don't get any output when I used the session-based window.
>>>
>>>
>>> devinbost wrote
>>> > I added logging statements everywhere in my code, and I'm able to see my
>>> > message reach the `add` method in the AggregateFunction that I implemented,
>>> > but the getResult method is never called.
>>> >
>>> > In the code below, I also never see the:
>>> >  "Ran dataStream. Adding sink next"
>>> > line appear in my log, and the only log statements from the
>>> > JsonConcatenator
>>> > class come from the `add` method, as shown below.
>>> >
>>> >
>>> > DataStream<String> combinedEnvelopes = dataStream
>>> >     .map(new MapFunction<String, Tuple2<String, String>>() {
>>> >         @Override
>>> >         public Tuple2 map(String incomingMessage) throws Exception {
>>> >             return mapToTuple(incomingMessage);
>>> >         }
>>> >     })
>>> >     .keyBy(0)
>>> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>> >     .aggregate(new JsonConcatenator());
>>> >
>>> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
>>> > logger.info("Ran dataStream. Adding sink next")
>>> >
>>> > -------------
>>> >
>>> > private static class JsonConcatenator
>>> >         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>>> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>>> >     @Override
>>> >     public Tuple2<String, String> createAccumulator() {
>>> >         return new Tuple2<String, String>("","");
>>> >     }
>>> >
>>> >     @Override
>>> >     public Tuple2<String, String> add(Tuple2<String, String> value,
>>> >             Tuple2<String, String> accumulator) {
>>> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>>> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>>> >     }
>>> >
>>> >     @Override
>>> >     public String getResult(Tuple2<String, String> accumulator) {
>>> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>>> >         return "[" + accumulator.f1 + "]";
>>> >     }
>>> >
>>> >     @Override
>>> >     public Tuple2<String, String> merge(Tuple2<String, String> a,
>>> >             Tuple2<String, String> b) {
>>> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>>> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>>> >     }
>>> > }
>>> >
>>> >
>>> >
>>> >
>>> > Any ideas?
>>> >
>>> >
>>> > Chris Miller-2 wrote
>>> >> I hit the same problem, as far as I can tell it should be fixed in
>>> >> Pulsar 2.4.2. The release of this has already passed voting so I hope it
>>> >> should be available in a day or two.
>>> >>
>>> >> https://github.com/apache/pulsar/pull/5068
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>>
>>>
>>>
>>>
>>
