Hi all,

I'm doing some tests with Beam and Apache Flink. I'm running the code below:

  public static void main(String[] args) throws IOException {
    WorkflowStepOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(WorkflowStepOptions.class);
    logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
            options.getKafkaBrokers(), options.getTopics(), options.getOutputTopic(),
            options.getWindowSize(), options.getGroupId(), workflowStepName);
    Pipeline p = Pipeline.create(options);

    CoderRegistry cr = p.getCoderRegistry();
    cr.registerCoderForClass(MyClass.class, new MyClassCoder());

    // Read from one or more Kafka topics; the event date (when present) becomes the element timestamp,
    // otherwise processing time is used.
    KafkaIO.Read<Integer, MyClass> kafkaIOReader = KafkaIO.<Integer, MyClass>read()
            .withBootstrapServers(options.getKafkaBrokers())
            .withTopics(Arrays.asList(options.getTopics().split(",")))
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(MyClassEventDeserializer.class)
            //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
            .withTimestampFn((KV<Integer, MyClass> event) ->
                    event.getValue().getDate() == null
                            ? Instant.now()
                            : Instant.parse(event.getValue().getDate(),
                                    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
            .withConsumerConfigUpdates(
                    ImmutableMap.of(
                            "group.id", options.getGroupId(),
                            "auto.offset.reset", "earliest"));

    KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String, String>write()
            .withBootstrapServers(options.getKafkaBrokers())
            .withTopic(options.getOutputTopic())
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class);

    // Fixed windows with an early firing 1s after the first element, a late firing per late element,
    // accumulating panes, and one year of allowed lateness.
    Window<KV<Integer, MyClass>> window = Window
            .<KV<Integer, MyClass>>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.standardDays(365L))
            .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1L)))
                    .withLateFirings(AfterPane.elementCountAtLeast(1)));

    // Count the elements in each window pane.
    PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
            .apply("Window", window)
            .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());

    // Format each count as a KV and write it to the output topic.
    toFormat
            .apply("FormatResults", MapElements
                    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via((Long count) -> KV.of("count", count.toString())))
            .apply(kafkaOutput);

    p.run();
  }

The idea is very simple: read some events from a Kafka topic, group them
into a window, count them, and write the result to another Kafka topic.
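To sanity-check the windowing and counting in isolation, I also put together
a small DirectRunner sketch along these lines (the element values, the no-arg
MyClass constructor, and the fixed 1-minute window with the default trigger
are just assumptions for the test; TestStream lets me control the element
timestamps and the watermark directly):

  Pipeline testPipeline = Pipeline.create();  // DirectRunner by default

  TestStream<KV<Integer, MyClass>> events = TestStream
          .create(KvCoder.of(VarIntCoder.of(), new MyClassCoder()))
          // two elements whose event timestamps fall into the same 1-minute window
          .addElements(
                  TimestampedValue.of(KV.of(1, new MyClass()), new Instant(0)),
                  TimestampedValue.of(KV.of(2, new MyClass()), new Instant(0)))
          .advanceWatermarkToInfinity();

  PCollection<Long> counts = testPipeline
          .apply(events)
          .apply("Window", Window.<KV<Integer, MyClass>>into(
                  FixedWindows.of(Duration.standardMinutes(1))))
          .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());

  // once the watermark passes the end of the window, the on-time pane should hold both elements
  PAssert.that(counts)
          .inOnTimePane(new IntervalWindow(new Instant(0), Duration.standardMinutes(1)))
          .containsInAnyOrder(2L);
  testPipeline.run().waitUntilFinish();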

I'm a little confused about the result: the code above produces only one
entry with a count of "1", while I have a lot of events (around 500) in the
source topic.
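One thing I already suspect is the timestamp handling: events with a null
date silently fall back to Instant.now(), and a date string that doesn't
match the pattern would make the parse throw. A quick standalone check of
the formatter (the sample string is only my guess at what the events
contain):

  DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
  // as far as I know, Joda's 'Z' here expects a numeric offset such as +0000, not a literal 'Z'
  System.out.println(Instant.parse("2019-04-01T10:15:30+0000", fmt));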

Do you have any suggestions for figuring out what's going on? Is there
something I'm doing wrong here?
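In the meantime, to see what each firing actually contains, my plan is to
add a pass-through ParDo that logs the window and pane (early / on-time /
late) of every emitted count, something like this sketch (reusing the
logger from my class):

  PCollection<Long> debugged = toFormat.apply("DebugPanes",
          ParDo.of(new DoFn<Long, Long>() {
            @ProcessElement
            public void processElement(ProcessContext c, BoundedWindow w) {
              // log which window and which pane each count comes from
              logger.info("count={} window={} pane={}", c.element(), w, c.pane());
              c.output(c.element());
            }
          }));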

Regards
