Regarding windowing: the window boundaries are actually aligned to the epoch (i.e., 1970-01-01 00:00:00 UTC), so the latest window is not NOW - 1 hour.
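An hourly tumbling window therefore already lines up with clock hours. A minimal, untested sketch of the windowed aggregation against the pre-1.0 API used in this thread ("data" is the KStream<String, JsonObject> from the code quoted below; "jsonSerde" stands in for whatever Serde<JsonObject> you already use for the topic's values, and the store name is arbitrary):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // Group by deviceId only; the hourly window takes over the "per hour"
    // part of the combined key. jsonSerde is an assumed Serde<JsonObject>.
    KTable<Windowed<String>, Integer> loadPerHour = data
        .groupBy((k, v) -> v.get("deviceId").asString(), Serdes.String(), jsonSerde)
        .aggregate(
            () -> 0,                                      // initializer
            (deviceId, value, total) ->                   // adder: sum the "load" field
                total + value.get("data").asObject().get("load").asInt(),
            TimeWindows.of(TimeUnit.HOURS.toMillis(1)),   // 1h tumbling windows, epoch-aligned
            Serdes.Integer(),
            "load-per-hour-store"                         // assumed state store name
        );

Each Windowed<String> key then carries a window start that falls exactly on a whole hour. One caveat: windows are assigned from the record timestamp, so to window on the embedded "tss" field you would configure a custom TimestampExtractor that reads it.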
Guozhang

On Wed, Oct 11, 2017 at 1:42 AM, RedShift <redsh...@telenet.be> wrote:
> Matthias
>
> Thanks, using a grouping key of "deviceId + timestamp" with *aggregation*
> _instead of_ reducing solved it:
>
> KGroupedStream<String, JsonObject> grouped = data.groupBy(
>     (k, v) ->
>     {
>         Date dt = Date.from(Instant.ofEpochSecond(
>             v.get("data").asObject().get("tss").asLong()));
>         return v.get("deviceId").asString() + dateFormat.format(dt);
>     }
> );
>
> KTable<String, Integer> aggregate = grouped.aggregate(
>     () -> 0,
>     (aggKey, value, aggr) -> aggr + value.get("data").asObject().get("load").asInt(),
>     Serdes.Integer()
> );
>
> I'm still trying to find out how windowing fits in. It sounds like a
> tumbling window, but a tumbling window is defined by its length. So you get
> information for the last hour that has passed, but that last hour is a
> window of NOW - 1 hour. How do I get a window to align to hours of the
> clock?
>
> On 10/10/2017 19:41, Matthias J. Sax wrote:
>> Hi,
>>
>> if the aggregation returns a different type, you can use .aggregate(...)
>> instead of .reduce(...)
>>
>> Also, for your time-based computation, did you consider using windowing?
>>
>> -Matthias
>>
>> On 10/10/17 6:27 AM, RedShift wrote:
>>> Hi all
>>>
>>> Complete noob with regards to stream processing; this is my first
>>> attempt. I'm going to try and explain my thought process. Here's what
>>> I'm trying to do:
>>>
>>> I would like to create a sum of "load" for every hour, for every device.
>>>
>>> Incoming stream of data:
>>>
>>> {"deviceId":"1234","data":{"tss":1507619473,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619511,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619549,"load":5}}
>>> {"deviceId":"9876","data":{"tss":1507619587,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619625,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619678,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619716,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619752,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619789,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619825,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619864,"load":8}}
>>>
>>> Where:
>>> deviceId: unique ID for every device, which also doubles as the key I use
>>> tss: UNIX timestamp in seconds
>>> load: load indication
>>>
>>> Expected outcome, something like this:
>>>
>>> deviceId: 1234, time: 2017-10-01 18:00, load: 25
>>> deviceId: 1234, time: 2017-10-01 19:00, load: 13
>>> deviceId: 9876, time: 2017-10-01 18:00, load: 33
>>> deviceId: 9876, time: 2017-10-01 19:00, load: 5
>>> ...
>>>
>>> So I started:
>>>
>>> SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
>>> // Important bit here, I use this to construct a grouping key
>>> KStreamBuilder builder = new KStreamBuilder();
>>> KStream<String, JsonObject> data = builder.stream("telemetry");
>>>
>>> We need to group by device, so:
>>>
>>> KGroupedStream<String, JsonObject> grouped = data.groupBy((k, v) ->
>>>     v.get("deviceId").asString());
>>>
>>> But now I can't group the data again by date. So I made a combined
>>> grouping key like this:
>>>
>>> KGroupedStream<String, JsonObject> grouped = data.groupBy(
>>>     (k, v) ->
>>>     {
>>>         Date dt = Date.from(Instant.ofEpochSecond(
>>>             v.get("data").asObject().get("tss").asLong()));
>>>         return v.get("deviceId").asString() + dateFormat.format(dt);
>>>     }
>>> );
>>>
>>> Now I need to reduce the groups to sum the load:
>>>
>>> grouped.reduce(new Reducer<JsonObject>()
>>> {
>>>     @Override
>>>     public JsonObject apply(JsonObject v1, JsonObject v2)
>>>     {
>>>         return null;
>>>     }
>>> });
>>>
>>> But that's a problem. I'm supposed to sum "load" here, but I also have
>>> to return a JsonObject. That doesn't seem right. So now I figure I have
>>> to extract the "load" before the reducer, but a KGroupedStream doesn't
>>> have a map() function.
>>>
>>> Back to the drawing board. So I figure let's extract the "load" and
>>> grouping key first:
>>>
>>> KStream<Object, Object> map = data.map(new KeyValueMapper<String,
>>>     JsonObject, KeyValue<?, ?>>()
>>> {
>>>     @Override
>>>     public KeyValue<String, Integer> apply(String s, JsonObject v)
>>>     {
>>>         Date dt = Date.from(Instant.ofEpochSecond(
>>>             v.get("data").asObject().get("tss").asLong()));
>>>
>>>         String key = v.get("deviceId").asString() + dateFormat.format(dt);
>>>
>>>         return new KeyValue<>(
>>>             key, v.get("data").asObject().get("load").asInt()
>>>         );
>>>     }
>>> });
>>>
>>> But now I'm left with a KStream of <Object, Object>. I've lost my types.
>>> If I change it to KStream<String, Integer>, the compiler has this to say:
>>>
>>> Error:(35, 54) java: incompatible types: inference variable KR has
>>> incompatible bounds
>>>     equality constraints: java.lang.String
>>>     lower bounds: java.lang.Object
>>>
>>> Makes sense, as there's no guarantee that a random given object is a
>>> string. But how do I preserve types then? [a sketch addressing this
>>> follows the thread]
>>>
>>> I'm also unsure about the way I'm grouping things. It seems to me I have
>>> to group by deviceId and then use windowing to get the "per hour" part.
>>> But I'm even more clueless how and where that fits in. For some reason I
>>> also think a KTable should be the final result?
>>>
>>> Thanks,
>>>
>>> Best regards,

--
-- Guozhang