Regarding windowing: actually the window boundaries are aligned to the
epoch (i.e. 1970-01-01 00:00:00 UTC), so the latest window is not NOW - 1 hour.
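
For a 1-hour tumbling window, the window containing a record starts at the
record's timestamp rounded down to the window size. A minimal sketch in plain
Java (hypothetical variable names; millisecond timestamps assumed):

  long windowSizeMs = TimeUnit.HOURS.toMillis(1);  // java.util.concurrent.TimeUnit
  long timestampMs = System.currentTimeMillis();   // stand-in for a record timestamp

  // Tumbling windows are epoch-aligned: start = ts - (ts % size),
  // so every 1-hour window covers a full clock hour in UTC.
  long windowStart = timestampMs - (timestampMs % windowSizeMs);

So an hourly window covers e.g. 18:00:00.000 to 18:59:59.999 UTC, which is
exactly the clock alignment asked about below.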


Guozhang

On Wed, Oct 11, 2017 at 1:42 AM, RedShift <redsh...@telenet.be> wrote:

> Matthias
>
>
> Thanks, using a grouping key of "deviceId + timestamp" with *aggregation*
> _instead of_ reducing solved it:
>
>
>   KGroupedStream<String, JsonObject> grouped = data.groupBy(
>       (k, v) ->
>       {
>           Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
>           return v.get("deviceId").asString() + dateFormat.format(dt);
>       }
>   );
>
>
>   KTable<String, Integer> aggregate = grouped.aggregate(
>       () -> 0,
>       (aggKey, value, aggr) -> aggr + value.get("data").asObject().get("load").asInt(),
>       Serdes.Integer()
>   );
>
> I'm still trying to figure out how windowing fits in. It sounds like I
> need a tumbling window, but a tumbling window is defined by its length
> alone, so I assumed I'd get information for the last hour that has
> passed, i.e. a window of NOW - 1 hour. How do I get a window to align to
> the hours of the clock?
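>
> Something like this is what I have in mind (untested sketch, going by the
> javadocs; I assume I'd also need a custom TimestampExtractor so that the
> windows are driven by the embedded "tss" field rather than the record's
> own timestamp):
>
>   KTable<Windowed<String>, Integer> hourly = data
>       .groupBy((k, v) -> v.get("deviceId").asString())
>       .aggregate(
>           () -> 0,
>           (deviceId, value, aggr) -> aggr + value.get("data").asObject().get("load").asInt(),
>           TimeWindows.of(TimeUnit.HOURS.toMillis(1)),  // 1-hour tumbling windows
>           Serdes.Integer()
>       );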
>
>
>
>
> On 10/10/2017 19:41, Matthias J. Sax wrote:
>
>> Hi,
>>
>> If the aggregation returns a different type, you can use .aggregate(...)
>> instead of .reduce(...).
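>>
>> For example, roughly (extractLoad() is a stand-in for whatever pulls the
>> integer out of your JSON value):
>>
>>    KTable<String, Integer> sums = grouped.aggregate(
>>        () -> 0,                            // initializer for the result type
>>        (key, value, aggr) -> aggr + extractLoad(value),
>>        Serdes.Integer()                    // serde for the result type
>>    );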
>>
>> Also, for your time-based computation, did you consider using windowing?
>>
>>
>> -Matthias
>>
>> On 10/10/17 6:27 AM, RedShift wrote:
>>
>>> Hi all
>>>
>>> Complete noob with regard to stream processing; this is my first
>>> attempt. I'm going to try and explain my thought process. Here's what
>>> I'm trying to do:
>>>
>>> I would like to create a sum of "load" for every hour, for every device.
>>>
>>> Incoming stream of data:
>>>
>>> {"deviceId":"1234","data":{"tss":1507619473,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619511,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619549,"load":5}}
>>> {"deviceId":"9876","data":{"tss":1507619587,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619625,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619678,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619716,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619752,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619789,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619825,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619864,"load":8}}
>>>
>>> Where
>>> deviceId: unique ID for every device, which also doubles as the key I use
>>> tss: UNIX timestamp in seconds
>>> load: load indication
>>>
>>> Expected outcome something like this:
>>> deviceId: 1234, time: 2017-10-01 18:00, load: 25
>>> deviceId: 1234, time: 2017-10-01 19:00, load: 13
>>> deviceId: 9876, time: 2017-10-01 18:00, load: 33
>>> deviceId: 9876, time: 2017-10-01 19:00, load: 5
>>> ...
>>>
>>>
>>> So I started:
>>>
>>>    // Important bit here: I use this format to construct a grouping key
>>>    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
>>>    KStreamBuilder builder = new KStreamBuilder();
>>>    KStream<String, JsonObject> data = builder.stream("telemetry");
>>>
>>> We need to group by device, so:
>>>
>>>    KGroupedStream<String, JsonObject> grouped = data.groupBy((k, v) -> v.get("deviceId").asString());
>>>
>>> But now I can't group the data again by date. So I made a combined
>>> grouping key like this:
>>>
>>>    KGroupedStream<String, JsonObject> grouped = data.groupBy(
>>>        (k, v) ->
>>>        {
>>>            Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
>>>
>>>            return v.get("deviceId").asString() + dateFormat.format(dt);
>>>        }
>>>    );
>>>
>>> Now I need to reduce the groups to sum the load:
>>>
>>>    grouped.reduce(new Reducer<JsonObject>()
>>>    {
>>>        @Override
>>>        public JsonObject apply(JsonObject v1, JsonObject v2)
>>>        {
>>>            return null; // stuck here: I want to sum "load", but must return a JsonObject
>>>        }
>>>    });
>>>
>>> But that's a problem. I'm supposed to sum "load" here, but I also have
>>> to return a JsonObject. That doesn't seem right. So now I figure I have
>>> to extract the "load" before the reducer, but a KGroupedStream doesn't
>>> have a map() function.
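>>>
>>> Looking at the two interfaces, that seems to be exactly the limitation
>>> (paraphrasing the javadocs):
>>>
>>>    // Reducer<V>:           V apply(V value1, V value2);
>>>    //                       -> result type must equal the input type
>>>    // Aggregator<K, V, VA>: VA apply(K key, V value, VA aggregate);
>>>    //                       -> result type may differ from the input type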
>>>
>>> Back to the drawing board. So I figure let's extract the "load" and
>>> grouping key first:
>>>
>>>    KStream<Object, Object> map = data.map(new KeyValueMapper<String, JsonObject, KeyValue<?, ?>>()
>>>    {
>>>        @Override
>>>        public KeyValue<String, Integer> apply(String s, JsonObject v)
>>>        {
>>>            Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
>>>
>>>            String key = v.get("deviceId").asString() + dateFormat.format(dt);
>>>
>>>            return new KeyValue<>(
>>>                key, v.get("data").asObject().get("load").asInt()
>>>            );
>>>        }
>>>    });
>>>
>>> But now I'm left with a KStream of <Object, Object>. I've lost my types.
>>> If I change it to KStream<String, Integer>, the compiler has this to say:
>>>
>>> Error:(35, 54) java: incompatible types: inference variable KR has
>>> incompatible bounds
>>>      equality constraints: java.lang.String
>>>      lower bounds: java.lang.Object
>>>
>>> Makes sense, as there's no guarantee that a given object is a string.
>>> But how do I preserve types then?
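>>>
>>> My best guess is that the wildcards in KeyValue<?, ?> are what lose the
>>> types, and that spelling out the concrete types on the mapper would fix
>>> it. Untested sketch:
>>>
>>>    KStream<String, Integer> mapped = data.map(
>>>        new KeyValueMapper<String, JsonObject, KeyValue<String, Integer>>()
>>>        {
>>>            @Override
>>>            public KeyValue<String, Integer> apply(String s, JsonObject v)
>>>            {
>>>                // Same body as above, just with concrete output types
>>>                Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
>>>                String key = v.get("deviceId").asString() + dateFormat.format(dt);
>>>                return new KeyValue<>(key, v.get("data").asObject().get("load").asInt());
>>>            }
>>>        }
>>>    );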
>>>
>>> I'm also unsure about the way I'm grouping things. It seems to me I have
>>> to group by deviceId and then use windowing to get the "per hour" part.
>>> But I'm even more clueless about how and where that fits in. For some
>>> reason I also think a KTable should be the final result?
>>>
>>> Thanks,
>>>
>>> Best regards,
>>>
>>
>>


-- 
-- Guozhang
