According to the documentation [0], Create.of() works only for "standard"
types, but shouldn't it in theory also work for non-standard types when a
Coder is specified explicitly?
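
For a "standard" type this works for me without any coder plumbing; as a
minimal sketch of what I mean:

    // Coder inferred automatically for a standard type like String.
    PCollection<String> lines = p.apply(Create.of("hello", "world"));

    // Coder supplied explicitly via withCoder(), as the documentation
    // suggests for types that cannot be inferred.
    PCollection<String> explicit =
        p.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()));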

I want to test a DoFn that receives a KafkaRecord<String, String> as input:

    KafkaRecord input = new KafkaRecord<String, String>(topic, partition,
        offset, timestamp, kafkaTimestampType, null, kv);
    KafkaRecordCoder kafkaRecordCoder =
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    PCollection<KafkaRecord<String, String>> records =
        p.apply(Create.of(input).withCoder(kafkaRecordCoder));
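
(For reference, the inputs above are just placeholder test values along
these lines; as far as I can tell the concrete contents don't matter for
the errors below:)

    // Hypothetical test fixtures used to build the record above.
    String topic = "test-topic";
    int partition = 0;
    long offset = 0L;
    long timestamp = 0L;
    KafkaTimestampType kafkaTimestampType = KafkaTimestampType.NO_TIMESTAMP_TYPE;
    KV<String, String> kv = KV.of("key", "value");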

But that fails with:

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was
specified. Please set a coder by invoking Create.withCoder() explicitly or a
schema by invoking Create.withSchema().

[..]
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable to
provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord.
  Building a Coder using a registered CoderProvider failed.
However, when I register the coder for KafkaRecord on that TestPipeline's
CoderRegistry:

    Pipeline p = TestPipeline.create();
    p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
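
(Since the exception above mentions "a registered CoderProvider", I believe
the equivalent explicit registration via CoderProviders.forCoder would be
roughly the following, and I'd expect it to behave the same:)

    p.getCoderRegistry()
        .registerCoderProvider(
            CoderProviders.forCoder(
                new TypeDescriptor<KafkaRecord<String, String>>() {},
                KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));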

I get the following NPE:

java.lang.NullPointerException
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
  at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
  at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
  at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
  (...)
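
(Side note: from the stack trace the NPE seems to happen while the coder
encodes the record's headers, which I passed as null above. A variant I
could presumably try, sketched here with Kafka's RecordHeaders, would be to
construct the record with empty rather than null headers:)

    // Hypothetical variant: empty (non-null) headers instead of null.
    KafkaRecord<String, String> input2 =
        new KafkaRecord<>(topic, partition, offset, timestamp,
            kafkaTimestampType, new RecordHeaders(), kv);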

And when I try to set the Coder explicitly, like:

    p.apply(
        Create.of(input).withCoder(kafkaRecordCoder));

My IDE says:

java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to
org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.io.kafka.KafkaRecord<java.lang.String,java.lang.String>>

What am I missing?

[0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/
