No problem. Make sure that three instances of your application weren't running in the background, which would produce 3x the expected output.
Piotrek

> On Jun 19, 2017, at 5:25 PM, FRANCISCO BORJA ROBLES MARTIN <francisco.robles.mar...@alumnos.upm.es> wrote:
>
> Hello Piotrek!
>
> Thanks for answering! Yes, I have already changed the "TimeCharacteristic" to "ProcessingTime". I need it for the ".setWriteTimestampToKafka(true)" option, as I use the timestamp in the Kafka consumer that reads this app's output. I have also changed the code a bit to use KeyedStreams, so that I can use parallelism in the window/reduce functions.
>
> About the problem: yesterday I noticed that it was growing as I did more submits. It was producing x3 outputs (with small differences in each input, as you can see in my first message), whereas before it was producing only x2. Finally I stopped the cluster (stop-cluster.sh) and started it again (start-cluster.sh), and the problem was solved. Today I have been trying to reproduce the problem by submitting the app several times, but I haven't managed to. If it happens again, I will try to reproduce it with the smallest code possible to find where the bug could be (something seems to go wrong when submitting several times).
>
> Kind regards!
> Fran.
>
>
> On 2017-06-19 14:43, Piotr Nowojski wrote:
>> Hi,
>>
>> It is difficult for me to respond fully to your question. First of all, it would be really useful if you could strip your example down to a minimal version that shows the problem. Unfortunately I was unable to reproduce your issue: I was getting only one output line per window (as expected). Could you try printing the output to the console (or using some other data sink) instead of writing it back to Kafka? Maybe the problem is there. Also, please try removing parts of the code bit by bit, so that you can find what's causing the problem.
>>
>> As a side note, I have a couple of concerns with your timestamps/watermarks/windows definitions.
>> First, you set the time characteristic to EventTime:
>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> But I don't see where you actually assign the timestamps/watermarks. Didn't you want to use “.assignTimestampsAndWatermarks(…)” on your input DataStream, based on its content? Nevertheless, later you define the window by ProcessingTime:
>>
>>> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>>
>> which defines the windows independently of the content of those events. Maybe switching properly to EventTime will solve your problem?
>>
>> Thanks,
>> Piotrek
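In Flink itself, switching to event time would mean assigning timestamps and watermarks with `.assignTimestampsAndWatermarks(…)` and windowing with `TumblingEventTimeWindows` rather than `TumblingProcessingTimeWindows`. The plain-Java sketch below has no Flink dependency and only illustrates what that change means; `Event`, `windowStart` and `sumPerWindow` are hypothetical names, not Flink API. Each record's window is computed from its own timestamp, and a watermark that trails the largest timestamp seen by an assumed fixed out-of-orderness bound decides when a window is complete. It is a simplification under those assumptions, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of event-time tumbling windows with a watermark.
public class EventTimeWindowSketch {

    // Hypothetical input record: its timestamp comes from the event's own content.
    static final class Event {
        final long timestampMs;
        final int value;

        Event(long timestampMs, int value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Start of the epoch-aligned tumbling window that contains ts (no offset).
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Sums values per event-time window. The watermark is the largest timestamp
    // seen minus maxOutOfOrdernessMs; a window [start, end) is emitted once the
    // watermark reaches end - 1. Events for an already-emitted window are dropped.
    static List<String> sumPerWindow(List<Event> arrivals, long sizeMs, long maxOutOfOrdernessMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> running sum
        List<String> emitted = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (Event e : arrivals) {
            long start = windowStart(e.timestampMs, sizeMs);
            if (start + sizeMs - 1 <= watermark) {
                continue; // late event: its window has already been emitted
            }
            open.merge(start, e.value, Integer::sum);
            maxSeen = Math.max(maxSeen, e.timestampMs);
            watermark = maxSeen - maxOutOfOrdernessMs;
            fire(open, emitted, sizeMs, watermark);
        }
        // End of input: flush all remaining windows, as if the watermark jumped to the maximum.
        fire(open, emitted, sizeMs, Long.MAX_VALUE);
        return emitted;
    }

    // Emits (and removes) every open window whose last millisecond is <= watermark.
    private static void fire(TreeMap<Long, Integer> open, List<String> out, long sizeMs, long watermark) {
        Iterator<Map.Entry<Long, Integer>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> w = it.next();
            long end = w.getKey() + sizeMs;
            if (end - 1 > watermark) {
                break; // keys are ordered by start, so later windows are not complete either
            }
            out.add("[" + w.getKey() + "," + end + ") sum=" + w.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        // 2-second windows (as in Time.seconds(2)) with an assumed 1-second disorder bound.
        List<Event> arrivals = Arrays.asList(
                new Event(500, 1),
                new Event(2100, 10),
                new Event(1500, 2),   // out of order, but within the bound
                new Event(4200, 100),
                new Event(1900, 5));  // too late: [0,2000) was already emitted
        sumPerWindow(arrivals, 2000, 1000).forEach(System.out::println);
        // prints:
        // [0,2000) sum=3
        // [2000,4000) sum=10
        // [4000,6000) sum=100
    }
}
```

Each window is emitted exactly once, and membership depends only on the records' timestamps, not on when they arrive or how long the job takes to process them. A processing-time window, by contrast, would place the 1900 ms record according to the wall clock at arrival. How late data is handled here (simply dropped) is a choice of this sketch.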