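Your aggregate's updates are being held in the KTable cache. With caching enabled, Kafka Streams buffers updates per key and only forwards them downstream when the cache is flushed on commit (commit.interval.ms, 30 seconds by default) or when it evicts entries because it is full. That is the 30-second cadence you are seeing. To get every update, you need to disable the KTable cache by setting the cache size (cache.max.bytes.buffering) to zero: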
https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html

-Matthias

On 11/21/17 2:14 PM, Puneet Lakhina wrote:
> Hello,
>
> I'm new to the Kafka ecosystem, so I apologize if this is a naive question.
>
> What I'm looking to accomplish is the following:
>
> - We get heartbeat events from a source, which are piped into a Kafka topic.
>   These events are of the form {"session_id": "foo", "total_time_spent": x}.
>   We get these events every 15 seconds or so. The total time spent is the
>   total time for a certain session id, so for a session id it is cumulative
>   and thus monotonically increasing.
> - Now we need to transform this stream to emit the incremental time spent.
>   Essentially, for any event, incremental_time_spent is the current
>   total_time_spent minus the previous total_time_spent. The notion of
>   "previous" is time based.
>
> I'm wondering how to achieve the above using the Streams API.
>
> My first attempt was of the following form:
>
>     KStreamBuilder builder = new KStreamBuilder();
>     builder
>         .stream(topic)
>         .groupByKey()
>         .aggregate(() -> null, Incrementalize::incrementalize)
>         .toStream()
>         .print();
>
> with the incrementalize method being the following:
>
>     private static JsonNode incrementalize(String sessionId, JsonNode currentNode,
>                                            JsonNode currentAggregate) {
>         ObjectNode result = JsonNodeFactory.instance.objectNode();
>         ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode();
>         long currentTimeSpent = currentNode.get("total_time_spent").asLong();
>         if (currentAggregate == null) {
>             result.put("incremental_time_spent", currentTimeSpent);
>         } else {
>             ArrayNode values = (ArrayNode) currentAggregate.get("values");
>             valuesArray.addAll(values);
>             result.put("incremental_time_spent",
>                 Math.max(0, currentTimeSpent - valuesArray.get(valuesArray.size() - 1).asLong()));
>         }
>         valuesArray.add(currentTimeSpent);
>         result.set("values", valuesArray);
>         return result;
>     }
>
> Now the problem I'm having with the above approach is that KTable ->
> toStream doesn't emit every change; it only seems to emit a change every 30
> seconds, even though the heartbeats are coming in every 15 seconds and are
> getting accumulated into the values array appropriately.
>
> Is there a gap in my fundamental modeling of the problem as streams?
> Could someone please point me in the right direction?
>
> Thanks!
>
> Puneet
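For reference, here is a minimal sketch of the cache setting suggested in the reply. It uses plain java.util.Properties so it compiles without the Kafka jars. It assumes the 1.0-era config name cache.max.bytes.buffering, which is the string value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. The application id and broker address are hypothetical placeholders. The sketch also repeats the question's delta arithmetic, so you can see what one downstream record per heartbeat should contain.

```java
import java.util.Properties;

// Sketch only: plain java.util.Properties, so it compiles without the Kafka jars.
class IncrementalizeConfigSketch {

    // Kafka Streams settings with the record cache disabled. The keys are the
    // literal config names behind StreamsConfig.APPLICATION_ID_CONFIG,
    // BOOTSTRAP_SERVERS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG; the id and
    // broker address are hypothetical placeholders.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "incrementalize-app"); // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        // A cache of 0 bytes means every aggregate update is forwarded
        // downstream immediately instead of being buffered until the next commit.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    // Same arithmetic as the question's incrementalize(): the first event for a
    // session (no previous total) counts in full; later events contribute the
    // difference from the previous total, clamped at zero.
    static long incrementalTimeSpent(Long previousTotal, long currentTotal) {
        if (previousTotal == null) {
            return currentTotal;
        }
        return Math.max(0L, currentTotal - previousTotal);
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("cache.max.bytes.buffering"));
        // Hypothetical cumulative totals for one session, one per heartbeat.
        Long previous = null;
        for (long total : new long[] {15, 30, 45}) {
            System.out.println(incrementalTimeSpent(previous, total));
            previous = total;
        }
    }
}
```

In the real application these properties would be passed to the KafkaStreams constructor together with the topology. With the cache at zero, each heartbeat should then produce its own downstream record instead of one record per key per commit interval. Recent Kafka releases deprecate this key in favor of statestore.cache.max.bytes.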