Hi all,
I'm completely new to stream processing and this is my first attempt.
I'll try to explain my thought process. Here's what I'm trying to do:
I would like to compute the sum of "load" per device, per hour.
Incoming stream of data:
{"deviceId":"1234","data":{"tss":1507619473,"load":9}}
{"deviceId":"1234","data":{"tss":1507619511,"load":8}}
{"deviceId":"1234","data":{"tss":1507619549,"load":5}}
{"deviceId":"9876","data":{"tss":1507619587,"load":8}}
{"deviceId":"1234","data":{"tss":1507619625,"load":8}}
{"deviceId":"1234","data":{"tss":1507619678,"load":8}}
{"deviceId":"9876","data":{"tss":1507619716,"load":8}}
{"deviceId":"9876","data":{"tss":1507619752,"load":9}}
{"deviceId":"1234","data":{"tss":1507619789,"load":8}}
{"deviceId":"9876","data":{"tss":1507619825,"load":8}}
{"deviceId":"9876","data":{"tss":1507619864,"load":8}}
Where
deviceId: unique ID for every device, which also doubles as the key I use
tss: UNIX timestamp in seconds
load: load indication
Expected outcome, something like this (illustrative values, not computed
from the sample above):
deviceId: 1234, time: 2017-10-01 18:00, load: 25
deviceId: 1234, time: 2017-10-01 19:00, load: 13
deviceId: 9876, time: 2017-10-01 18:00, load: 33
deviceId: 9876, time: 2017-10-01 19:00, load: 5
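To pin down the target result independently of Kafka, here is a minimal plain-Java sketch of the same aggregation. It assumes hours are cut in UTC; the class and method names (HourlyLoadSketch, Reading, sumPerDeviceAndHour) are made up for illustration and are not part of any library.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.TreeMap;

final class HourlyLoadSketch {

    /** One telemetry reading, mirroring the JSON fields shown above. */
    static final class Reading {
        final String deviceId;
        final long tss;   // UNIX timestamp in seconds
        final int load;

        Reading(String deviceId, long tss, int load) {
            this.deviceId = deviceId;
            this.tss = tss;
            this.load = load;
        }
    }

    /** Sums load per device and per hour; the inner key is the start of the hour (UTC). */
    static Map<String, Map<Instant, Integer>> sumPerDeviceAndHour(Reading... readings) {
        Map<String, Map<Instant, Integer>> sums = new TreeMap<>();
        for (Reading r : readings) {
            // Truncating to the hour maps every reading in the same hour to one bucket.
            Instant hourStart = Instant.ofEpochSecond(r.tss).truncatedTo(ChronoUnit.HOURS);
            sums.computeIfAbsent(r.deviceId, id -> new TreeMap<>())
                .merge(hourStart, r.load, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // First few records of the sample stream above.
        Map<String, Map<Instant, Integer>> sums = sumPerDeviceAndHour(
            new Reading("1234", 1507619473L, 9),
            new Reading("1234", 1507619511L, 8),
            new Reading("1234", 1507619549L, 5),
            new Reading("9876", 1507619587L, 8));
        sums.forEach((device, perHour) -> perHour.forEach((hour, load) ->
            System.out.println("deviceId: " + device + ", time: " + hour + ", load: " + load)));
    }
}
```

All sample timestamps above fall into the same hour, so this produces one row per device for that input.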
...
So I started:
// Important bit here: I use this to construct a grouping key
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonObject> data = builder.stream("telemetry");
We need to group by device, so:
KGroupedStream<String, JsonObject> grouped = data.groupBy(
    (k, v) -> v.get("deviceId").asString());
But now I can't group the data again by date. So I made a combined
grouping key like this:
KGroupedStream<String, JsonObject> grouped = data.groupBy(
    (k, v) ->
    {
        Date dt = Date.from(Instant.ofEpochSecond(
            v.get("data").asObject().get("tss").asLong()));
        return v.get("deviceId").asString() + dateFormat.format(dt);
    }
);
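A side note on the combined key: SimpleDateFormat formats in the JVM's default time zone unless told otherwise, and it is not thread-safe. The sketch below shows one possible alternative using java.time. The UTC zone, the "|" separator, and the names GroupingKeySketch and groupingKey are assumptions made for illustration only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class GroupingKeySketch {
    // DateTimeFormatter is immutable and thread-safe. The zone is pinned to UTC
    // (an assumption) so the key does not depend on the JVM's default time zone.
    private static final DateTimeFormatter HOUR_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH").withZone(ZoneOffset.UTC);

    /** Builds "deviceId|yyyy-MM-dd HH" from a device id and a UNIX timestamp in seconds. */
    static String groupingKey(String deviceId, long tss) {
        return deviceId + "|" + HOUR_FORMAT.format(Instant.ofEpochSecond(tss));
    }

    public static void main(String[] args) {
        System.out.println(groupingKey("1234", 1507619473L)); // prints 1234|2017-10-10 07
    }
}
```

The separator is there only to keep the key readable when it shows up in logs or topics.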
Now I need to reduce the groups to sum the load:
grouped.reduce(new Reducer<JsonObject>()
{
    @Override
    public JsonObject apply(JsonObject v1, JsonObject v2)
    {
        return null;
    }
});
But that's a problem. I'm supposed to sum "load" here, but I also have
to return a JsonObject. That doesn't seem right. So now I figure I have
to extract the "load" before the reducer, but a KGroupedStream doesn't
have a map() function.
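The mismatch described above is the usual difference between a reduce, whose result has the same type as the elements, and a fold with an initial value, whose result may have another type. As far as I can tell, Kafka Streams offers the latter as aggregate() on a grouped stream, and mapValues() on the KStream before grouping would be another route. The plain-Java sketch below only illustrates the idea with java.util.stream; Telemetry is a hypothetical stand-in for the JSON payload.

```java
import java.util.Arrays;
import java.util.List;

final class FoldSketch {

    /** Hypothetical stand-in for the JSON payload; only the load field matters here. */
    static final class Telemetry {
        final int load;

        Telemetry(int load) {
            this.load = load;
        }
    }

    /** Folds readings into an Integer total, so the result type differs from the element type. */
    static int totalLoad(List<Telemetry> readings) {
        return readings.stream()
            // identity, accumulator (Integer, Telemetry) -> Integer, combiner for partial sums
            .reduce(0, (sum, t) -> sum + t.load, Integer::sum);
    }

    public static void main(String[] args) {
        List<Telemetry> readings =
            Arrays.asList(new Telemetry(9), new Telemetry(8), new Telemetry(5));
        System.out.println(totalLoad(readings)); // prints 22
    }
}
```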
Back to the drawing board. So I figure let's extract the "load" and
grouping key first:
KStream<Object, Object> map = data.map(
    new KeyValueMapper<String, JsonObject, KeyValue<?, ?>>()
    {
        @Override
        public KeyValue<String, Integer> apply(String s, JsonObject v)
        {
            Date dt = Date.from(Instant.ofEpochSecond(
                v.get("data").asObject().get("tss").asLong()));
            String key = v.get("deviceId").asString() + dateFormat.format(dt);
            return new KeyValue<>(key, v.get("data").asObject().get("load").asInt());
        }
    });
But now I'm left with a KStream of <Object, Object>. I've lost my types.
If I change it to KStream<String, Integer>, the compiler has this to say:
Error:(35, 54) java: incompatible types: inference variable KR has
incompatible bounds
equality constraints: java.lang.String
lower bounds: java.lang.Object
Makes sense, as there's no guarantee that an arbitrary object is a
String. But how do I preserve types then?
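The error message points at the wildcard in KeyValue<?, ?>: the result types are inferred from the type the mapper declares, and "?" can only be inferred as Object. Declaring the mapper as KeyValueMapper<String, JsonObject, KeyValue<String, Integer>> should therefore keep the types. The sketch below reproduces the mechanism in plain Java without Kafka; Pair, Mapper and map are hypothetical stand-ins that only mimic the shape of the generic signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TypedMapperSketch {

    /** Hypothetical stand-in for a key/value pair type. */
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a mapper interface whose third parameter is the result type. */
    interface Mapper<K, V, R> {
        R apply(K key, V value);
    }

    /** KR and VR are inferred from the result type the mapper declares. */
    static <K, V, KR, VR> List<Pair<KR, VR>> map(
            List<Pair<K, V>> input,
            Mapper<? super K, ? super V, ? extends Pair<? extends KR, ? extends VR>> mapper) {
        List<Pair<KR, VR>> out = new ArrayList<>();
        for (Pair<K, V> p : input) {
            Pair<? extends KR, ? extends VR> r = mapper.apply(p.key, p.value);
            out.add(new Pair<KR, VR>(r.key, r.value));
        }
        return out;
    }

    static List<Pair<String, Integer>> demo() {
        List<Pair<String, String>> raw = Arrays.asList(
            new Pair<String, String>("1234", "9"),
            new Pair<String, String>("9876", "8"));
        // Declaring Pair<String, Integer> here (rather than Pair<?, ?>) is what
        // allows the result to be assigned to List<Pair<String, Integer>>.
        Mapper<String, String, Pair<String, Integer>> parseLoad =
            new Mapper<String, String, Pair<String, Integer>>() {
                @Override
                public Pair<String, Integer> apply(String deviceId, String load) {
                    return new Pair<String, Integer>(deviceId, Integer.parseInt(load));
                }
            };
        return map(raw, parseLoad);
    }

    public static void main(String[] args) {
        for (Pair<String, Integer> p : demo()) {
            System.out.println(p.key + " -> " + p.value);
        }
    }
}
```

With the third type argument written as Pair<?, ?> instead, the same assignment is rejected with the same kind of "incompatible bounds" message.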
I'm also unsure about the way I'm grouping things. It seems to me I have
to group by deviceId, and then use windowing to get the "per hour"
part. But I'm even more clueless about how and where that fits in. For
some reason I also think a KTable should be the final result?
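To make the windowing idea concrete, here is a plain-Java sketch of a fixed-size (tumbling) window feeding a table of running sums. It is not Kafka Streams code. My understanding is that there a windowed aggregation on the grouped stream yields a KTable keyed by the windowed key, and that a custom TimestampExtractor would be needed for the windows to follow "tss" from the payload rather than the record timestamp; treat both as assumptions. The name TumblingWindowSketch and the "deviceId@windowStart" key format are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TumblingWindowSketch {
    private final long windowSizeSeconds;
    // Latest sum per "deviceId@windowStart"; plays the role of the result table.
    private final Map<String, Integer> table = new LinkedHashMap<>();

    TumblingWindowSketch(long windowSizeSeconds) {
        this.windowSizeSeconds = windowSizeSeconds;
    }

    /** Start (in epoch seconds) of the fixed-size window that contains tss. */
    long windowStart(long tss) {
        return tss - (tss % windowSizeSeconds);
    }

    /** Applies one record and returns the updated sum for its device and window. */
    int add(String deviceId, long tss, int load) {
        String key = deviceId + "@" + windowStart(tss);
        return table.merge(key, load, Integer::sum);
    }

    /** Copy of the current table contents, in first-seen key order. */
    Map<String, Integer> snapshot() {
        return new LinkedHashMap<>(table);
    }

    public static void main(String[] args) {
        TumblingWindowSketch hourly = new TumblingWindowSketch(3600);
        hourly.add("1234", 1507619473L, 9);
        hourly.add("9876", 1507619587L, 8);
        hourly.add("1234", 1507619511L, 8);
        System.out.println(hourly.snapshot()); // prints {1234@1507618800=17, 9876@1507618800=8}
    }
}
```

Each incoming record overwrites the previous sum for its key, which is the sense in which the result behaves like a table rather than a stream.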
Thanks,
Best regards,