Thanks, Lukasz, it worked like a charm ;-)
The coder is now set explicitly by the .setCoder(...) call after the first MapElements step (the "red text").
//Creating the example input messages
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
    testMessages.add("{\"id\":123456789, \"time\":" + time + ", \"time_ts\":\""
        + new Instant(time) + "\"}");
    time += TimeUnit.SECONDS.toMillis(1);
}
//BigEndianLongCoder
TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
    TypeDescriptors.kvs(TypeDescriptors.longs(),
        TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));
Pipeline pipe = Pipeline.create();
pipe
    .apply("Create with test input", Create.of(testMessages))
    .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
        MapElements.into(typeDescriptor).via(eventStr -> {
            Any event = JsonIterator.deserialize(eventStr);
            return KV.of(event.get("id").toLong(),
                KV.of(event.get("time").toLong(), eventStr));
        }))
    //The fix: set the coder explicitly so that both keys use BigEndianLongCoder
    .setCoder(KvCoder.of(BigEndianLongCoder.of(),
        KvCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of())))
    .apply(GroupByKey.<Long, KV<Long, String>>create())
    .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
    .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
        //Shouldn't this be sorted by the Secondary Key, which is the time?
        StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
            .map(KV::getValue)
            .forEach(System.out::println);
        return "";
    }));
pipe.run();
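
For anyone finding this thread later, here is my understanding of why the coder matters: SortValues orders the secondary keys by lexicographic comparison of their encoded bytes, not by their numeric value, and the default coder inferred for Long is (as far as I know) a variable-length one. The sketch below uses no Beam classes at all. The class EncodedOrderDemo and its helpers bigEndian, varint and compareBytes are made up for illustration, and varint is only a stand-in for a base-128 variable-length encoding, not Beam's actual implementation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class EncodedOrderDemo {

    // Fixed-width encoding, most significant byte first (8 bytes per long).
    static byte[] bigEndian(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Base-128 variable-length encoding, least significant 7-bit group first.
    // Illustrative stand-in only; not copied from any library.
    static byte[] varint(long v) {
        byte[] buf = new byte[10];
        int n = 0;
        do {
            int group = (int) (v & 0x7F);
            v >>>= 7;
            if (v != 0) {
                group |= 0x80; // continuation bit: more groups follow
            }
            buf[n++] = (byte) group;
        } while (v != 0);
        return Arrays.copyOf(buf, n);
    }

    // Lexicographic comparison of two byte arrays, treating bytes as unsigned.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long earlier = 129L;
        long later = 256L;
        // Big-endian bytes keep the numeric order: earlier sorts first.
        System.out.println("big-endian: "
            + compareBytes(bigEndian(earlier), bigEndian(later))); // prints -1
        // Varint bytes do not: the low 7 bits come first, so 129 sorts after 256.
        System.out.println("varint:     "
            + compareBytes(varint(earlier), varint(later))); // prints 1
    }
}
```

With millisecond timestamps that differ by one second, the low-order bits vary from element to element, so a low-bits-first encoding scrambles the output order, while the big-endian encoding keeps it chronological. Note that this byte ordering only matches numeric order for non-negative values, which is fine for timestamps.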
On 08/07/2018 02:48 AM, Lukasz Cwik wrote:
//Creating the example input messages
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
    testMessages.add("{\"id\":123456789, \"time\":" + time + ", \"time_ts\":\""
        + new Instant(time) + "\"}");
    time += TimeUnit.SECONDS.toMillis(1);
}
TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.kvs(TypeDescriptors.longs(),
TypeDescriptors.strings()));
Pipeline pipe = Pipeline.create();
pipe
    .apply("Create with test input", Create.of(testMessages))
    .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
        MapElements.into(typeDescriptor).via(eventStr -> {
            Any event = JsonIterator.deserialize(eventStr);
            return KV.of(event.get("id").toLong(),
                KV.of(event.get("time").toLong(), eventStr));
        }))
    .apply(GroupByKey.<Long, KV<Long, String>>create())
    .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
    .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
        //Shouldn't this be sorted by the Secondary Key, which is the time?
        StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
            .map(KV::getValue)
            .forEach(System.out::println);
        return "";
    }));
pipe.run();