Hi Brian,

Thank you for opening the issue! My current workaround is to generate a
BigQuery schema with helper functions I already have (since I am writing to
BigQuery in my sink at the end). I still have the Beam Schema functions in
the code, but I currently don't use them, as I couldn't make them work in
time for a company-internal demo. (So I am basically following your advice
and avoiding the ProtoMessageSchema.)

Best,
Tobi

On Wed, Aug 19, 2020 at 10:52 PM Brian Hulette <bhule...@google.com> wrote:

> It looks like this is occurring because we don't actually support mixing
> SchemaProviders in nested types. The current SchemaProvider implementations
> only support nested types for homogenous types (e.g. an AutoValue with an
> AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for
> the outer type (EnrichedArticle), it is also used recursively for the inner
> type (ArticleEnvelope), rather than using the registered ProtoMessageSchema.
>
> I filed BEAM-10765 [1] to add support for inferring schemas for
> non-homogenous types, I think it's something we should be able to support.
> I know it's been a while since you reported this, have you found a
> workaround in the meantime? Your best bet may be to avoid using
> ProtoMessageSchema for the inner class for now and use the same style of
> class for the outer and inner class by just creating a POJO or AutoValue
> that replicates the ArticleEnvelope class.
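A minimal sketch of that replacement POJO, assuming hypothetical fields (articleId, title) as stand-ins, since the real ArticleEnvelope definition isn't shown in this thread:

```java
// Hypothetical POJO mirroring the ArticleEnvelope proto, so that the outer
// and inner types can both be handled by JavaFieldSchema. The field names
// are invented for illustration; the real proto's fields would differ.
class ArticleEnvelopePojo {
  public String articleId;
  public String title;

  public ArticleEnvelopePojo() {}

  public ArticleEnvelopePojo(String articleId, String title) {
    this.articleId = articleId;
    this.title = title;
  }
}
```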
>
>
> Luke: Regarding recursive schemas, Reuven and I have had some discussions
> about it offline. I think he said it should be feasible but I don't know
> much beyond that.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10765
>
> On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> I want to make my example as simple as possible while also not leaving
>> out the details that might be the reason for the error. I don't think
>> there is any recursion.
>> I can also share the ArticleEnvelope Protobuf file if that helps. I've
>> tried to register the ArticleEnvelope schema like this:
>>
>>     TestPipeline p = TestPipeline.create();
>>
>>     TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
>>         TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
>>
>>     Schema articleSchema =
>>         new ProtoMessageSchema()
>>             .schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>>     SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
>>         new ProtoMessageSchema()
>>             .toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>>     SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
>>         new ProtoMessageSchema()
>>             .fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>>     p.getSchemaRegistry().registerSchemaForClass(
>>         ArticleProto.ArticleEnvelope.class,
>>         articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow);
>>
>> The problem is that even when I define and register it like above, as
>> soon as I annotate the class EnrichedArticle with 
>> @DefaultSchema(JavaFieldSchema.class)
>> I get:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown Source)
>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414)
>>
>> So the registration does not seem to have an effect when the annotation on
>> EnrichedArticle is present. Without the annotation, however, there is no
>> schema defined on the output PCollection, so I have to define it myself for
>> the BigQueryIO to work:
>> [The code of the EnrichFn transforms an AssetEnvelope into a Java POJO
>> Asset class and enriches it via an RPC call. The Asset has a low number of
>> fields, so doing the manual mapping here is manageable, even though I would
>> like to use Beam Schema as soon as this problem is solved, which would make
>> that Asset POJO obsolete.]
>>
>>     PCollection<KV<String, AssetProto.AssetEnvelope>> assets =
>>         p.apply("Create assets", Create.of(kvAsset));
>>
>>     PCollection<KV<String, ArticleProto.ArticleEnvelope>> articles =
>>         p.apply("Create articles", Create.of(kvArticle));
>>
>>     TupleTag<ArticleProto.ArticleEnvelope> articleTag = new TupleTag<>();
>>     TupleTag<AssetProto.AssetEnvelope> assetTag = new TupleTag<>();
>>
>>     PCollection<KV<String, CoGbkResult>> joinedCollection =
>>         KeyedPCollectionTuple.of(articleTag, articles)
>>             .and(assetTag, assets)
>>             .apply(CoGroupByKey.<String>create());
>>
>>     PCollection<EnrichedArticle> output = joinedCollection.apply(
>>         ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag, assetTag)));
>>
>>     // The following line returns false:
>>     output.hasSchema();
>>
>>     ...BigQueryIO...
>>
>> On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Can you give context as to whether schemas will ever allow recursive
>>> types since this is pretty common in lots of languages?
>>>
>>> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> It just occurred to me that BEAM-10265 [1] could be the cause of the
>>>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>>>> schemas are not allowed to be recursive, and it looks like we don't fail
>>>> gracefully for recursive proto definitions.
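To illustrate the recursion problem in isolation: a naive reflection-based field walk over a self-referential type never terminates, while a visited-set guard lets it fail fast with a clear message. This is a sketch of the general cycle-detection idea, not Beam's actual inference code:

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

// Sketch: walk declared fields recursively, failing fast with a clear error
// when a class already on the current path is revisited, instead of
// overflowing the stack.
class SchemaWalker {
  static void walk(Class<?> clazz) {
    walk(clazz, new HashSet<>());
  }

  private static void walk(Class<?> clazz, Set<Class<?>> onPath) {
    if (!onPath.add(clazz)) {
      throw new IllegalStateException("Recursive type: " + clazz.getName());
    }
    for (Field f : clazz.getDeclaredFields()) {
      Class<?> t = f.getType();
      if (!t.isPrimitive() && t != String.class) {
        walk(t, onPath);
      }
    }
    onPath.remove(clazz);  // siblings may legitimately reuse the type
  }
}

// Self-referential type, analogous to a recursive proto message.
class Node {
  public int value;
  public Node next;
}

// Flat type with no cycles.
class Flat {
  public String name;
  public int count;
}
```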
>>>>
>>>> Brian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>>>
>>>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Hm it looks like the error is from trying to call the zero-arg
>>>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>>>> registered for ArticleEnvelope?
>>>>>
>>>>> I think maybe what's happening is Beam finds there's no schema
>>>>> registered for ArticleEnvelope, so it just recursively
>>>>> applies JavaFieldSchema, which generates code that attempts to use the
>>>>> zero-arg constructor. It looks like that's a bug in JavaFieldSchema; we
>>>>> should fail earlier with a better message rather than just generating code
>>>>> that will try to access a private constructor. I filed a jira for this [1].
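The access failure can be reproduced in isolation with plain reflection. ProtoLike below is a made-up stand-in for a generated proto class (proto-generated Java classes keep their zero-arg constructor private and expose factories instead); the reflective path reports IllegalAccessException where the bytecode-generated creator surfaces an IllegalAccessError:

```java
import java.lang.reflect.Constructor;

// Made-up stand-in for a generated protobuf class.
class ProtoLike {
  private ProtoLike() {}
  static ProtoLike create() { return new ProtoLike(); }
}

class AccessDemo {
  // Attempts what the generated schema code effectively does: invoke the
  // private zero-arg constructor from a different class. The JVM's access
  // check rejects it (here as an IllegalAccessException, a subtype of
  // ReflectiveOperationException).
  static boolean canInvokePrivateCtor() {
    try {
      Constructor<ProtoLike> ctor = ProtoLike.class.getDeclaredConstructor();
      ctor.newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      return false;
    }
  }
}
```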
>>>>>
>>>>> I think you can get this working if you register a Schema for
>>>>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>>>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>>>> <re...@google.com>  and +Alex Van Boxel <a...@vanboxel.be>  in case
>>>>> they have better advice), you might try just registering a provider
>>>>> manually when you create the pipeline, something like
>>>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>>>> new ProtoMessageSchema())`.
>>>>>
>>>>> Brian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>>>
>>>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> A bit more context - I started with the Beam documentation and
>>>>>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>>>>>> dug deeper and tried to implement the methods myself.
>>>>>>
>>>>>> What I also tried is the following class definition:
>>>>>>
>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>
>>>>>>   // ArticleEnvelope is generated from Protobuf
>>>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>>>   // Asset is a Java POJO
>>>>>>   @Nullable public List<Asset> assets;
>>>>>>
>>>>>>   @SchemaCreate
>>>>>>   public EnrichedArticle() {}
>>>>>>
>>>>>>   @SchemaCreate
>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>>     this.article = article;
>>>>>>     this.assets = assets;
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> This throws the following exception:
>>>>>>
>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>>> java.lang.IllegalAccessError: tried to access method
>>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>>>> ...
>>>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
>>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>>>>   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>>>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>>>>>   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>>>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>>>>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>>>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>>>>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>>>>   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>>>>
>>>>>>
>>>>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> Hi Brian,
>>>>>>>
>>>>>>> Thank you for your response.
>>>>>>>
>>>>>>> 1. When I annotate the class with
>>>>>>> @DefaultSchema(JavaFieldSchema.class) and my constructor with
>>>>>>> @SchemaCreate, I get the following exception:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>>>>> from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>>>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>>>>>>
>>>>>>> 2. When I annotate the class with
>>>>>>> @DefaultSchema(JavaBeanSchema.class), make the fields private, and
>>>>>>> generate getters/setters, I get a StackOverflowError:
>>>>>>>
>>>>>>> java.lang.StackOverflowError
>>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>>>>>   at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>>>>>   at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>>>>>   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>>>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>>>>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>>>>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>>>>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>>>>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>>>>   at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>>>>> [...]
>>>>>>>
>>>>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>>>>> PCollection does not have a schema associated with it, which causes the
>>>>>>> next pipeline step (BigQueryIO) to fail.
>>>>>>>
>>>>>>> I want to try AutoValue as well, but that requires some more changes
>>>>>>> to my code.
>>>>>>>
>>>>>>> - I tried supplying the ProtoMessageSchema().toRowFunction
>>>>>>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>>>>>> pipeline
>>>>>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>>>>>> EnrichedArticle and supplying that to the pipeline
>>>>>>>
>>>>>>> Where can I put the breakpoints to get a better understanding of
>>>>>>> what is happening here?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Tobias,
>>>>>>>>
>>>>>>>> You should be able to annotate the EnrichedArticle class with an
>>>>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You
>>>>>>>> would need to make some tweaks to the class though to be compatible
>>>>>>>> with the built-in schema providers: you could make the members public
>>>>>>>> and use JavaFieldSchema, or add getters/setters and use the
>>>>>>>> JavaBeanSchema, or make it into an AutoValue and use AutoValueSchema.
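For the JavaBeanSchema route, the shape would be roughly the following sketch; the field types are simplified stand-ins (String for the proto, List<String> for the assets), and in the real code the class would also carry the @DefaultSchema(JavaBeanSchema.class) annotation:

```java
import java.io.Serializable;
import java.util.List;

// Sketch of a JavaBean-style EnrichedArticle: private fields plus public
// getters/setters and a zero-arg constructor, which is the shape
// JavaBeanSchema expects. Types are simplified stand-ins.
class EnrichedArticleBean implements Serializable {
  private String article;       // stand-in for ArticleProto.ArticleEnvelope
  private List<String> assets;  // stand-in for List<Asset>

  public EnrichedArticleBean() {}

  public String getArticle() { return article; }
  public void setArticle(String article) { this.article = article; }

  public List<String> getAssets() { return assets; }
  public void setAssets(List<String> assets) { this.assets = assets; }
}
```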
>>>>>>>>
>>>>>>>> Once you do that you should be able to convert a
>>>>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows 
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>>>>>>
>>>>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <
>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>
>>>>>>>>> I have the following class definition:
>>>>>>>>>
>>>>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>>>>
>>>>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>>>>   // Asset is a Java POJO
>>>>>>>>>   private List<Asset> assets;
>>>>>>>>>
>>>>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>>>>>     this.article = article;
>>>>>>>>>     this.assets = assets;
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle,
>>>>>>>>> Row> and a Schema for it so that I can pass it easily to my
>>>>>>>>> BigQueryIO at the end of my pipeline. Transforming the article to a 
>>>>>>>>> Row
>>>>>>>>> object is straightforward:
>>>>>>>>>
>>>>>>>>> First I get the toRow() function for it via the helper:
>>>>>>>>>
>>>>>>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>>>>>>       ArticleProto.ArticleEnvelope.class));
>>>>>>>>>
>>>>>>>>> Then I just apply that function to the article field.
>>>>>>>>> However I don't know how I can manually transform my list of
>>>>>>>>> assets (a simple Java POJO annotated with:
>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>
>>>>>>>>> in my EnrichedArticle container/composition class. What's the
>>>>>>>>> recommended way of doing this?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
