Thanks, Lukasz, it worked like a charm ;-)

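The coder is now set explicitly with the setCoder(...) call on the output of the mapping step (the "red text" in the original message).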

// Creating the example input messages
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
    testMessages.add("{\"id\":123456789, \"time\":" + time + ", \"time_ts\":\"" + new Instant(time) + "\"}");
    time += TimeUnit.SECONDS.toMillis(1);
}

// Element type; its coder is set explicitly below using BigEndianLongCoder
TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
        TypeDescriptors.kvs(TypeDescriptors.longs(),
                TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));

Pipeline pipe = Pipeline.create();
pipe
        .apply("Create with test input", Create.of(testMessages))
        .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
                MapElements.into(typeDescriptor).via(eventStr -> {
                    Any event = JsonIterator.deserialize(eventStr);
                    return KV.of(event.get("id").toLong(),
                            KV.of(event.get("time").toLong(), eventStr));
                }))
        // The fix: set the coder explicitly on the output of the mapping step
        .setCoder(KvCoder.of(BigEndianLongCoder.of(),
                KvCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of())))
        .apply(GroupByKey.<Long, KV<Long, String>>create())
        .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
        .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
            // Shouldn't this be sorted by the Secondary Key, which is the time?
            StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
                    .map(KV::getValue)
                    .forEach(System.out::println);
            return "";
        }));

pipe.run();
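
For anyone finding this in the archive, here is a small sketch of why the explicit coder seems to matter. As far as I understand it, SortValues orders values by comparing the encoded bytes of the secondary key, so the encoding has to preserve numeric order. The class below does not use Beam at all; CoderOrderDemo and its helper methods are made-up names, and the varInt method is only assumed to resemble the variable-length encoding that Long gets by default. It compares two non-negative longs under a fixed-width big-endian encoding and under a base-128 varint.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

class CoderOrderDemo {
    // Fixed-width, most-significant byte first (the layout a big-endian long coder writes).
    static byte[] bigEndian(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Base-128 varint, least-significant 7-bit group first.
    // Assumption: this only approximates the default variable-length Long encoding.
    static byte[] varInt(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int b = (int) (v & 0x7F);
            v >>>= 7;
            if (v != 0) {
                b |= 0x80; // continuation bit: more groups follow
            }
            out.write(b);
        } while (v != 0);
        return out.toByteArray();
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long a = 255L;
        long b = 256L;
        // Big-endian bytes keep numeric order: 255 sorts before 256.
        System.out.println(compareBytes(bigEndian(a), bigEndian(b)) < 0); // prints true
        // Varint bytes do not: 255 is [0xFF, 0x01], 256 is [0x80, 0x02].
        System.out.println(compareBytes(varInt(a), varInt(b)) < 0);       // prints false
    }
}
```

This only holds for non-negative values, which is fine for millisecond timestamps; negative longs would sort after positive ones under an unsigned comparison of big-endian bytes.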


On 08/07/2018 02:48 AM, Lukasz Cwik wrote:
// Creating the example input messages
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
    testMessages.add("{\"id\":123456789, \"time\":" + time + ", \"time_ts\":\"" + new Instant(time) + "\"}");
    time += TimeUnit.SECONDS.toMillis(1);
}

TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
        TypeDescriptors.kvs(TypeDescriptors.longs(),
                TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));

Pipeline pipe = Pipeline.create();
pipe
        .apply("Create with test input", Create.of(testMessages))
        .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
                MapElements.into(typeDescriptor).via(eventStr -> {
                    Any event = JsonIterator.deserialize(eventStr);
                    return KV.of(event.get("id").toLong(),
                            KV.of(event.get("time").toLong(), eventStr));
                }))
        .apply(GroupByKey.<Long, KV<Long, String>>create())
        .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
        .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
            // Shouldn't this be sorted by the Secondary Key, which is the time?
            StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
                    .map(KV::getValue)
                    .forEach(System.out::println);
            return "";
        }));

pipe.run();
