Hi Brian, Thank you for opening the issue! My current workaround is to generate a BigQuery schema with helper functions I already have (since my sink ultimately writes to BigQuery). I still have the Beam Schema functions in the code, but I currently don't use them, as I couldn't make them work in time for an internal company demo. (So I am basically following your advice and avoiding ProtoMessageSchema.)
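The helper functions mentioned above are not shown in the thread. As a rough, hypothetical sketch of the idea, the following derives a BigQuery-style schema (name, type, mode, nested fields) from a POJO using plain Java reflection. BqField, BqSchemaHelper, ExampleAsset and ExampleArticle are invented for illustration; a real sink would more likely build the BigQuery client's TableFieldSchema objects directly.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical stand-in for a BigQuery TableFieldSchema entry.
final class BqField {
  final String name;
  final String type; // STRING, INT64, FLOAT64, BOOL or RECORD
  final String mode; // REQUIRED, NULLABLE or REPEATED
  final List<BqField> fields; // sub-fields, only non-empty for RECORD

  BqField(String name, String type, String mode, List<BqField> fields) {
    this.name = name;
    this.type = type;
    this.mode = mode;
    this.fields = fields;
  }
}

// Hypothetical helper: derives a BigQuery-style schema from a POJO's instance fields.
// It does not guard against recursive types (it would overflow the stack on them).
final class BqSchemaHelper {
  static List<BqField> schemaFor(Class<?> clazz) {
    List<BqField> result = new ArrayList<>();
    for (Field f : clazz.getDeclaredFields()) {
      if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
        continue; // skip constants and compiler-generated fields such as this$0
      }
      if (List.class.isAssignableFrom(f.getType())) {
        result.add(column(f.getName(), listElementType(f), "REPEATED"));
      } else {
        String mode = f.getType().isPrimitive() ? "REQUIRED" : "NULLABLE";
        result.add(column(f.getName(), f.getType(), mode));
      }
    }
    return result;
  }

  // Extracts T from a List<T> field; raw or wildcard lists are rejected.
  private static Class<?> listElementType(Field f) {
    Type generic = f.getGenericType();
    if (generic instanceof ParameterizedType) {
      Type arg = ((ParameterizedType) generic).getActualTypeArguments()[0];
      if (arg instanceof Class) {
        return (Class<?>) arg;
      }
    }
    throw new IllegalArgumentException("Unsupported list type on field " + f.getName());
  }

  private static BqField column(String name, Class<?> type, String mode) {
    String scalar = scalarType(type);
    if (scalar != null) {
      return new BqField(name, scalar, mode, Collections.emptyList());
    }
    // Any other class is treated as a nested POJO and mapped to a RECORD.
    return new BqField(name, "RECORD", mode, schemaFor(type));
  }

  private static String scalarType(Class<?> t) {
    if (t == String.class) return "STRING";
    if (t == long.class || t == Long.class || t == int.class || t == Integer.class) return "INT64";
    if (t == double.class || t == Double.class || t == float.class || t == Float.class) return "FLOAT64";
    if (t == boolean.class || t == Boolean.class) return "BOOL";
    return null;
  }
}

// Hypothetical example POJOs, loosely modelled on EnrichedArticle and Asset.
class ExampleAsset {
  public String id;
  public long price;
}

class ExampleArticle {
  public String title;
  public List<ExampleAsset> assets;
}
```

Mapping a List field to a REPEATED RECORD mirrors how a List<Asset> would typically appear in a BigQuery table; the helper does not handle maps, arrays, enums or recursive types.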
Best, Tobi On Wed, Aug 19, 2020 at 10:52 PM Brian Hulette <bhule...@google.com> wrote: > It looks like this is occurring because we don't actually support mixing > SchemaProviders in nested types. The current SchemaProvider implementations > only support nested types for homogeneous types (e.g. an AutoValue with an > AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for > the outer type (EnrichedArticle), it is also used recursively for the inner > type (ArticleEnvelope), rather than using the registered ProtoMessageSchema. > > I filed BEAM-10765 [1] to add support for inferring schemas for > non-homogeneous types; I think it's something we should be able to support. > I know it's been a while since you reported this; have you found a > workaround in the meantime? Your best bet may be to avoid using > ProtoMessageSchema for the inner class for now and use the same style of > class for the outer and inner class by just creating a POJO or AutoValue > that replicates the ArticleEnvelope class. > > > Luke: Regarding recursive schemas, Reuven and I have had some discussions > about it offline. I think he said it should be feasible, but I don't know > much beyond that. > > Brian > > [1] https://issues.apache.org/jira/browse/BEAM-10765 > > On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> > wrote: > >> I want to make my example as simple as possible while also not leaving >> out the details that might be the reason for the error. I don't think there >> is any recursion. >> I can also share the ArticleEnvelope Protobuf file if that helps.
I've >> tried to register the ArticleEnvelope schema like this: >> >> TestPipeline p = TestPipeline.create(); >> TypeDescriptor<ArticleProto.ArticleEnvelope> >> articleEnvelopeTypeDescriptor = >> TypeDescriptor.of(ArticleProto.ArticleEnvelope.class); >> Schema articleSchema = >> new >> ProtoMessageSchema().schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class)); >> >> SerializableFunction<ArticleProto.ArticleEnvelope, Row> >> articleEnvelopeToRow = >> new >> ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class)); >> >> SerializableFunction<Row, ArticleProto.ArticleEnvelope> >> articleEnvelopeFromRow = >> new >> ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class)); >> >> >> p.getSchemaRegistry().registerSchemaForClass(ArticleProto.ArticleEnvelope.class, >> articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow); >> >> The problem is that even when I define and register it as above, as >> soon as I annotate the class EnrichedArticle with >> @DefaultSchema(JavaFieldSchema.class) >> I get: >> >> Caused by: java.lang.IllegalAccessError: tried to access method >> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V >> from class >> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO >> at >> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown >> Source) >> at >> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92) >> at >> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110) >> at >> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87) >> at >> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62) >> at >> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45) >> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) >> at >> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115) >> at >> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98) >> at >> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92) >> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141) >> at >> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115) >> at >> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46) >> at >> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112) >> at >> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300) >> at >> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) >> at >> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) >> at >> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) >> at >> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) >> at >> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414) >> >> So it does not seem to have an effect when the annotation on >> EnrichedArticle is present. 
Without the annotation, however, there is no >> schema defined on the output PCollection, so I have to define it myself for >> BigQueryIO to work: >> [The EnrichFn code transforms an AssetEnvelope into a Java POJO >> Asset class and enriches it via an RPC call. The Asset has only a few >> fields, so doing the manual mapping here is manageable, although I would >> like to use >> Beam Schema as soon as this problem is solved, which would make that >> Asset POJO obsolete.] >> >> PCollection<KV<String, AssetProto.AssetEnvelope>> assets = >> p.apply("Create assets", Create.of(kvAsset)); >> >> PCollection<KV<String, ArticleProto.ArticleEnvelope>> articles = >> p.apply("Create articles", Create.of(kvArticle)); >> >> TupleTag<ArticleProto.ArticleEnvelope> articleTag = new TupleTag<>(); >> TupleTag<AssetProto.AssetEnvelope> assetTag = new TupleTag<>(); >> >> PCollection<KV<String, CoGbkResult>> joinedCollection = >> KeyedPCollectionTuple >> .of(articleTag, articles).and(assetTag, >> assets).apply(CoGroupByKey.<String>create()); >> >> PCollection<EnrichedArticle> output = joinedCollection >> .apply(ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag, >> assetTag))); >> // The following line returns false: >> output.hasSchema(); >> >> ...BigQueryIO... >> >> On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik <lc...@google.com> wrote: >> >>> Can you give context as to whether schemas will ever allow recursive >>> types, since this is pretty common in lots of languages? >>> >>> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com> >>> wrote: >>> >>>> It just occurred to me that BEAM-10265 [1] could be the cause of the >>>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam >>>> schemas are not allowed to be recursive, and it looks like we don't fail >>>> gracefully for recursive proto definitions.
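To check Brian's question about whether ArticleEnvelope refers to itself, a hypothetical diagnostic like the following walks a class's public no-arg getters (roughly the surface a getter-based provider such as JavaBeanSchema inspects) and reports the first type that reappears on the current path. RecursionCheck, TreeNode and FlatRecord are illustrative names, not Beam APIs, and this is not Beam's own inference code.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical diagnostic: walks public no-arg getters and reports the first path
// on which a type refers back to a type already on that path.
final class RecursionCheck {
  static String findGetterCycle(Class<?> root) {
    return visit(root, new ArrayDeque<>(), new HashSet<>());
  }

  private static String visit(Class<?> type, Deque<Class<?>> path, Set<Class<?>> done) {
    if (path.contains(type)) {
      return describe(path, type); // type is already on the current path: a cycle
    }
    if (done.contains(type)) {
      return null; // already fully explored without finding a cycle
    }
    path.addLast(type);
    for (Method m : type.getMethods()) {
      if (m.getDeclaringClass() == Object.class
          || Modifier.isStatic(m.getModifiers())
          || m.getParameterCount() != 0
          || !m.getName().startsWith("get")) {
        continue; // only instance getters are considered
      }
      Class<?> next = referencedType(m.getGenericReturnType());
      if (next != null) {
        String cycle = visit(next, path, done);
        if (cycle != null) {
          return cycle;
        }
      }
    }
    path.removeLast();
    done.add(type);
    return null;
  }

  // Returns the user type a getter refers to (unwrapping Iterable<T> types such as
  // List<T>), or null for primitives, arrays, enums and JDK types.
  private static Class<?> referencedType(Type t) {
    if (t instanceof ParameterizedType) {
      ParameterizedType p = (ParameterizedType) t;
      Type raw = p.getRawType();
      Type[] args = p.getActualTypeArguments();
      boolean iterable = raw instanceof Class && Iterable.class.isAssignableFrom((Class<?>) raw);
      return iterable && args.length == 1 ? referencedType(args[0]) : null;
    }
    if (t instanceof Class) {
      Class<?> c = (Class<?>) t;
      boolean skip = c.isPrimitive() || c.isArray() || c.isEnum() || c.getName().startsWith("java.");
      return skip ? null : c;
    }
    return null;
  }

  private static String describe(Deque<Class<?>> path, Class<?> repeated) {
    StringBuilder sb = new StringBuilder();
    for (Class<?> c : path) {
      sb.append(c.getSimpleName()).append(" -> ");
    }
    return sb.append(repeated.getSimpleName()).toString();
  }
}

// Hypothetical example types: one self-referential, one flat.
class TreeNode {
  public String getLabel() { return "root"; }
  public List<TreeNode> getChildren() { return new ArrayList<>(); }
}

class FlatRecord {
  public String getName() { return "name"; }
  public long getSize() { return 0L; }
}
```

If run against a protobuf-generated class, a getter walk like this may also flag accessors such as getDefaultInstanceForType(), which return the message's own type, even when the .proto file itself has no recursive fields.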
>>>> >>>> Brian >>>> >>>> [1] https://issues.apache.org/jira/browse/BEAM-10265 >>>> >>>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com> >>>> wrote: >>>> >>>>> Hm, it looks like the error is from trying to call the zero-arg >>>>> constructor for the ArticleEnvelope proto class. Do you have a schema >>>>> registered for ArticleEnvelope? >>>>> >>>>> I think maybe what's happening is Beam finds there's no schema >>>>> registered for ArticleEnvelope, so it just recursively >>>>> applies JavaFieldSchema, which generates code that attempts to use the >>>>> zero-arg constructor. It looks like that's a bug in JavaFieldSchema; we >>>>> should fail earlier with a better message rather than just generating code >>>>> that will try to access a private constructor. I filed a jira for this >>>>> [1]. >>>>> >>>>> I think you can get this working if you register a Schema for >>>>> ArticleEnvelope. I'm not actually sure of the best way to do this, since >>>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax >>>>> <re...@google.com> and +Alex Van Boxel <a...@vanboxel.be> in case >>>>> they have better advice). You might try just registering a provider >>>>> manually when you create the pipeline, something like >>>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class, >>>>> new ProtoMessageSchema())`. >>>>> >>>>> Brian >>>>> >>>>> [1] https://issues.apache.org/jira/browse/BEAM-10372 >>>>> >>>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias < >>>>> tobias.kay...@ricardo.ch> wrote: >>>>> >>>>>> A bit more context: I started with the Beam documentation and >>>>>> tried JavaFieldSchema and JavaBeanSchema first. When that didn't work, I >>>>>> dug deeper and tried to implement the methods myself.
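Brian's point that JavaFieldSchema should fail early instead of generating code that calls a private constructor (BEAM-10372) can be illustrated with a small reflection check. This is a hypothetical sketch, not Beam's implementation: CreatorCheck, GeneratedLikeMessage and PlainPojo are invented names, and the check deliberately ignores @SchemaCreate constructors and static factory methods.

```java
import java.lang.reflect.Modifier;

// Hypothetical fail-fast check: before generating or running code that calls
// `new T()`, verify that T actually has a public zero-argument constructor.
// Simplified: @SchemaCreate constructors and static factories are not considered.
final class CreatorCheck {
  static void requirePublicNoArgConstructor(Class<?> clazz) {
    if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
      throw new IllegalArgumentException(clazz.getName() + " cannot be instantiated directly");
    }
    try {
      clazz.getConstructor(); // looks up public constructors only
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(
          clazz.getName()
              + " has no public zero-argument constructor that generated code could call;"
              + " register a schema for this type instead of inferring one from its fields.",
          e);
    }
  }
}

// Mimics a generated protobuf message, whose constructors are private.
class GeneratedLikeMessage {
  private GeneratedLikeMessage() {}
}

// A plain POJO; the constructor is declared public explicitly because the implicit
// default constructor of a package-private class is itself package-private.
class PlainPojo {
  public String id;

  public PlainPojo() {}
}
```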
>>>>>> >>>>>> What I also tried is the following class definition: >>>>>> >>>>>> @DefaultSchema(JavaFieldSchema.class) >>>>>> public class EnrichedArticle implements Serializable { >>>>>> >>>>>> // ArticleEnvelope is generated from Protobuf >>>>>> @Nullable public ArticleProto.ArticleEnvelope article; >>>>>> // Asset is a Java POJO >>>>>> @Nullable public List<Asset> assets; >>>>>> >>>>>> @SchemaCreate >>>>>> public EnrichedArticle() {} >>>>>> >>>>>> @SchemaCreate >>>>>> public EnrichedArticle(ArticleProto.ArticleEnvelope article, >>>>>> List<Asset> assets) { >>>>>> this.article = article; >>>>>> this.assets = assets; >>>>>> } >>>>>> } >>>>>> >>>>>> This throws the following exception: >>>>>> >>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: >>>>>> java.lang.IllegalAccessError: tried to access method >>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V >>>>>> from class >>>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1 >>>>>> ... 
>>>>>> Caused by: java.lang.IllegalAccessError: tried to access method >>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V >>>>>> from class >>>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1 >>>>>> at >>>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown >>>>>> Source) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45) >>>>>> at >>>>>> org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120) >>>>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) >>>>>> at >>>>>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115) >>>>>> at >>>>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98) >>>>>> at >>>>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92) >>>>>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141) >>>>>> at >>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115) >>>>>> at >>>>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46) >>>>>> at >>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112) >>>>>> at >>>>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300) >>>>>> at >>>>>> 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) >>>>>> at >>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) >>>>>> at >>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) >>>>>> at >>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) >>>>>> at >>>>>> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439) >>>>>> >>>>>> >>>>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias < >>>>>> tobias.kay...@ricardo.ch> wrote: >>>>>> >>>>>>> Hi Brian, >>>>>>> >>>>>>> Thank you for your response. >>>>>>> >>>>>>> 1. When I annotate the class with >>>>>>> @DefaultSchema(JavaFieldSchema.class) and my constructor with a >>>>>>> @SchemaCreate >>>>>>> annotation, I get the following exception: >>>>>>> >>>>>>> Caused by: java.lang.IllegalAccessError: tried to access method >>>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V >>>>>>> from class >>>>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi >>>>>>> at >>>>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown >>>>>>> Source) >>>>>>> >>>>>>> 2.
When I annotate the class with >>>>>>> @DefaultSchema(JavaBeanSchema.class), make the fields private, and >>>>>>> generate >>>>>>> getters/setters, I get a StackOverflowError: >>>>>>> >>>>>>> java.lang.StackOverflowError >>>>>>> at >>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197) >>>>>>> at >>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563) >>>>>>> at >>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512) >>>>>>> at >>>>>>> org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191) >>>>>>> at >>>>>>> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195) >>>>>>> at >>>>>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191) >>>>>>> at >>>>>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143) >>>>>>> at >>>>>>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) >>>>>>> at >>>>>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) >>>>>>> at >>>>>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) >>>>>>> at >>>>>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) >>>>>>> at >>>>>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) >>>>>>> at >>>>>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) >>>>>>> at >>>>>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) >>>>>>> at >>>>>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>>>>> at >>>>>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) >>>>>>> at >>>>>>>
org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66) >>>>>>> at >>>>>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88) >>>>>>> at >>>>>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162) >>>>>>> [...] >>>>>>> >>>>>>> 2.1 When I make the fields public, the pipeline executes, but the >>>>>>> PCollection does not have a schema associated with it, which causes the >>>>>>> next pipeline step (BigQueryIO) to fail. >>>>>>> >>>>>>> I want to try AutoValue as well, but that requires some more changes >>>>>>> to my code. >>>>>>> >>>>>>> - I tried supplying the ProtoMessageSchema().toRowFunction >>>>>>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the >>>>>>> pipeline >>>>>>> - I tried writing my own toRow/fromRow/getSchema functions for the >>>>>>> EnrichedArticle and supplying those to the pipeline >>>>>>> >>>>>>> Where should I put breakpoints to get a better understanding of >>>>>>> what is happening here? >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Tobias, >>>>>>>> >>>>>>>> You should be able to annotate the EnrichedArticle class with an >>>>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You >>>>>>>> would >>>>>>>> need to make some tweaks to the class, though, to make it compatible with the >>>>>>>> built-in schema providers: you could make the members public and use >>>>>>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or >>>>>>>> make >>>>>>>> it into an AutoValue and use AutoValueSchema. >>>>>>>> >>>>>>>> Once you do that, you should be able to convert a >>>>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows >>>>>>>> [1].
>>>>>>>> >>>>>>>> Brian >>>>>>>> >>>>>>>> [1] >>>>>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows-- >>>>>>>> >>>>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias < >>>>>>>> tobias.kay...@ricardo.ch> wrote: >>>>>>>> >>>>>>>>> I have the following class definition: >>>>>>>>> >>>>>>>>> public class EnrichedArticle implements Serializable { >>>>>>>>> >>>>>>>>> // ArticleEnvelope is generated via Protobuf >>>>>>>>> private ArticleProto.ArticleEnvelope article; >>>>>>>>> // Asset is a Java POJO >>>>>>>>> private List<Asset> assets; >>>>>>>>> >>>>>>>>> public EnrichedArticle(ArticleProto.ArticleEnvelope article, >>>>>>>>> List<Asset> assets) { >>>>>>>>> this.article = article; >>>>>>>>> this.assets = assets; >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, >>>>>>>>> Row> and a Schema for it so that I can pass it easily to my >>>>>>>>> BigQueryIO at the end of my pipeline. Transforming the article to a >>>>>>>>> Row >>>>>>>>> object is straightforward: >>>>>>>>> >>>>>>>>> First, I get the toRow() function for it via the helper: >>>>>>>>> >>>>>>>>> new ProtoMessageSchema().toRowFunction(TypeDescriptor.of( >>>>>>>>> ArticleProto.ArticleEnvelope.class)); >>>>>>>>> >>>>>>>>> Then I just apply that function to the article field. >>>>>>>>> However, I don't know how I can manually transform my list of >>>>>>>>> assets (a simple Java POJO annotated with >>>>>>>>> @DefaultSchema(JavaFieldSchema.class)) >>>>>>>>> >>>>>>>>> in my EnrichedArticle container/composition class. What's the >>>>>>>>> recommended way of doing this?
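For the original question of converting the List<Asset> by hand, one option is to convert each element separately and store the results as a list of sub-rows next to the already converted article. The sketch below assumes plain Map<String, Object> values as stand-ins for rows; SketchAsset and EnrichedRowMapper are hypothetical, and the article conversion is passed in as a function (in the thread, that role is played by the ProtoMessageSchema toRowFunction).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the Asset POJO from the thread.
class SketchAsset {
  public String id;
  public double price;

  SketchAsset(String id, double price) {
    this.id = id;
    this.price = price;
  }
}

// Builds one nested row: a single "article" sub-row plus a repeated "assets" field.
// A is the article type (ArticleEnvelope in the thread); its conversion is injected.
final class EnrichedRowMapper<A> {
  private final Function<A, Map<String, Object>> articleToRow;

  EnrichedRowMapper(Function<A, Map<String, Object>> articleToRow) {
    this.articleToRow = articleToRow;
  }

  // Converts one asset into a flat row; field names become column names.
  static Map<String, Object> assetToRow(SketchAsset asset) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", asset.id);
    row.put("price", asset.price);
    return row;
  }

  Map<String, Object> toRow(A article, List<SketchAsset> assets) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("article", article == null ? null : articleToRow.apply(article));
    // The list of POJOs becomes a list of sub-rows (a repeated record).
    List<Map<String, Object>> assetRows = new ArrayList<>();
    if (assets != null) {
      for (SketchAsset asset : assets) {
        assetRows.add(assetToRow(asset));
      }
    }
    row.put("assets", assetRows);
    return row;
  }
}
```

In Beam terms, the same shape would correspond to a schema with a row-typed "article" field (Schema.FieldType.row(...)) and an "assets" field of type Schema.FieldType.array(Schema.FieldType.row(...)), with the values built through Row.withSchema(...).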