According to the documentation [0], Create.of() works only for "Standard" types, but shouldn't it, in theory, also work for non-standard types when a Coder is specified?
I want to test a DoFn that receives KafkaRecord<String, String> as an input:

    KafkaRecord input = new KafkaRecord<String, String>(topic, partition, offset, timestamp, kafkaTimestampType, null, kv);
    KafkaRecordCoder kafkaRecordCoder = KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    PCollection<KafkaRecord<String, String>> records = p.apply(
        Create.of(input).withCoder(kafkaRecordCoder));

But that fails with:

    java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().
    [..]
    Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable to provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord. Building a Coder using a registered CoderProvider failed.

However, when I register a CoderProvider for that TestPipeline object:

    Pipeline p = TestPipeline.create();
    p.getCoderRegistry().registerCoderForClass(KafkaRecord.class, KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

I get the following NPE:

    java.lang.NullPointerException
        at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
        at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
        at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
        at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
        at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
        at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
        at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
        (...)

And when I try to set the Coder like:

    p.apply(
        Create.of(input).withCoder(kafkaRecordCoder));

My IDE says:

    java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.io.kafka.KafkaRecord<java.lang.String,java.lang.String>>

What am I missing?

[0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/