Hello,

I'm new to the Kafka ecosystem, so I apologize if this is a naive question.

What I'm looking to accomplish is the following:

- We get heartbeat events from a source, which are piped into a Kafka topic.
  These events are of the form {"session_id": "foo", "total_time_spent": x}.
  We get one every 15 seconds or so. total_time_spent is the total time for a
  given session ID, so within a session it is cumulative and thus
  monotonically increasing.
- We now need to transform this stream to emit the incremental time spent.
  For any event, incremental_time_spent is the current total_time_spent minus
  the previous total_time_spent, where "previous" is determined by time.
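
To make the intended output concrete, here is a minimal plain-Java sketch of the computation, with no Kafka involved. The class and method names (IncrementalTimeSketch, incremental) are hypothetical and only for illustration, and the sketch assumes heartbeats for a session arrive in time order, so "previous" is simply the last value seen.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: remembers the last total seen per
// session and returns the difference for each new heartbeat.
class IncrementalTimeSketch {
    private final Map<String, Long> lastTotalBySession = new HashMap<>();

    long incremental(String sessionId, long totalTimeSpent) {
        // put() returns the previously stored total, or null for a new session.
        Long previous = lastTotalBySession.put(sessionId, totalTimeSpent);
        if (previous == null) {
            // First heartbeat for this session: the whole total is the increment.
            return totalTimeSpent;
        }
        // Clamp at zero in case a heartbeat reports a lower total than the last one.
        return Math.max(0L, totalTimeSpent - previous);
    }

    public static void main(String[] args) {
        IncrementalTimeSketch sketch = new IncrementalTimeSketch();
        System.out.println(sketch.incremental("foo", 15)); // 15
        System.out.println(sketch.incremental("foo", 30)); // 15
        System.out.println(sketch.incremental("foo", 48)); // 18
        System.out.println(sketch.incremental("bar", 10)); // 10
    }
}
```

So for totals 15, 30, 48 on session "foo" the desired output is 15, 15, 18, with each session tracked independently.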

I'm wondering how to achieve the above using the Streams API.

The first attempt I made was of the following form:

KStreamBuilder builder = new KStreamBuilder();

builder
    .stream(topic)
    .groupByKey()
    .aggregate(() -> null, Incrementalize::incrementalize)
    .toStream()
    .print();


with the incrementalize method being the following:


private static JsonNode incrementalize(String sessionId, JsonNode currentNode,
                                       JsonNode currentAggregate) {
    ObjectNode result = JsonNodeFactory.instance.objectNode();
    ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode();
    long currentTimeSpent = currentNode.get("total_time_spent").asLong();

    if (currentAggregate == null) {
        // First heartbeat for this session: the whole total is the increment.
        result.put("incremental_time_spent", currentTimeSpent);
    } else {
        // Carry over the totals seen so far and diff against the latest one.
        ArrayNode values = (ArrayNode) currentAggregate.get("values");
        valuesArray.addAll(values);
        long previousTimeSpent = valuesArray.get(valuesArray.size() - 1).asLong();
        result.put("incremental_time_spent",
                Math.max(0, currentTimeSpent - previousTimeSpent));
    }

    // Append the current total so the next heartbeat can diff against it.
    valuesArray.add(currentTimeSpent);
    result.set("values", valuesArray);
    return result;
}


The problem I'm having with the above approach is that KTable -> toStream
doesn't emit every change. It only seems to emit a change every 30 seconds,
even though the heartbeats are coming in every 15 seconds and are being
accumulated into the values array correctly.

Is there a gap in my fundamental modeling of the problem into streams?
Could someone please point me in the right direction?


Thanks!

Puneet

-- 
Regards,
Puneet
