Hm, it looks like the error comes from trying to call the zero-arg constructor of the ArticleEnvelope proto class. Do you have a schema registered for ArticleEnvelope?
I think what's happening is that Beam finds there's no schema registered for
ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
generates code that attempts to use the zero-arg constructor. That looks like
a bug in JavaFieldSchema: we should fail earlier with a better message rather
than generate code that will try to access a private constructor. I filed a
jira for this [1].

I think you can get this working if you register a Schema for ArticleEnvelope.
I'm not actually sure of the best way to do this since it's generated code and
you can't use @DefaultSchema (+Reuven Lax <re...@google.com> and +Alex Van
Boxel <a...@vanboxel.be> in case they have better advice). You might try
registering a provider manually when you create the pipeline, something like:

  pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class, new ProtoMessageSchema());

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10372

On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:

> A bit more context - I started with the Beam documentation and tried
> JavaFieldSchema and JavaBeanSchema first; when that didn't work, I dug
> deeper and tried to implement the methods myself.
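A minimal sketch of the manual registration suggested above, assuming the ProtoMessageSchema provider from the beam-sdks-java-extensions-protobuf module and the generated ArticleProto.ArticleEnvelope class from this thread; an untested outline, not a verified fix:

```java
// Sketch: register a ProtoMessageSchema provider for the generated proto class
// up front, so Beam does not fall back to JavaFieldSchema (which generates code
// that needs an accessible zero-arg constructor) for ArticleEnvelope.
// Assumes beam-sdks-java-extensions-protobuf is on the classpath and that
// ArticleProto.ArticleEnvelope is the proto-generated class from this thread.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterProtoSchema {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Schema inference for ArticleEnvelope now goes through ProtoMessageSchema
    // instead of the recursively applied JavaFieldSchema.
    pipeline
        .getSchemaRegistry()
        .registerSchemaProvider(ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());

    // ... build the rest of the pipeline here ...
  }
}
```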
> > What I also tried is the following class definition:
> >
> > @DefaultSchema(JavaFieldSchema.class)
> > public class EnrichedArticle implements Serializable {
> >
> >   // ArticleEnvelope is generated from Protobuf
> >   @Nullable public ArticleProto.ArticleEnvelope article;
> >   // Asset is a Java POJO
> >   @Nullable public List<Asset> assets;
> >
> >   @SchemaCreate
> >   public EnrichedArticle() {}
> >
> >   @SchemaCreate
> >   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
> >     this.article = article;
> >     this.assets = assets;
> >   }
> > }
> >
> > This throws the following exception:
> >
> > org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalAccessError:
> >     tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
> >     from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
> > ...
> > Caused by: java.lang.IllegalAccessError: tried to access method
> >     ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class
> >     ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
> >   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
> >   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
> >   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
> >   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
> >   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
> >   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
> >   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
> >   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
> >   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
> >   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
> >   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
> >   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
> >   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
> >   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
> >   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
> >   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
> >   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
> >   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
> >   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
> >   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>
> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
>> Hi Brian,
>>
>> Thank you for your response.
>>
>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>> and my constructor with @SchemaCreate, I get the following exception:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>>     ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class
>>     ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>
>> 2.
>> When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>> make the fields private, and generate getters/setters, I get a
>> StackOverflowError:
>>
>> java.lang.StackOverflowError
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>   at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>   at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>   at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>> [...]
>>
>> 2.1 When I make the fields public, the pipeline executes, but the
>> PCollection does not have a schema associated with it, which causes the
>> next pipeline step (BigQueryIO) to fail.
>>
>> I want to try AutoValue as well, but that requires some more changes to
>> my code.
>>
>> - I tried supplying the ProtoMessageSchema().toRowFunction
>>   and ProtoMessageSchema().schemaFor() for the Protobuf conversion to
>>   the pipeline
>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>   EnrichedArticle and supplying that to the pipeline
>>
>> Where can I put the breakpoints to get a better understanding of what is
>> happening here?
>>
>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hi Tobias,
>>>
>>> You should be able to annotate the EnrichedArticle class with an
>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>> need to make some tweaks to the class though to be compatible with the
>>> built-in schema providers: you could make the members public and use
>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or
>>> make it into an AutoValue and use AutoValueSchema.
>>>
>>> Once you do that you should be able to convert a
>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>>
>>> Brian
>>>
>>> [1] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>
>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>>> I have the following class definition:
>>>>
>>>> public class EnrichedArticle implements Serializable {
>>>>
>>>>   // ArticleEnvelope is generated via Protobuf
>>>>   private ArticleProto.ArticleEnvelope article;
>>>>   // Asset is a Java POJO
>>>>   private List<Asset> assets;
>>>>
>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>     this.article = article;
>>>>     this.assets = assets;
>>>>   }
>>>> }
>>>>
>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row>
>>>> and a Schema for it so that I can pass it easily to my BigQueryIO at
>>>> the end of my pipeline. Transforming the article to a Row object is
>>>> straightforward: first I get the toRow() function for it via the helper:
>>>>
>>>> new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>>>
>>>> Then I just apply that function to the article field. However, I don't
>>>> know how I can manually transform my list of assets (a simple Java POJO
>>>> annotated with @DefaultSchema(JavaFieldSchema.class)) in my
>>>> EnrichedArticle container/composition class. What's the recommended way
>>>> of doing this?
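Putting the advice in this thread together, here is a hedged end-to-end sketch: public fields plus @DefaultSchema(JavaFieldSchema.class) on the wrapper, a manually registered ProtoMessageSchema for the generated proto member, then Convert.toRows. The class and field names (EnrichedArticle, ArticleProto.ArticleEnvelope, Asset) come from the thread; everything else is an assumed, untested outline, not a confirmed working solution:

```java
// Sketch of the combined approach discussed above. Assumes
// beam-sdks-java-extensions-protobuf is on the classpath; ArticleEnvelope is
// proto-generated, Asset is a plain POJO with public fields.
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

@DefaultSchema(JavaFieldSchema.class)
public class EnrichedArticle implements Serializable {
  public ArticleProto.ArticleEnvelope article; // generated via Protobuf
  public List<Asset> assets;                   // plain Java POJO

  // A single @SchemaCreate constructor; with JavaFieldSchema and public
  // fields, no zero-arg constructor should be needed on this class itself.
  @SchemaCreate
  public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    this.article = article;
    this.assets = assets;
  }
}

// At pipeline construction time (sketch):
//
//   Pipeline p = Pipeline.create(options);
//   // Avoid the JavaFieldSchema fallback for the generated proto class:
//   p.getSchemaRegistry()
//       .registerSchemaProvider(ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());
//
//   PCollection<EnrichedArticle> enriched = /* ... upstream transforms ... */;
//   PCollection<Row> rows = enriched.apply(Convert.toRows());
//   // BigQueryIO.write() with useBeamSchema() may then be able to consume
//   // the schema'd collection directly (assumption, not verified here).
```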