I want to make my example as simple as possible while also not leaving out the details that might be the reason for the error. I don't think there is any recursion. I can also share the ArticleEnvelope Protobuf file if that helps. I've tried to register the ArticleEnvelope schema like this:
TestPipeline p = TestPipeline.create();

TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
    TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);

Schema articleSchema =
    new ProtoMessageSchema().schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));

SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
    new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));

SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
    new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));

p.getSchemaRegistry().registerSchemaForClass(
    ArticleProto.ArticleEnvelope.class, articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow);

The problem is that even when I define and register it like above, as soon as
I annotate the class EnrichedArticle with @DefaultSchema(JavaFieldSchema.class)
I get:

Caused by: java.lang.IllegalAccessError: tried to access method
ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown Source)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414)

So it does not seem to have an effect when the annotation on EnrichedArticle
is present.
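For completeness: the provider-based one-liner that Brian suggested below
should be an equivalent, shorter way to register the same schema (a sketch,
not verified on my side):

// Registers ProtoMessageSchema as the schema provider for the generated
// ArticleEnvelope class, instead of passing the schema and the conversion
// functions explicitly as above.
p.getSchemaRegistry().registerSchemaProvider(
    ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());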
Without the annotation however, there is no schema defined on the output
PCollection, so I have to define it myself for the BigQueryIO to work:

[The code of the EnrichFn transforms an AssetEnvelope to a Java POJO Asset
class and enriches it via an RPC call. The Asset has a low number of fields,
so doing the manual mapping here is manageable, even though I would like to
use Beam Schema as soon as this problem is solved, which would make that
Asset POJO obsolete.]

PCollection<KV<String, AssetProto.AssetEnvelope>> assets =
    p.apply("Create assets", Create.of(kvAsset));
PCollection<KV<String, ArticleProto.ArticleEnvelope>> articles =
    p.apply("Create articles", Create.of(kvArticle));

TupleTag<ArticleProto.ArticleEnvelope> articleTag = new TupleTag<>();
TupleTag<AssetProto.AssetEnvelope> assetTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> joinedCollection =
    KeyedPCollectionTuple.of(articleTag, articles)
        .and(assetTag, assets)
        .apply(CoGroupByKey.<String>create());

PCollection<EnrichedArticle> output =
    joinedCollection.apply(
        ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag, assetTag)));

// The following line returns false:
// output.hasSchema()

...BigQueryIO...

On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik <lc...@google.com> wrote:

> Can you give context as to whether schemas will ever allow recursive
> types, since this is pretty common in lots of languages?
>
> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com> wrote:
>
>> It just occurred to me that BEAM-10265 [1] could be the cause of the
>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>> schemas are not allowed to be recursive, and it looks like we don't fail
>> gracefully for recursive proto definitions.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>
>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hm, it looks like the error is from trying to call the zero-arg
>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>> registered for ArticleEnvelope?
>>>
>>> I think maybe what's happening is that Beam finds there's no schema
>>> registered for ArticleEnvelope, so it just recursively applies
>>> JavaFieldSchema, which generates code that attempts to use the zero-arg
>>> constructor. That looks like a bug in JavaFieldSchema; we should fail
>>> earlier with a better message rather than generating code that will try
>>> to access a private constructor. I filed a jira for this [1].
>>>
>>> I think you can get this working if you register a schema for
>>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>> <re...@google.com> and +Alex Van Boxel <a...@vanboxel.be> in case they
>>> have better advice). You might try just registering a provider manually
>>> when you create the pipeline, something like
>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>> new ProtoMessageSchema())`.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>
>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>>> A bit more context - I started with the Beam documentation and tried
>>>> JavaFieldSchema and JavaBeanSchema first; when that didn't work, I dug
>>>> deeper and tried to implement the methods myself.
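>>>>
>>>> Implementing the methods myself looked roughly like this (just a
>>>> sketch - the schema layout and field names are illustrative, and
>>>> assetSchema / assetToRow would be obtained for the Asset POJO
>>>> analogously to the ArticleEnvelope functions):
>>>>
>>>> // Illustrative schema: a nested row for the proto plus an array of assets.
>>>> Schema enrichedSchema =
>>>>     Schema.builder()
>>>>         .addRowField("article", articleSchema)
>>>>         .addArrayField("assets", Schema.FieldType.row(assetSchema))
>>>>         .build();
>>>>
>>>> // Hand-written conversion: reuse the proto toRow function for the
>>>> // article field and map each Asset to a Row for the array field.
>>>> SerializableFunction<EnrichedArticle, Row> enrichedToRow =
>>>>     enriched ->
>>>>         Row.withSchema(enrichedSchema)
>>>>             .addValue(articleEnvelopeToRow.apply(enriched.article))
>>>>             .addValue(
>>>>                 enriched.assets.stream()
>>>>                     .map(assetToRow::apply)
>>>>                     .collect(Collectors.toList()))
>>>>             .build();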
>>>>
>>>> What I also tried is the following class definition:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class EnrichedArticle implements Serializable {
>>>>
>>>>   // ArticleEnvelope is generated from Protobuf
>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>   // Asset is a Java POJO
>>>>   @Nullable public List<Asset> assets;
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle() {}
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>     this.article = article;
>>>>     this.assets = assets;
>>>>   }
>>>> }
>>>>
>>>> This throws the following exception:
>>>>
>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalAccessError:
>>>> tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>> ...
>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>> at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
>>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>>> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>>> at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>>> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>>> at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>> at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>>
>>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> Hi Brian,
>>>>>
>>>>> Thank you for your response.
>>>>>
>>>>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>>>>> and my constructor with @SchemaCreate, I get the following exception:
>>>>>
>>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>>> at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>>>>
>>>>> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>>>>> make the fields private and generate getters/setters, I get a
>>>>> StackOverflowError:
>>>>>
>>>>> java.lang.StackOverflowError
>>>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>>> at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>>> at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>>> at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>>> at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>>> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>>> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>> at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>>> at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>>> at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>>> [...]
>>>>>
>>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>>> PCollection does not have a schema associated with it, which causes the
>>>>> next pipeline step (BigQueryIO) to fail.
>>>>>
>>>>> I want to try AutoValue as well, but that requires some more changes to
>>>>> my code (a sketch of what I have in mind is at the end of this mail).
>>>>>
>>>>> - I tried supplying the ProtoMessageSchema().toRowFunction and
>>>>> ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>>>> pipeline.
>>>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>>>> EnrichedArticle and supplying those to the pipeline.
>>>>>
>>>>> Where can I put the breakpoints to get a better understanding of what
>>>>> is happening here?
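>>>>>
>>>>> For the AutoValue variant, I have in mind something roughly like this
>>>>> (a sketch, untested; the create factory is the usual AutoValue pattern):
>>>>>
>>>>> @DefaultSchema(AutoValueSchema.class)
>>>>> @AutoValue
>>>>> public abstract class EnrichedArticle implements Serializable {
>>>>>
>>>>>   public abstract ArticleProto.ArticleEnvelope getArticle();
>>>>>   public abstract List<Asset> getAssets();
>>>>>
>>>>>   public static EnrichedArticle create(
>>>>>       ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>     // AutoValue generates the concrete AutoValue_EnrichedArticle class.
>>>>>     return new AutoValue_EnrichedArticle(article, assets);
>>>>>   }
>>>>> }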
>>>>>
>>>>>
>>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>
>>>>>> Hi Tobias,
>>>>>>
>>>>>> You should be able to annotate the EnrichedArticle class with an
>>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You
>>>>>> would need to make some tweaks to the class though to be compatible
>>>>>> with the built-in schema providers: you could make the members public
>>>>>> and use JavaFieldSchema, or add getters/setters and use the
>>>>>> JavaBeanSchema, or make it into an AutoValue and use AutoValueSchema.
>>>>>>
>>>>>> Once you do that you should be able to convert a
>>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> I have the following class definition:
>>>>>>>
>>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>>
>>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>>   // Asset is a Java POJO
>>>>>>>   private List<Asset> assets;
>>>>>>>
>>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>>>     this.article = article;
>>>>>>>     this.assets = assets;
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row>
>>>>>>> and a Schema for it, so that I can pass it easily to my BigQueryIO at
>>>>>>> the end of my pipeline. Transforming the article to a Row object is
>>>>>>> straightforward:
>>>>>>>
>>>>>>> First I get the toRow() function for it via the helper:
>>>>>>>
>>>>>>> new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>>>>>>
>>>>>>> Then I just apply that function to the article field.
>>>>>>>
>>>>>>> However, I don't know how I can manually transform my list of assets
>>>>>>> (a simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>>>>>>> in my EnrichedArticle container/composition class. What's the
>>>>>>> recommended way of doing this?
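>>>>>>>
>>>>>>> What I had in mind for the assets is roughly the following (a sketch;
>>>>>>> I'm not sure it's the intended usage):
>>>>>>>
>>>>>>> // Obtain the inferred schema and toRow function for the Asset POJO
>>>>>>> // from a default schema registry (NoSuchSchemaException handling
>>>>>>> // omitted here).
>>>>>>> SchemaRegistry registry = SchemaRegistry.createDefault();
>>>>>>> Schema assetSchema = registry.getSchema(Asset.class);
>>>>>>> SerializableFunction<Asset, Row> assetToRow =
>>>>>>>     registry.getToRowFunction(Asset.class);
>>>>>>>
>>>>>>> // Each Asset becomes a Row; the resulting list could then be set as
>>>>>>> // an array field on the EnrichedArticle row.
>>>>>>> List<Row> assetRows =
>>>>>>>     assets.stream().map(assetToRow::apply).collect(Collectors.toList());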