I have a job that performs an aggregation over a time window. The windowing is supposed to happen per key, but the output I am seeing looks like one overall window across everything coming in. Is this happening because I map the data before calling keyBy? This is a representation of what I am running:
    val stream = env
      .addSource(kafkaConsumer)

    // filter out bad json
    val jsonDeserializer = new JSONDeserializationSchema()
    val filteredStream = stream.filter(text => {
      try {
        jsonDeserializer.deserialize(text.getBytes)
        true
      } catch {
        case e: Exception => false
      }
    })

    val kafkaStream = filteredStream.map(text => jsonDeserializer.deserialize(text.getBytes))

    // method used to filter json not meeting the expected requests
    val filteredJsonStream = filterIncorrectJson(kafkaStream)

    // method used to map Json to input object
    val mappedStream = mapJsonToObject(filteredJsonStream)

    // pull key out of object
    val keyedStream = mappedStream.keyBy(_.key)

    // add window
    val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

    // reduce to aggregates
    val reducedStream = windowedStream.reduce(aggregateData())

I am pulling in data from Kafka as a String, mapping it to my data model, pulling out the key, applying a time window with a 30 minute size and a 5 minute slide, and then doing an aggregation.

I am expecting the aggregation to happen on a time window that is separate for each distinct key, but it is happening every 5 minutes for all keys.
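To make the expectation concrete, here is a minimal sketch in plain Scala (no Flink dependency) of what I mean by a per-key sliding window aggregation. It is only a model, not the actual job: `Event`, `windowStarts` and `aggregatePerKeyAndWindow` are hypothetical names, the aggregation is a simple sum, and it assumes window boundaries are aligned to multiples of the slide since the epoch, which is my understanding of how sliding time windows are assigned.

```scala
// Plain-Scala model of per-key sliding windows; no Flink dependency.
// Event, windowStarts and aggregatePerKeyAndWindow are hypothetical names.
final case class Event(key: String, timestampMs: Long, value: Long)

object PerKeySlidingWindows {
  // Start times of every sliding window that contains `ts`, ASSUMING windows
  // are aligned to multiples of `slideMs` since the epoch and ts >= 0.
  def windowStarts(ts: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList
  }

  // Sum of values for each (key, windowStart) pair.
  def aggregatePerKeyAndWindow(
      events: Seq[Event],
      sizeMs: Long,
      slideMs: Long): Map[(String, Long), Long] =
    events
      .flatMap(e => windowStarts(e.timestampMs, sizeMs, slideMs).map(s => ((e.key, s), e.value)))
      .groupBy(_._1)
      .map { case (keyAndWindow, entries) => keyAndWindow -> entries.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val events = Seq(
      Event("a", 7 * minute, 1),
      Event("a", 12 * minute, 2),
      Event("b", 8 * minute, 10)
    )
    // 30 minute window, 5 minute slide, as in the job above.
    aggregatePerKeyAndWindow(events, 30 * minute, 5 * minute)
      .toSeq
      .sorted
      .foreach { case ((key, start), sum) =>
        println(s"key=$key windowStart=${start / minute}min sum=$sum")
      }
  }
}
```

In this model every key shares the same window boundaries, so results for all keys become available on the same 5 minute cadence, but each (key, window) pair carries its own aggregate. That separate aggregate per key is what I expect to see in the output.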