I confirmed that I got no output after 20 seconds, and none after sending
additional batches of data with more than a minute between them.
My code looks like this:
PulsarSourceBuilder<String> builder = PulsarSourceBuilder
    .builder(new SimpleStringSchema())
    .serviceUrl(SERVICE_URL)
    .topic(INPUT_TOPIC)
    .subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    //.timeWindow(Time.seconds(5))
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .aggregate(new JsonConcatenator());
//dataStream.print();

Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
logger.info("Ran dataStream. Adding sink next");
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
    SERVICE_URL,
    OUTPUT_TOPIC,
    new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
    combinedData -> combinedData.toString().getBytes(UTF_8),
    combinedData -> "test")
);
logger.info("Added sink. Executing job.");
// execute program
env.execute("Flink Streaming Java API Skeleton");
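One thing worth ruling out with the session window above: event-time windows fire on watermarks rather than wall-clock time, so a session only closes once the watermark has passed the last element's timestamp plus the gap. If the source assigns no timestamps or emits no watermarks, waiting a minute between batches would not be enough. The plain-Java sketch below is a simplified model of that behavior, not Flink code; `SessionWindowSketch`, `Event`, and `firedSessions` are hypothetical names made up for illustration, and the exact boundary handling in Flink may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionWindowSketch {

    /** A value with its event-time timestamp in milliseconds (hypothetical helper type). */
    public static final class Event {
        final String value;
        final long timestamp;

        public Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Groups the events of one key into sessions and returns only the sessions
     * that the given watermark has closed. Simplified model, not Flink's implementation.
     */
    public static List<List<String>> firedSessions(List<Event> events, long gapMillis, long watermark) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));

        List<List<String>> fired = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long lastTimestamp = 0L;
        for (Event e : sorted) {
            // A pause longer than the gap ends the current session and starts a new one.
            if (!current.isEmpty() && e.timestamp - lastTimestamp > gapMillis) {
                // The finished session is emitted only if the watermark has passed its end.
                if (watermark >= lastTimestamp + gapMillis) {
                    fired.add(current);
                }
                current = new ArrayList<>();
            }
            current.add(e.value);
            lastTimestamp = e.timestamp;
        }
        // The same rule applies to the last session.
        if (!current.isEmpty() && watermark >= lastTimestamp + gapMillis) {
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000L), new Event("b", 3_000L), new Event("c", 20_000L));
        long gap = 5_000L;
        // Watermark never advances: no session fires, however long we wait.
        System.out.println(firedSessions(events, gap, Long.MIN_VALUE));
        // Watermark at 10s closes the first session only.
        System.out.println(firedSessions(events, gap, 10_000L));
        // Watermark at 30s closes both sessions.
        System.out.println(firedSessions(events, gap, 30_000L));
    }
}
```

In a real job the watermark would come from the source or from a call such as `assignTimestampsAndWatermarks` on the stream; whether the Pulsar source used here emits watermarks on its own is not something this sketch can answer.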
Here is the JsonConcatenator class:
private static class JsonConcatenator
        implements AggregateFunction<Tuple2<String, String>,
                Tuple2<String, String>, String> {
    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);

    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<String, String>("", "");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
                                      Tuple2<String, String> accumulator) {
        logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
        return "[" + accumulator.f1.substring(1) + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
                                        Tuple2<String, String> b) {
        // merge() is called when two windows are merged, as happens with session windows.
        logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " +
                a.f1 + " and b.f1: " + b.f1);
        // Guard against an empty accumulator before inspecting the first character.
        if (!b.f1.isEmpty() && b.f1.charAt(0) == '[') {
            logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
            b.f1 = b.f1.substring(1);
        }
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}
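
To see what string this aggregate would produce once a window does fire, its add, getResult and merge logic can be replayed outside Flink. This is a minimal sketch that uses plain strings in place of the `f1` field of the Tuple2 accumulator; `ConcatenatorSketch` and its method names are hypothetical, and the empty-string guard in `merge` is an addition made so the sketch cannot throw.

```java
public class ConcatenatorSketch {

    /** Mirrors add(): appends ", " plus the new value to the accumulated string. */
    public static String add(String value, String accumulated) {
        return accumulated + ", " + value;
    }

    /** Mirrors getResult(): drops the first character and wraps the rest in brackets. */
    public static String getResult(String accumulated) {
        return "[" + accumulated.substring(1) + "]";
    }

    /** Mirrors merge(): strips a leading '[' from the right side, then joins with ", ". */
    public static String merge(String a, String b) {
        if (!b.isEmpty() && b.charAt(0) == '[') {
            b = b.substring(1);
        }
        return a + ", " + b;
    }

    public static void main(String[] args) {
        // Two elements added to a fresh accumulator ("").
        String acc = add("y", add("x", ""));
        System.out.println(getResult(acc));

        // Two single-element accumulators merged, as when session windows merge.
        String merged = merge(add("x", ""), add("y", ""));
        System.out.println(getResult(merged));
    }
}
```

Under these assumptions the result carries a leading space after the opening bracket, and merging two non-empty accumulators leaves an empty element between two commas, so the separators may need adjusting if the output has to be valid JSON.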
Devin G. Bost
On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <[email protected]> wrote:
> getResult will only be called when the window is triggered. For a
> fixed-time window, it triggers at the end of the window.
>
> However, for EventTimeSessionWindows you need to have gaps in the data.
> Can you verify that there is actually a 20-second pause between data points
> for your keys?
> Additionally, it may also be an issue with extracting the event time from
> the sources. Could you post the relevant code as well?
>
> Best,
>
> Arvid
>
> On Mon, Dec 9, 2019 at 8:51 AM vino yang <[email protected]> wrote:
>
>> Hi dev,
>>
>> The time parameter of a window can have different semantics.
>> In a session window, it is only the inactivity gap; the size of the window
>> is driven by the incoming events.
>> In a tumbling or sliding window, it is the size of the window.
>>
>> For more details, please see the official documentation.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>
>>
>>
>> On Fri, Dec 6, 2019 at 10:39 PM devinbost <[email protected]> wrote:
>>
>>> I think there might be a bug in
>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>> (unless I'm just not using it correctly) because I'm able to get output
>>> when I use the simpler window
>>> `.timeWindow(Time.seconds(5))`
>>> However, I don't get any output when I used the session-based window.
>>>
>>>
>>> devinbost wrote
>>> > I added logging statements everywhere in my code, and I'm able to see my
>>> > message reach the `add` method in the AggregateFunction that I implemented,
>>> > but the getResult method is never called.
>>> >
>>> > In the code below, I also never see the:
>>> > "Ran dataStream. Adding sink next"
>>> > line appear in my log, and the only log statements from the JsonConcatenator
>>> > class come from the `add` method, as shown below.
>>> >
>>> >
>>> > DataStream<String> combinedEnvelopes = dataStream
>>> >     .map(new MapFunction<String, Tuple2<String, String>>() {
>>> >         @Override
>>> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
>>> >             return mapToTuple(incomingMessage);
>>> >         }
>>> >     })
>>> >     .keyBy(0)
>>> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>> >     .aggregate(new JsonConcatenator());
>>> >
>>> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
>>> > logger.info("Ran dataStream. Adding sink next");
>>> >
>>> > -------------
>>> >
>>> > private static class JsonConcatenator
>>> >         implements AggregateFunction<Tuple2<String, String>,
>>> >                 Tuple2<String, String>, String> {
>>> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>>> >
>>> >     @Override
>>> >     public Tuple2<String, String> createAccumulator() {
>>> >         return new Tuple2<String, String>("","");
>>> >     }
>>> >
>>> >     @Override
>>> >     public Tuple2<String, String> add(Tuple2<String, String> value,
>>> >                                       Tuple2<String, String> accumulator) {
>>> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>>> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>>> >     }
>>> >
>>> >     @Override
>>> >     public String getResult(Tuple2<String, String> accumulator) {
>>> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>>> >         return "[" + accumulator.f1 + "]";
>>> >     }
>>> >
>>> >     @Override
>>> >     public Tuple2<String, String> merge(Tuple2<String, String> a,
>>> >                                         Tuple2<String, String> b) {
>>> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
>>> >                 + " and b.f1: " + b.f1);
>>> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>>> >     }
>>> > }
>>> >
>>> >
>>> >
>>> >
>>> > Any ideas?
>>> >
>>> >
>>> > Chris Miller-2 wrote
>>> >> I hit the same problem. As far as I can tell, it should be fixed in
>>> >> Pulsar 2.4.2. That release has already passed voting, so I hope it
>>> >> will be available in a day or two.
>>> >>
>>> >> https://github.com/apache/pulsar/pull/5068
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > Sent from:
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>
>>