Hello, Im new to Kafka ecosystem so I apologize if this is all a naive question.
What Im looking to accomplish is the following: - We get heartbeat events from a source which is piped into a kafka topic. These events are of the form - {"session_id": "foo", "total_time_spent": x } . We get these events ever 15 seconds or so. The total time spent is the total time for a certain session id, so for a session id its cumulative and thus monotonically increasing. - Now we need to transform this stream to emit incremental total time spent. So essentially for any event the incremental_time_spent is the current total_time_spent - previous_total_time_spent . The notion of previous is time based. Im wondering how to achieve the above using the streams api. The first attempt I made was of the following form: KStreamBuilder builder = new KStreamBuilder(); builder .stream(topic) .groupByKey() .aggregate(() -> null, Incrementalize::incrementalize) .toStream() .print(); with the incrementalize method being the following: private static JsonNode incrementalize(String sessionId, JsonNode currentNode, JsonNode currentAggregate) { ObjectNode result = JsonNodeFactory.instance.objectNode(); ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode(); long currentTimeSpent = currentNode.get("total_time_spent").asLong(); if (currentAggregate == null) { result.put("incremental_time_spent", currentTimeSpent); } else { ArrayNode values = (ArrayNode) currentAggregate.get("values"); valuesArray.addAll(values); result.put("incremental_time_spent", Math.max(0, currentTimeSpent - valuesArray.get(valuesArray.size() - 1).asLong())); } valuesArray.add(currentTimeSpent); result.set("values", valuesArray); return result; } Now the problem Im having with the above approach is that the KTable -> toStream doesnt emit every change, it only seems to emit a change every 30 seconds even though the heartbeats are coming in every 15 seconds and are getting accumulated into the values array appropriately. Is there a gap in my fundamental modeling of the problem into streams? Could someone please point me in the right direction? Thanks! Puneet -- Regards, Puneet