Hello Piotrek!
Thanks for answering! Yes, I have already changed the
"TimeCharacteristic" to "ProcessingTime". I need it for the
".setWriteTimestampToKafka(true)" option, as I use the timestamp in the
Kafka consumer that reads this app's output. I have also changed the
code a bit to use KeyedStreams, so that the window/reduce functions can
run in parallel.
About the problem: yesterday I noticed that it was getting worse the
more times I submitted the app. It was producing x3 outputs (with small
differences in each, as you can see in my first message), whereas
before it was producing only x2. Finally I stopped the cluster
(stop-cluster.sh) and started it again (start-cluster.sh), and the
problem went away. I have been trying to reproduce it by submitting the
app several times, but I have not managed to today. If it happens
again, I will try to reproduce it with the smallest code possible to
find where the bug could be (something seems to go wrong when the app
is submitted several times).
Kind regards!
Fran.
On 2017-06-19 14:43, Piotr Nowojski wrote:
Hi,
It is difficult for me to respond fully to your question. First of all,
it would be really useful if you could strip down your example to a
minimal version that shows the problem. Unfortunately, I was unable to
reproduce your issue. I was getting only one output line per window
(as expected). Could you try printing the output to the console (or
using a different data sink) instead of writing it back to Kafka?
Maybe the problem is there. Also, please try removing parts of the
code bit by bit, so that you may be able to find what is causing the
problem.
As a side note, I have a couple of concerns about your
timestamps/watermarks/windows definitions. First you set the time
characteristic to EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
But I don’t see where you are actually setting the
timestamps/watermarks. Didn’t you want to use
“.assignTimestampsAndWatermarks(…)” on your input DataStream,
based on its content? Nevertheless, later you specify the window by
ProcessingTime:
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
This defines the windows independently of the content of those events.
Maybe switching to proper EventTime will solve your problem?
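To illustrate the difference, here is a small sketch in plain Java. It
is not Flink code and not your job: the class name, the method names
and all timestamps are made up for the example. It only shows the idea
that a 2-second tumbling window is chosen from a timestamp, so the
result depends on whether that timestamp comes from the event itself or
from the clock at arrival.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Flink.
public class TumblingWindowSketch {

    // Start (in ms) of the tumbling window of the given size that
    // contains the given timestamp.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Groups timestamps by the start of the window they fall into.
    public static Map<Long, List<Long>> assign(long[] timestampsMs, long sizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsMs) {
            windows.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>())
                   .add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long sizeMs = 2000L; // 2-second windows, as in Time.seconds(2)

        // Assumed timestamps carried inside three events (event time).
        long[] eventTimes = {1000L, 1900L, 2100L};
        // Assumed wall-clock times at which the same events arrive
        // (processing time), e.g. when they are read in one burst.
        long[] arrivalTimes = {5000L, 5100L, 5200L};

        System.out.println("event time:      " + assign(eventTimes, sizeMs));
        System.out.println("processing time: " + assign(arrivalTimes, sizeMs));
    }
}
```

With these made-up values, grouping by the timestamps inside the events
splits the three records across two windows, while grouping by arrival
time puts all three into a single window.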
Thanks, Piotrek