Can you give some context on whether schemas will ever allow recursive types, since these are pretty common in lots of languages?
On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com> wrote:

> It just occurred to me that BEAM-10265 [1] could be the cause of the stack
> overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
> are not allowed to be recursive, and it looks like we don't fail gracefully
> for recursive proto definitions.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10265
>
> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com> wrote:
>
>> Hm, it looks like the error comes from trying to call the zero-arg
>> constructor of the ArticleEnvelope proto class. Do you have a schema
>> registered for ArticleEnvelope?
>>
>> I think what's happening is that Beam finds there's no schema registered
>> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
>> generates code that attempts to use the zero-arg constructor. That looks
>> like a bug in JavaFieldSchema: we should fail earlier with a better
>> message rather than generate code that will try to access a private
>> constructor. I filed a jira for this [1].
>>
>> I think you can get this working if you register a Schema for
>> ArticleEnvelope. I'm not actually sure of the best way to do this, since
>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>> <re...@google.com> and +Alex Van Boxel <a...@vanboxel.be> in case they
>> have better advice). You might try registering a provider manually when
>> you create the pipeline, something like
>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>> new ProtoMessageSchema())`.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>
>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>
>>> A bit more context: I started with the Beam documentation and tried
>>> JavaFieldSchema and JavaBeanSchema first; when that didn't work, I dug
>>> deeper and tried to implement the methods myself.
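[Editor's note: a minimal sketch of the manual registration Brian suggests above. `ArticleProto.ArticleEnvelope` is the proto-generated class from this thread; the pipeline-options wiring is illustrative, not taken from the original pipeline.]

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterProtoSchemaExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Register a schema provider for the generated proto class so Beam does
    // not fall back to JavaFieldSchema, which generates code that tries to
    // call a zero-arg constructor the proto class keeps private.
    pipeline.getSchemaRegistry().registerSchemaProvider(
        ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());

    // ... build the rest of the pipeline here, then:
    pipeline.run().waitUntilFinish();
  }
}
```

The registration must happen before any transform that needs to infer a schema for the proto type is applied.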
>>> What I also tried is the following class definition:
>>>
>>> @DefaultSchema(JavaFieldSchema.class)
>>> public class EnrichedArticle implements Serializable {
>>>
>>>   // ArticleEnvelope is generated from Protobuf
>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>   // Asset is a Java POJO
>>>   @Nullable public List<Asset> assets;
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle() {}
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>       List<Asset> assets) {
>>>     this.article = article;
>>>     this.assets = assets;
>>>   }
>>> }
>>>
>>> This throws the following exception:
>>>
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>> ...
>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>>   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>
>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>>> Hi Brian,
>>>>
>>>> Thank you for your response.
>>>>
>>>> 1.
>>>> When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>>>> and my constructor with @SchemaCreate, I get the following exception:
>>>>
>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>>>
>>>> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>>>> make the fields private, and generate getters/setters, I get a
>>>> StackOverflowError:
>>>>
>>>> java.lang.StackOverflowError
>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>>   at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>>   at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>>   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>   at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>> [...]
>>>>
>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>> PCollection does not have a schema associated with it, which causes the
>>>> next pipeline step (BigQueryIO) to fail.
>>>>
>>>> I want to try AutoValue as well, but that requires some more changes to
>>>> my code.
>>>>
>>>> - I tried supplying the ProtoMessageSchema().toRowFunction and
>>>>   ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>>>   pipeline
>>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>>>   EnrichedArticle and supplying them to the pipeline
>>>>
>>>> Where can I put the breakpoints to get a better understanding of what
>>>> is happening here?
>>>>
>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:
>>>>
>>>>> Hi Tobias,
>>>>>
>>>>> You should be able to annotate the EnrichedArticle class with an
>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>>>> need to make some tweaks to the class, though, to be compatible with the
>>>>> built-in schema providers: you could make the members public and use
>>>>> JavaFieldSchema, or add getters/setters and use JavaBeanSchema, or make
>>>>> it into an AutoValue and use AutoValueSchema.
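[Editor's note: a sketch of the AutoValue option Brian mentions, assuming the `ArticleProto.ArticleEnvelope` and `Asset` types from this thread. The `AutoValue_EnrichedArticle` class is generated by AutoValue's annotation processor at compile time; the proto field still needs its own registered schema provider, as discussed above.]

```java
import com.google.auto.value.AutoValue;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// AutoValueSchema infers the Beam schema from the abstract getters.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class EnrichedArticle {
  public abstract ArticleProto.ArticleEnvelope getArticle();

  public abstract List<Asset> getAssets();

  // Factory method delegating to the AutoValue-generated implementation.
  public static EnrichedArticle create(
      ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    return new AutoValue_EnrichedArticle(article, assets);
  }
}
```

AutoValue gives you immutability and a generated equals/hashCode for free, at the cost of adding the annotation processor to the build.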
>>>>> Once you do that you should be able to convert a
>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows
>>>>> [1].
>>>>>
>>>>> Brian
>>>>>
>>>>> [1] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>>>
>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> I have the following class definition:
>>>>>>
>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>
>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>   // Asset is a Java POJO
>>>>>>   private List<Asset> assets;
>>>>>>
>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>>>       List<Asset> assets) {
>>>>>>     this.article = article;
>>>>>>     this.assets = assets;
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row>
>>>>>> and a Schema for it so that I can pass them easily to my BigQueryIO at
>>>>>> the end of my pipeline. Transforming the article to a Row object is
>>>>>> straightforward. First I get the toRow() function for it via the helper:
>>>>>>
>>>>>> new ProtoMessageSchema().toRowFunction(
>>>>>>     TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>>>>>
>>>>>> Then I just apply that function to the article field. However, I don't
>>>>>> know how I can manually transform my list of assets (a simple Java POJO
>>>>>> annotated with @DefaultSchema(JavaFieldSchema.class)) in my
>>>>>> EnrichedArticle container/composition class. What's the recommended way
>>>>>> of doing this?
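[Editor's note: putting the advice from this thread together, a hedged end-to-end sketch. It assumes `EnrichedArticle` has an inferred schema (e.g. via `@DefaultSchema`), that `ArticleProto.ArticleEnvelope` is the generated proto class, and that the BigQuery table reference is a placeholder.]

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class EnrichedArticleToBigQuery {
  static void writeToBigQuery(Pipeline pipeline, PCollection<EnrichedArticle> enriched) {
    // Register the proto schema so the nested ArticleEnvelope field can be
    // mapped to a Row field instead of falling back to JavaFieldSchema.
    pipeline.getSchemaRegistry().registerSchemaProvider(
        ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());

    // Convert.toRows works on any PCollection whose elements have a schema;
    // nested schema-aware fields (proto, POJO list) are converted with it.
    PCollection<Row> rows = enriched.apply(Convert.toRows());

    // useBeamSchema() derives the BigQuery table schema from the Beam schema,
    // so no hand-written TableSchema is needed. Placeholder table reference:
    rows.apply(BigQueryIO.<Row>write()
        .useBeamSchema()
        .to("my-project:my_dataset.enriched_articles"));
  }
}
```

With this approach there is no need to hand-write toRow/fromRow functions for the composite class; schema inference handles the nesting once every member type has a schema.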