It just occurred to me that BEAM-10265 [1] could be the cause of the stack
overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
are not allowed to be recursive, and it looks like we don't fail gracefully
for recursive proto definitions.
Brian

[1] https://issues.apache.org/jira/browse/BEAM-10265
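As an illustration of why recursion breaks reflection-based schema inference (a standalone sketch, not Beam code; the Envelope class and the depth cap are hypothetical):

```java
import java.lang.reflect.Field;

public class RecursiveSchemaDemo {
  // Hypothetical self-referential type, analogous to a recursive proto message.
  static class Envelope {
    String id;
    Envelope parent; // the type refers to itself
  }

  // Naive schema inference: expand every non-primitive, non-String field.
  // Without the depth cap this recursion never terminates on a recursive
  // type, which is essentially what surfaces as a StackOverflowError.
  static int expand(Class<?> clazz, int depth, int cap) {
    if (depth >= cap) {
      return depth;
    }
    int max = depth;
    for (Field f : clazz.getDeclaredFields()) {
      Class<?> t = f.getType();
      if (!t.isPrimitive() && t != String.class) {
        max = Math.max(max, expand(t, depth + 1, cap));
      }
    }
    return max;
  }

  public static void main(String[] args) {
    // Expansion of Envelope -> parent -> Envelope -> ... always hits the cap.
    System.out.println(expand(Envelope.class, 0, 50)); // prints 50
  }
}
```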

On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com> wrote:

> Hm it looks like the error is from trying to call the zero-arg constructor
> for the ArticleEnvelope proto class. Do you have a schema registered for
> ArticleEnvelope?
>
> I think maybe what's happening is Beam finds there's no schema registered
> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
> generates code that attempts to use the zero-arg constructor. That looks
> like a bug in JavaFieldSchema: we should fail earlier with a better
> message rather than generating code that tries to access a private
> constructor. I filed a jira for this [1].
>
> I think you can get this working if you register a Schema for
> ArticleEnvelope. I'm not actually sure of the best way to do this since
> it's generated code and you can't use @DefaultSchema (+Reuven Lax
> <re...@google.com>  and +Alex Van Boxel <a...@vanboxel.be>  in case they
> have better advice), you might try just registering a provider manually
> when you create the pipeline, something like
> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
> new ProtoMessageSchema())`.
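To flesh out that suggestion, a sketch only (assuming the Beam Java SDK plus the beam-sdks-java-extensions-protobuf module on the classpath; ArticleProto.ArticleEnvelope is the generated class from this thread):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterProtoSchema {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Register the proto schema provider before building the pipeline, so
    // schema inference for ArticleEnvelope does not fall back to
    // JavaFieldSchema and its generated zero-arg-constructor call.
    pipeline
        .getSchemaRegistry()
        .registerSchemaProvider(ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());
  }
}
```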
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10372
>
> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> A bit more context - I started with the Beam documentation and
>> tried JavaFieldSchema and JavaBeanSchema first. When that didn't work, I
>> dug deeper and tried to implement the methods myself.
>>
>> I also tried the following class definition:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated from Protobuf
>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   @Nullable public List<Asset> assets;
>>
>>   @SchemaCreate
>>   public EnrichedArticle() {}
>>
>>   @SchemaCreate
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>> List<Asset> assets) {
>>     this.article = article;
>>     this.assets = assets;
>>   }
>> }
>>
>> This throws the following exception:
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> ...
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>> at
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>> at
>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>> at
>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>> at
>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>> at
>> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>
>>
>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> Hi Brian,
>>>
>>> Thank you for your response.
>>>
>>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>>> and my constructor with @SchemaCreate, I get the following exception:
>>>
>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>> at
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown
>>> Source)
>>>
>>> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>>> make the fields private, and generate getters/setters, I get a
>>> StackOverflowError:
>>>
>>> java.lang.StackOverflowError
>>> at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>> at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>> at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>> at
>>> org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>> at
>>> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>> at
>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>> at
>>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>> at
>>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>> at
>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>> at
>>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>> at
>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>> at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>> at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>> at
>>> org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>> at
>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>> at
>>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>> [...]
>>>
>>> 2.1 When I make the fields public, the pipeline executes, but the
>>> PCollection does not have a schema associated with it, which causes the
>>> next pipeline step (BigQueryIO) to fail.
>>>
>>> I want to try AutoValue as well, but that requires some more changes to
>>> my code.
>>>
>>> - I tried supplying the ProtoMessageSchema().toRowFunction
>>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>> pipeline
>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>> EnrichedArticle and supplying that to the pipeline
>>>
>>> Where can I put the breakpoints to get a better understanding of what is
>>> happening here?
>>>
>>>
>>>
>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Tobias,
>>>>
>>>> You should be able to annotate the EnrichedArticle class with an
>>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>>> need to make some tweaks to the class though to be compatible with the
>>>> built-in schema providers: you could make the members public and use
>>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
>>>> it into an AutoValue and use AutoValueSchema.
>>>>
>>>> Once you do that you should be able to convert a
>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>>>
>>>> Brian
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
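A sketch of that conversion (assuming `articles` is the schema-inferred PCollection<EnrichedArticle> discussed above):

```java
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ToRowsExample {
  // With a schema inferred for EnrichedArticle, Convert.toRows yields the
  // Row collection that downstream transforms such as BigQueryIO can consume.
  static PCollection<Row> toRows(PCollection<EnrichedArticle> articles) {
    return articles.apply(Convert.toRows());
  }
}
```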
>>>>
>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> I have the following class definition:
>>>>>
>>>>> public class EnrichedArticle implements Serializable {
>>>>>
>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>   // Asset is a Java POJO
>>>>>   private List<Asset> assets;
>>>>>
>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>> List<Asset> assets) {
>>>>>     this.article = article;
>>>>>     this.assets = assets;
>>>>>   }
>>>>> }
>>>>>
>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>>>>> a Schema for it so that I can pass it easily to my BigQueryIO at the end 
>>>>> of
>>>>> my pipeline. Transforming the article to a Row object is straightforward:
>>>>>
>>>>> First I get the toRow() function for it via the helper:
>>>>>
>>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>>       ArticleProto.ArticleEnvelope.class));
>>>>>
>>>>> Then I just apply that function to the article field.
>>>>> However I don't know how I can manually transform my list of assets (a
>>>>> simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>>>>> in my EnrichedArticle container/composition class. What's the
>>>>> recommended way of doing this?
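For the manual route, one unverified sketch (it assumes a schema for Asset is already registered with the pipeline, and simply composes the per-type schemas by hand):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.TypeDescriptor;

public class EnrichedArticleSchema {
  // Compose a schema for EnrichedArticle from the proto-derived article
  // schema and the registered Asset schema (throws if Asset is unregistered).
  static Schema build(Pipeline pipeline) throws NoSuchSchemaException {
    Schema articleSchema =
        new ProtoMessageSchema()
            .schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
    Schema assetSchema = pipeline.getSchemaRegistry().getSchema(Asset.class);
    return Schema.builder()
        .addRowField("article", articleSchema)
        .addArrayField("assets", Schema.FieldType.row(assetSchema))
        .build();
  }
}
```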
>>>>>
>>>>>
>>>>>
>>>>>
