Matthias

Thanks, using grouping key of "deviceId + timestamp" with *aggregation* 
_instead of_ reducing solved it:


  KGroupedStream<String, JsonObject> grouped = data.groupBy(
      (k, v) ->
      {
          Date dt = 
Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
          return v.get("deviceId").asString() + dateFormat.format(dt);
      }
  );


  KTable<String, Integer> aggregate = grouped.aggregate(
      () -> 0,
      (aggKey, value, aggr) -> aggr + 
value.get("data").asObject().get("load").asInt(),
      Serdes.Integer()
  );

I'm still trying to find out how windowing fits in. It sounds like a tumbling 
window, but a tumbling window is defined by its length. So you get information 
for the last hour that has passed, but that last hour is a window of NOW - 1 
hour. How do I get a window to align to hours of the clock?



On 10/10/2017 19:41, Matthias J. Sax wrote:
Hi,

if the aggregation returns a different type, you can use .aggregate(...)
instead of .reduce(...)

Also, for you time based computation, did you consider to use windowing?


-Matthias

On 10/10/17 6:27 AM, RedShift wrote:
Hi all

Complete noob with regards to stream processing, this is my first
attempt. I'm going to try and explain my thought process, here's what
I'm trying to do:

I would like to create a sum of "load" for every hour, for every device.

Incoming stream of data:

{"deviceId":"1234","data":{"tss":1507619473,"load":9}}
{"deviceId":"1234","data":{"tss":1507619511,"load":8}}
{"deviceId":"1234","data":{"tss":1507619549,"load":5}}
{"deviceId":"9876","data":{"tss":1507619587,"load":8}}
{"deviceId":"1234","data":{"tss":1507619625,"load":8}}
{"deviceId":"1234","data":{"tss":1507619678,"load":8}}
{"deviceId":"9876","data":{"tss":1507619716,"load":8}}
{"deviceId":"9876","data":{"tss":1507619752,"load":9}}
{"deviceId":"1234","data":{"tss":1507619789,"load":8}}
{"deviceId":"9876","data":{"tss":1507619825,"load":8}}
{"deviceId":"9876","data":{"tss":1507619864,"load":8}}

Where
deviceId: unique ID for every device, which also doubles as the key I use
tss: UNIX timestamp in seconds
load: load indication

Expected outcome something like this:
deviceId: 1234, time: 2017-10-01 18:00, load: 25
deviceId: 1234, time: 2017-10-01 19:00, load: 13
deviceId: 9876, time: 2017-10-01 18:00, load: 33
deviceId: 9876, time: 2017-10-01 19:00, load: 5
...


So I started:

   SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
// Important bit here, I use this to construct a grouping key
   KStreamBuilder builder = new KStreamBuilder();
   KStream<String, JsonObject> data = builder.stream("telemetry");

We need to group by device, so:

   KGroupedStream<String, JsonObject> grouped = data.groupBy((k, v) ->
v.get("deviceId").asString());

But now I can't group the data again by date. So I made a combined
grouping key like this:

   KGroupedStream<String, JsonObject> grouped = data.groupBy(
       (k, v) ->
       {
           Date dt =
Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));

           return v.get("deviceId").asString() + dateFormat.format(dt);
       }
   );

Now I need to reduce the groups to sum the load:

   grouped.reduce(new Reducer<JsonObject>()
   {
       @Override
       public JsonObject apply(JsonObject v1, JsonObject v2)
       {
           return null;
       }
   });

But that's a problem. I'm supposed to sum "load" here, but I also have
to return a JsonObject. That doesn't seem right. So now I figure I have
to extract the "load" before the reducer, but a KGroupedStream doesn't
have a map() function.

Back to the drawing board. So I figure let's extract the "load" and
grouping key first:

   KStream<Object, Object> map = data.map(new KeyValueMapper<String,
JsonObject, KeyValue<?, ?>>()
   {
       @Override
       public KeyValue<String, Integer> apply(String s, JsonObject v)
       {
           Date dt =
Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));

           String key = v.get("deviceId").asString() +
dateFormat.format(dt);

           return new KeyValue<>(
               key, v.get("data").asObject().get("load").asInt()
           );
       }
   });

But now I'm left with a KStream of <Object, Object>. I've lost my types.
If I change it to:
Kstream<String, Integer>, the compiler has this to say:

Error:(35, 54) java: incompatible types: inference variable KR has
incompatible bounds
     equality constraints: java.lang.String
     lower bounds: java.lang.Object
     Makes sense, as there's no garantuee that a random given object is a
string. But how do I preserve types then?

I'm also unsure about the way I'm grouping things. It seems to me I have
to group by deviceId, and then using windowing to get the "per hour"
part. But I'm even more clueless how and where that fits in. For some
reason I also think a KTable should be the final result?

Thanks,

Best regards,

Reply via email to