Hi all

Complete noob with regard to stream processing; this is my first attempt. I'm
going to try to explain my thought process. Here's what I'm trying to do:

I would like to compute the sum of "load" per hour, per device.

Incoming stream of data:

{"deviceId":"1234","data":{"tss":1507619473,"load":9}}
{"deviceId":"1234","data":{"tss":1507619511,"load":8}}
{"deviceId":"1234","data":{"tss":1507619549,"load":5}}
{"deviceId":"9876","data":{"tss":1507619587,"load":8}}
{"deviceId":"1234","data":{"tss":1507619625,"load":8}}
{"deviceId":"1234","data":{"tss":1507619678,"load":8}}
{"deviceId":"9876","data":{"tss":1507619716,"load":8}}
{"deviceId":"9876","data":{"tss":1507619752,"load":9}}
{"deviceId":"1234","data":{"tss":1507619789,"load":8}}
{"deviceId":"9876","data":{"tss":1507619825,"load":8}}
{"deviceId":"9876","data":{"tss":1507619864,"load":8}}

Where:
  deviceId: unique ID for every device, which also doubles as the key I use
  tss: UNIX timestamp in seconds
  load: load indication
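Since `tss` is in seconds, it may help to settle the time arithmetic before touching Kafka at all. A minimal plain-Java sketch (no Kafka dependency; `TssMath` and its methods are made-up names): flooring to the hour is plain integer arithmetic, and the conversion to milliseconds matters because Kafka record timestamps are epoch milliseconds, so a custom `TimestampExtractor` reading `tss` would presumably need to multiply by 1000.

```java
import java.time.Instant;

// Hypothetical helper for the "tss" field (UNIX timestamp in seconds).
class TssMath {
    static final long SECONDS_PER_HOUR = 3600L;

    // Kafka record timestamps are epoch milliseconds, so seconds need scaling.
    static long toEpochMillis(long tssSeconds) {
        return tssSeconds * 1000L;
    }

    // Start of the hour containing tssSeconds, still in UNIX seconds.
    // Epoch time carries no time zone, so these are UTC hour boundaries.
    static long hourStart(long tssSeconds) {
        return Math.floorDiv(tssSeconds, SECONDS_PER_HOUR) * SECONDS_PER_HOUR;
    }

    public static void main(String[] args) {
        long tss = 1507619473L; // first sample record
        System.out.println(Instant.ofEpochSecond(tss));            // 2017-10-10T07:11:13Z
        System.out.println(Instant.ofEpochSecond(hourStart(tss))); // 2017-10-10T07:00:00Z
        System.out.println(toEpochMillis(tss));                    // 1507619473000
    }
}
```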

The expected outcome is something like this:
deviceId: 1234, time: 2017-10-01 18:00, load: 25
deviceId: 1234, time: 2017-10-01 19:00, load: 13
deviceId: 9876, time: 2017-10-01 18:00, load: 33
deviceId: 9876, time: 2017-10-01 19:00, load: 5
...
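To have something to compare the stream's output against, the expected hourly sums for the sample above can be computed in one batch pass. A plain-Java sketch, with a hypothetical `Reading` class standing in for the parsed JSON and hour starts given as UNIX seconds (UTC). Every sample record falls between 07:11 and 07:18 UTC on 2017-10-10, so the sample yields a single hourly bucket per device.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class ExpectedSums {
    // Hypothetical stand-in for one parsed telemetry record.
    static final class Reading {
        final String deviceId;
        final long tss;
        final int load;

        Reading(String deviceId, long tss, int load) {
            this.deviceId = deviceId;
            this.tss = tss;
            this.load = load;
        }
    }

    // The sample records from the message, in arrival order.
    static final List<Reading> SAMPLE = Arrays.asList(
        new Reading("1234", 1507619473L, 9), new Reading("1234", 1507619511L, 8),
        new Reading("1234", 1507619549L, 5), new Reading("9876", 1507619587L, 8),
        new Reading("1234", 1507619625L, 8), new Reading("1234", 1507619678L, 8),
        new Reading("9876", 1507619716L, 8), new Reading("9876", 1507619752L, 9),
        new Reading("1234", 1507619789L, 8), new Reading("9876", 1507619825L, 8),
        new Reading("9876", 1507619864L, 8));

    // Batch reference: total load per "deviceId@hourStart" (hour start in UNIX seconds).
    static Map<String, Integer> hourlySums(List<Reading> readings) {
        return readings.stream().collect(Collectors.groupingBy(
            r -> r.deviceId + "@" + (r.tss - Math.floorMod(r.tss, 3600L)),
            TreeMap::new,
            Collectors.summingInt(r -> r.load)));
    }

    public static void main(String[] args) {
        System.out.println(hourlySums(SAMPLE)); // {1234@1507618800=46, 9876@1507618800=41}
    }
}
```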


So I started:

  // Important bit here: I use this to construct a grouping key
  SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, JsonObject> data = builder.stream("telemetry");
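A caveat on `SimpleDateFormat`: it is not thread-safe, and it formats in the JVM's default time zone, so the "hour" a record lands in can depend on where the application runs. A sketch of the same key-building idea with `java.time` instead, assuming UTC hours are what's wanted (`HourKey` and `hourLabel` are made-up names):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class HourKey {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
    // withZone(UTC) pins the hour boundaries instead of using the JVM default zone.
    static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH").withZone(ZoneOffset.UTC);

    // Formats a UNIX timestamp in seconds as its UTC hour, e.g. "2017-10-10 07".
    static String hourLabel(long tssSeconds) {
        return HOUR.format(Instant.ofEpochSecond(tssSeconds));
    }

    public static void main(String[] args) {
        System.out.println(hourLabel(1507619473L)); // 2017-10-10 07
    }
}
```

Because the formatter is immutable, a single shared static instance is safe to use from several stream threads.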

We need to group by device, so:

  KGroupedStream<String, JsonObject> grouped = data.groupBy(
      (k, v) -> v.get("deviceId").asString());

But now I can't group the data again by date, so I made a combined grouping key
like this:

  KGroupedStream<String, JsonObject> grouped = data.groupBy(
      (k, v) ->
      {
          Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
          return v.get("deviceId").asString() + dateFormat.format(dt);
      }
  );
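Concatenating the parts directly (`"1234"` + `"2017-10-10 07"` gives `"12342017-10-10 07"`) is enough for grouping, but the device and hour are awkward to get back out when emitting results. A sketch of a composite key with an explicit separator, assuming `|` never occurs in a device ID (the separator and the `CompositeKey` names are hypothetical):

```java
class CompositeKey {
    // Assumption: "|" never appears in a deviceId.
    static final String SEP = "|";

    // Builds "deviceId|hourLabel", e.g. "1234|2017-10-10 07".
    static String of(String deviceId, String hourLabel) {
        return deviceId + SEP + hourLabel;
    }

    // Splits a key built by of() back into {deviceId, hourLabel}.
    static String[] split(String key) {
        int i = key.indexOf(SEP);
        if (i < 0) {
            throw new IllegalArgumentException("not a composite key: " + key);
        }
        return new String[] { key.substring(0, i), key.substring(i + SEP.length()) };
    }

    public static void main(String[] args) {
        String key = of("1234", "2017-10-10 07");
        String[] parts = split(key);
        System.out.println(key + " -> " + parts[0] + " / " + parts[1]);
        // prints: 1234|2017-10-10 07 -> 1234 / 2017-10-10 07
    }
}
```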

Now I need to reduce the groups to sum the load:

  grouped.reduce(new Reducer<JsonObject>()
  {
      @Override
      public JsonObject apply(JsonObject v1, JsonObject v2)
      {
          return null;
      }
  });
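What a grouped `reduce` does, as far as I understand it: for each key it keeps one running value and folds every new value into it with a function of shape `(V, V) -> V`, so the input and the result must be the same type. With `JsonObject` values that means returning a `JsonObject`; with plain `Integer` loads it is just `Integer::sum`. A plain-Java simulation of that per-key fold (`RunningReduce` is a made-up name; it mimics the semantics and is not Kafka code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

class RunningReduce {
    // Same shape as a reducer: two values of one type in, one value of that type out.
    static final BinaryOperator<Integer> SUM = Integer::sum;

    // One running value per key.
    private final Map<String, Integer> state = new HashMap<>();

    // Folds one value into the running result for its key and returns the new result,
    // roughly like the update a grouped reduce would emit downstream.
    int add(String key, int load) {
        return state.merge(key, load, SUM);
    }

    public static void main(String[] args) {
        RunningReduce r = new RunningReduce();
        r.add("1234|2017-10-10 07", 9);
        r.add("1234|2017-10-10 07", 8);
        System.out.println(r.add("1234|2017-10-10 07", 5)); // 22
    }
}
```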

But that's a problem: I'm supposed to sum "load" here, yet the reducer has to return a
JsonObject. That doesn't seem right. So I figure I have to extract "load" before
the reducer, but a KGroupedStream doesn't have a map() function.
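Two ways around this seem plausible. `KStream` (unlike `KGroupedStream`) does have `mapValues`, so the load can be extracted before grouping and then reduced as an integer. Alternatively, `aggregate` instead of `reduce` takes an initializer and an aggregator called with `(key, value, aggregate)`, which lets the value type (the JSON object) and the result type (a number) differ. A plain-Java simulation of the aggregate shape, where `Reading` stands in for `JsonObject` and `AggregatorLike` is a local stand-in, not Kafka's interface:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class AggregateSketch {
    // Hypothetical stand-in for the parsed JSON record.
    static final class Reading {
        final String deviceId;
        final int load;

        Reading(String deviceId, int load) {
            this.deviceId = deviceId;
            this.load = load;
        }
    }

    // Local stand-in with the (key, value, aggregate) -> aggregate shape.
    interface AggregatorLike<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Initializer: the starting aggregate for a key seen for the first time.
    static final Supplier<Long> INIT = () -> 0L;

    // Value type (Reading) and aggregate type (Long) differ, which reduce would not allow.
    static final AggregatorLike<String, Reading, Long> SUM_LOAD =
        (key, reading, total) -> total + reading.load;

    private final Map<String, Long> state = new HashMap<>();

    // Folds one record into the aggregate for its key and returns the new aggregate.
    long add(String key, Reading reading) {
        Long current = state.containsKey(key) ? state.get(key) : INIT.get();
        Long updated = SUM_LOAD.apply(key, reading, current);
        state.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        AggregateSketch agg = new AggregateSketch();
        agg.add("1234|2017-10-10 07", new Reading("1234", 9));
        System.out.println(agg.add("1234|2017-10-10 07", new Reading("1234", 8))); // 17
    }
}
```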

Back to the drawing board. So I figured I'd extract "load" and the grouping
key first:

  KStream<Object, Object> map = data.map(new KeyValueMapper<String, JsonObject, KeyValue<?, ?>>()
  {
      @Override
      public KeyValue<String, Integer> apply(String s, JsonObject v)
      {
          Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
          String key = v.get("deviceId").asString() + dateFormat.format(dt);

          return new KeyValue<>(
              key, v.get("data").asObject().get("load").asInt()
          );
      }
  });

But now I'm left with a KStream<Object, Object>; I've lost my types. If I
change it to KStream<String, Integer>, the compiler has this to say:

Error:(35, 54) java: incompatible types: inference variable KR has incompatible bounds
    equality constraints: java.lang.String
    lower bounds: java.lang.Object

That makes sense, as there's no guarantee that an arbitrary Object is a String. But how do I preserve the types then?
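The types appear to be lost at `KeyValueMapper<String, JsonObject, KeyValue<?, ?>>`: with `?` as the declared result, the compiler can only infer `Object` for the new key and value types, which matches the "lower bounds: java.lang.Object" in the error. Declaring the result as `KeyValue<String, Integer>` (in the anonymous class's type arguments as well as in `apply`), or passing a lambda and letting inference work, should keep `KStream<String, Integer>`. Since Kafka isn't on the classpath here, the sketch reproduces what I believe is the generic shape of `KStream.map` using hypothetical stand-ins (`KV`, `Mapper`, `MiniStream`, `Reading`) to show the inference succeeding:

```java
import java.util.ArrayList;
import java.util.List;

class TypedMap {
    // Hypothetical stand-ins for KeyValue, KeyValueMapper, KStream and the JSON value.
    static final class KV<K, V> {
        final K key;
        final V value;
        KV(K key, V value) { this.key = key; this.value = value; }
    }

    interface Mapper<K, V, R> { R apply(K key, V value); }

    static final class Reading {
        final long tss;
        final int load;
        Reading(long tss, int load) { this.tss = tss; this.load = load; }
    }

    static final class MiniStream<K, V> {
        final List<KV<K, V>> records;
        MiniStream(List<KV<K, V>> records) { this.records = records; }

        // Wildcard shape modeled on KStream.map: KR and VR come from the mapper's result type.
        <KR, VR> MiniStream<KR, VR> map(
                Mapper<? super K, ? super V, ? extends KV<? extends KR, ? extends VR>> mapper) {
            List<KV<KR, VR>> out = new ArrayList<>();
            for (KV<K, V> r : records) {
                KV<? extends KR, ? extends VR> m = mapper.apply(r.key, r.value);
                out.add(new KV<KR, VR>(m.key, m.value));
            }
            return new MiniStream<>(out);
        }
    }

    static MiniStream<String, Integer> demo() {
        List<KV<String, Reading>> in = new ArrayList<>();
        in.add(new KV<>("1234", new Reading(1507619473L, 9)));
        MiniStream<String, Reading> data = new MiniStream<>(in);

        // Result declared as KV<String, Integer>, not KV<?, ?>, so KR = String and VR = Integer.
        Mapper<String, Reading, KV<String, Integer>> toHourlyLoad = (deviceId, r) ->
            new KV<>(deviceId + "|" + (r.tss - Math.floorMod(r.tss, 3600L)), r.load);
        return data.map(toHourlyLoad);
    }

    public static void main(String[] args) {
        KV<String, Integer> first = demo().records.get(0);
        System.out.println(first.key + " = " + first.value); // 1234|1507618800 = 9
    }
}
```

Changing the mapper's declared result from `KV<String, Integer>` to `KV<?, ?>` while keeping the `MiniStream<String, Integer>` target should reproduce the same kind of inference error as above.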

I'm also unsure about the way I'm grouping things. It seems to me I have to group by
deviceId and then use windowing to get the "per hour" part, but I'm even
more clueless about how and where that fits in. For some reason I also think a KTable should be
the final result?
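On windowing: with a one-hour tumbling window the key can stay as just `deviceId` and the window supplies the hour, so no date needs to be baked into the key. In Kafka Streams this would be something along the lines of `mapValues(...)` to extract the load, `groupByKey()` (the record key is already `deviceId`, which should avoid the repartition that `groupBy(...)` triggers), `windowedBy(TimeWindows.of(...))` and `reduce(Integer::sum)`, ending in a `KTable<Windowed<String>, Integer>`. Method names and the `TimeWindows` factory have changed between releases, so treat them as something to check against the docs for your version. Windows are assigned from the record timestamp, so windowing on `tss` would need a custom `TimestampExtractor` returning `tss * 1000`. The plain-Java simulation below only illustrates the semantics (all names hypothetical): each record goes into the epoch-aligned window `[start, start + 1h)` containing its timestamp, and the map keeps the latest sum per (device, window).

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 3_600_000L; // one-hour tumbling windows

    // Start of the window containing timestampMs; windows are [start, start + size).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Hypothetical "table": latest running sum per device and window start.
    private final Map<String, Integer> table = new TreeMap<>();

    // Processes one record and returns the updated sum for its (device, window).
    int process(String deviceId, long tssSeconds, int load) {
        long timestampMs = tssSeconds * 1000L; // what a tss-based extractor would return
        String windowedKey = deviceId + "@" + windowStart(timestampMs);
        return table.merge(windowedKey, load, Integer::sum);
    }

    Map<String, Integer> snapshot() {
        return new TreeMap<>(table);
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch();
        w.process("1234", 1507619473L, 9); // 07:11:13 UTC
        w.process("1234", 1507619511L, 8); // same hour, sum becomes 17
        w.process("1234", 1507618799L, 4); // 06:59:59 UTC, previous window
        System.out.println(w.snapshot());
        // {1234@1507615200000=4, 1234@1507618800000=17}
    }
}
```

Each `process` call returns an updated sum, loosely like the stream of updates a windowed KTable emits, while the map itself plays the role of the table's latest values.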

Thanks,

Best regards,
