Can you give context as to whether schemas will ever allow recursive types
since this is pretty common in lots of languages?

On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com> wrote:

> It just occurred to me that BEAM-10265 [1] could be the cause of the stack
> overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
> are not allowed to be recursive, and it looks like we don't fail gracefully
> for recursive proto definitions.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10265
>
> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com>
> wrote:
>
>> Hm it looks like the error is from trying to call the zero-arg
>> constructor for the ArticleEnvelope proto class. Do you have a schema
>> registered for ArticleEnvelope?
>>
>> I think maybe what's happening is Beam finds there's no schema registered
>> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
>> generates code that attempts to use the zero-arg constructor. It looks like
>> that's a bug in JavaFieldSchema; we should fail earlier with a better
>> message rather than generating code that will try to access a private
>> constructor. I filed a jira for this [1].
>>
>> I think you can get this working if you register a Schema for
>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>> <re...@google.com> and +Alex Van Boxel <a...@vanboxel.be> in case they
>> have better advice). You might try registering a provider manually
>> when you create the pipeline, something like
>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>> new ProtoMessageSchema())`.
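>>
>> A rough sketch of that registration (assuming the
>> beam-sdks-java-extensions-protobuf dependency is on the classpath; the
>> pipeline setup around the registry call is illustrative):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterProtoSchemaExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Register a schema provider for the generated proto class so Beam does
    // not fall back to JavaFieldSchema, whose generated code needs a public
    // zero-arg constructor the proto class doesn't have.
    pipeline.getSchemaRegistry().registerSchemaProvider(
        ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());

    // ... build the rest of the pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}
```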
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>
>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> A bit more context - I started with the Beam documentation and
>>> tried JavaFieldSchema and JavaBeanSchema first; when that didn't work, I
>>> dug deeper and tried to implement the methods myself.
>>>
>>> What I also tried is the following class definition:
>>>
>>> @DefaultSchema(JavaFieldSchema.class)
>>> public class EnrichedArticle implements Serializable {
>>>
>>>   // ArticleEnvelope is generated from Protobuf
>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>   // Asset is a Java POJO
>>>   @Nullable public List<Asset> assets;
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle() {}
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>> List<Asset> assets) {
>>>     this.article = article;
>>>     this.assets = assets;
>>>   }
>>> }
>>>
>>> This throws the following exception:
>>>
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>> ...
>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>> at
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>>> Source)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>> at
>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>> at
>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>> at
>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>> at
>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>> at
>>> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>
>>>
>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>
>>>> Hi Brian,
>>>>
>>>> Thank you for your response.
>>>>
>>>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>>>> and my constructor with @SchemaCreate, I get the following
>>>> exception:
>>>>
>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>> at
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown
>>>> Source)
>>>>
>>>> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>>>> make the fields private and generate Getters/Setters I get a StackOverflow
>>>> error:
>>>>
>>>> java.lang.StackOverflowError
>>>> at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>> at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>> at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>> at
>>>> org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>> at
>>>> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>> at
>>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>> at
>>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>> at
>>>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>> at
>>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>> at
>>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>> at
>>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>> at
>>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>> at
>>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at
>>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>> at
>>>> org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>> at
>>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>> at
>>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>> [...]
>>>>
>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>> PCollection does not have a schema associated with it, which causes the
>>>> next pipeline step (BigQueryIO) to fail.
>>>>
>>>> I want to try AutoValue as well, but that requires some more changes to
>>>> my code.
>>>>
>>>> - I tried supplying the ProtoMessageSchema().toRowFunction
>>>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>>> pipeline
>>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>>> EnrichedArticle and supplying that to the pipeline
>>>>
>>>> Where can I put the breakpoints to get a better understanding of what
>>>> is happening here?
>>>>
>>>>
>>>>
>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Tobias,
>>>>>
>>>>> You should be able to annotate the EnrichedArticle class with an
>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>>>> need to make some tweaks to the class though to be compatible with the
>>>>> built-in schema providers: you could make the members public and use
>>>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or 
>>>>> make
>>>>> it into an AutoValue and use AutoValueSchema.
>>>>>
>>>>> Once you do that you should be able to convert a
>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows 
>>>>> [1].
>>>>>
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
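>>>>>
>>>>> For example (a sketch; `enriched` stands in for however your pipeline
>>>>> produces the PCollection<EnrichedArticle>):

```java
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ConvertToRowsExample {
  // With @DefaultSchema on EnrichedArticle, Beam infers a schema for the
  // input PCollection, and Convert.toRows() turns each element into a Row.
  static PCollection<Row> toRows(PCollection<EnrichedArticle> enriched) {
    return enriched.apply(Convert.toRows());
  }
}
```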
>>>>>
>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> I have the following class definition:
>>>>>>
>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>
>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>   // Asset is a Java POJO
>>>>>>   private List<Asset> assets;
>>>>>>
>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>>> List<Asset> assets) {
>>>>>>     this.article = article;
>>>>>>     this.assets = assets;
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>>>>>> a Schema for it so that I can pass it easily to my BigQueryIO at the end 
>>>>>> of
>>>>>> my pipeline. Transforming the article to a Row object is straightforward:
>>>>>>
>>>>>> First I get the toRow() function for it via the helper:
>>>>>>
>>>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>>>       ArticleProto.ArticleEnvelope.class));
>>>>>>
>>>>>> Then I just apply that function to the article field.
>>>>>> However I don't know how I can manually transform my list of assets
>>>>>> (a simple Java POJO annotated with
>>>>>> @DefaultSchema(JavaFieldSchema.class))
>>>>>> in my EnrichedArticle container/composition class. What's the
>>>>>> recommended way of doing this?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
