I want to make my example as simple as possible without leaving out the
details that might be the cause of the error. I don't think there is any
recursion.
I can also share the ArticleEnvelope Protobuf file if that helps. I've
tried to register the ArticleEnvelope schema like this:

    TestPipeline p = TestPipeline.create();
    TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
        TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
    Schema articleSchema =
        new ProtoMessageSchema().schemaFor(articleEnvelopeTypeDescriptor);

    SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
        new ProtoMessageSchema().toRowFunction(articleEnvelopeTypeDescriptor);

    SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
        new ProtoMessageSchema().fromRowFunction(articleEnvelopeTypeDescriptor);

    // Register the proto-derived schema and conversion functions for ArticleEnvelope.
    p.getSchemaRegistry().registerSchemaForClass(
        ArticleProto.ArticleEnvelope.class,
        articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow);

The problem is that even when I define and register it as above, as soon
as I annotate the EnrichedArticle class with
@DefaultSchema(JavaFieldSchema.class), I get:

Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
    at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown Source)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
    at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
    at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414)

So the registration does not seem to have any effect once the annotation on
EnrichedArticle is present. Without the annotation, however, there is no
schema defined on the output PCollection, so I have to define it myself for
BigQueryIO to work:
[The EnrichFn code transforms an AssetEnvelope into a Java POJO Asset class
and enriches it via an RPC call. Asset has only a few fields, so doing the
manual mapping here is manageable, even though I would like to use Beam
schemas as soon as this problem is solved, which would make the Asset POJO
obsolete.]

    PCollection<KV<String, AssetProto.AssetEnvelope>> assets =
        p.apply("Create assets", Create.of(kvAsset));

    PCollection<KV<String, ArticleProto.ArticleEnvelope>> articles =
        p.apply("Create articles", Create.of(kvArticle));

    TupleTag<ArticleProto.ArticleEnvelope> articleTag = new TupleTag<>();
    TupleTag<AssetProto.AssetEnvelope> assetTag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> joinedCollection =
        KeyedPCollectionTuple.of(articleTag, articles)
            .and(assetTag, assets)
            .apply(CoGroupByKey.<String>create());

    PCollection<EnrichedArticle> output =
        joinedCollection.apply(
            ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag, assetTag)));
    // The following line returns false:
    output.hasSchema();

    ...BigQueryIO...

On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik <lc...@google.com> wrote:

> Can you give context as to whether schemas will ever allow recursive types
> since this is pretty common in lots of languages?
>
> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bhule...@google.com> wrote:
>
>> It just occurred to me that BEAM-10265 [1] could be the cause of the
>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>> schemas are not allowed to be recursive, and it looks like we don't fail
>> gracefully for recursive proto definitions.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>
>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Hm, it looks like the error is from trying to call the zero-arg
>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>> registered for ArticleEnvelope?
>>>
>>> I think maybe what's happening is Beam finds there's no schema
>>> registered for ArticleEnvelope, so it just recursively
>>> applies JavaFieldSchema, which generates code that attempts to use the
>>> zero-arg constructor. It looks like that's a bug in JavaFieldSchema: we
>>> should fail earlier with a better message rather than just generating code
>>> that will try to access a private constructor. I filed a JIRA for this [1].
>>>
>>> I think you can get this working if you register a Schema for
>>> ArticleEnvelope. I'm not actually sure of the best way to do this, since
>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>> <re...@google.com> and +Alex Van Boxel <a...@vanboxel.be> in case
>>> they have better advice). You might try registering a provider
>>> manually when you create the pipeline, something like
>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>> new ProtoMessageSchema())`.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>
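A minimal sketch of the provider registration suggested above, assuming Beam's protobuf extension (`org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema`) and protobuf-java are on the classpath. The class `ProtoSchemaRegistration` and the helper `registerProto` are hypothetical, and `com.google.protobuf.Timestamp` stands in for a generated class such as `ArticleProto.ArticleEnvelope`. In a pipeline you would pass `p.getSchemaRegistry()` rather than a fresh `SchemaRegistry.createDefault()`. This only registers a provider for the proto class itself; the report at the top of this thread says an explicit registration had no visible effect once `@DefaultSchema(JavaFieldSchema.class)` was on `EnrichedArticle`, so it is not confirmed to fix the nested-field case.

```java
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;

class ProtoSchemaRegistration {

  // Hypothetical helper: registers ProtoMessageSchema as the schema provider
  // for one generated proto class. In a pipeline, pass p.getSchemaRegistry().
  static <T extends Message> void registerProto(SchemaRegistry registry, Class<T> protoClass) {
    registry.registerSchemaProvider(protoClass, new ProtoMessageSchema());
  }

  // Demo: Timestamp is a stand-in for a generated class such as ArticleEnvelope.
  static Schema demoSchema() {
    SchemaRegistry registry = SchemaRegistry.createDefault();
    registerProto(registry, Timestamp.class);
    try {
      return registry.getSchema(Timestamp.class);
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("Provider registration did not take effect", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demoSchema());
  }
}
```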
>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>>> wrote:
>>>
>>>> A bit more context: I started with the Beam documentation and
>>>> tried JavaFieldSchema and JavaBeanSchema first. When that didn't work, I
>>>> dug deeper and tried to implement the methods myself.
>>>>
>>>> What I also tried is the following class definition:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class EnrichedArticle implements Serializable {
>>>>
>>>>   // ArticleEnvelope is generated from Protobuf
>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>   // Asset is a Java POJO
>>>>   @Nullable public List<Asset> assets;
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle() {}
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>     this.article = article;
>>>>     this.assets = assets;
>>>>   }
>>>> }
>>>>
>>>> This throws the following exception:
>>>>
>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>> ...
>>>> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>>     at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
>>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>>     at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>>>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>>>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>>>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>>>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>>     at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>>
>>>>
>>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> Hi Brian,
>>>>>
>>>>> Thank you for your response.
>>>>>
>>>>> 1. When I annotate the class with
>>>>> @DefaultSchema(JavaFieldSchema.class) and my constructor with
>>>>> @SchemaCreate, I get the following exception:
>>>>>
>>>>> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>>>     at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>>>>
>>>>> 2. When I annotate the class with
>>>>> @DefaultSchema(JavaBeanSchema.class), make the fields private, and generate
>>>>> getters/setters, I get a StackOverflowError:
>>>>>
>>>>> java.lang.StackOverflowError
>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>>>     at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>>>     at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>>>     at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>>>     at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>>>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>>>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>>     at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>>>     at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>>>     at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>>> [...]
>>>>>
>>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>>> PCollection does not have a schema associated with it, which causes the
>>>>> next pipeline step (BigQueryIO) to fail.
>>>>>
>>>>> I want to try AutoValue as well, but that requires some more changes
>>>>> to my code.
>>>>>
>>>>> - I tried supplying the ProtoMessageSchema().toRowFunction
>>>>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>>>>> pipeline
>>>>> - I tried writing my own toRow/fromRow/getSchema functions for the
>>>>> EnrichedArticle and supplying that to the pipeline
>>>>>
>>>>> Where can I put the breakpoints to get a better understanding of what
>>>>> is happening here?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Tobias,
>>>>>>
>>>>>> You should be able to annotate the EnrichedArticle class with a
>>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>>>>> need to make some tweaks to the class, though, to be compatible with the
>>>>>> built-in schema providers: you could make the members public and use
>>>>>> JavaFieldSchema, add getters/setters and use JavaBeanSchema, or make
>>>>>> it into an AutoValue and use AutoValueSchema.
>>>>>>
>>>>>> Once you do that, you should be able to convert a
>>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>>>>
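A minimal sketch of the public-fields-plus-JavaFieldSchema approach, assuming only Beam's core Java SDK on the classpath. `FieldSchemaSketch`, `EnrichedAssets`, and the `Asset` fields are hypothetical stand-ins; the protobuf `ArticleEnvelope` field is deliberately left out because, per the rest of this thread, JavaFieldSchema then tries to treat the generated proto class as a plain POJO. The sketch only asks the default `SchemaRegistry` (which honors `@DefaultSchema`) for the inferred schema, without running a pipeline.

```java
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

class FieldSchemaSketch {

  // Hypothetical stand-in for the Asset POJO: public fields, public no-arg constructor.
  @DefaultSchema(JavaFieldSchema.class)
  public static class Asset implements Serializable {
    public String id;
    public String url;

    public Asset() {}
  }

  // Hypothetical container holding only POJO/primitive fields (no generated proto class).
  @DefaultSchema(JavaFieldSchema.class)
  public static class EnrichedAssets implements Serializable {
    public String articleId;
    public List<Asset> assets;

    public EnrichedAssets() {}
  }

  // Looks up the schema inferred from the @DefaultSchema annotation.
  static Schema inferSchema(Class<?> clazz) {
    try {
      return SchemaRegistry.createDefault().getSchema(clazz);
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("No schema could be inferred for " + clazz, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(inferSchema(EnrichedAssets.class));
  }
}
```

The nested `List<Asset>` is inferred as a collection field whose elements are rows with `Asset`'s own schema, since JavaFieldSchema applies itself recursively to the element type.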
>>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> I have the following class definition:
>>>>>>>
>>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>>
>>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>>   // Asset is a Java POJO
>>>>>>>   private List<Asset> assets;
>>>>>>>
>>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>>>     this.article = article;
>>>>>>>     this.assets = assets;
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>>>>>>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>>>>>>> my pipeline. Transforming the article to a Row object is straightforward:
>>>>>>>
>>>>>>> First I get the toRow() function for it via the helper:
>>>>>>>
>>>>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>>>>       ArticleProto.ArticleEnvelope.class));
>>>>>>>
>>>>>>> Then I just apply that function to the article field.
>>>>>>> However, I don't know how to manually transform my list of assets
>>>>>>> (a simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>>>>>>> in my EnrichedArticle container/composition class. What's the
>>>>>>> recommended way of doing this?
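One possible manual conversion: ask a `SchemaRegistry` for the schema and `toRow` function of the annotated `Asset` class, map each list element through that function, and nest the resulting rows in an array field next to the article row. This is a sketch assuming only Beam's core Java SDK; `ManualEnrichedRow`, the two `Asset` fields, and the `title` field of the demo article schema are hypothetical. In the real pipeline, the article schema and row would come from `ProtoMessageSchema().schemaFor(...)` and `toRowFunction(...)` as described above.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

class ManualEnrichedRow {

  // Hypothetical stand-in for the Asset POJO from the thread.
  @DefaultSchema(JavaFieldSchema.class)
  public static class Asset implements Serializable {
    public String id;
    public String url;

    public Asset() {}

    public Asset(String id, String url) {
      this.id = id;
      this.url = url;
    }
  }

  private final Schema enrichedSchema;
  private final SerializableFunction<Asset, Row> assetToRow;

  // articleSchema is assumed to come from ProtoMessageSchema().schemaFor(...).
  ManualEnrichedRow(Schema articleSchema) {
    SchemaRegistry registry = SchemaRegistry.createDefault();
    Schema assetSchema;
    try {
      assetSchema = registry.getSchema(Asset.class);
      assetToRow = registry.getToRowFunction(Asset.class);
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("Asset has no inferable schema", e);
    }
    // Combined schema: the article as a nested row, the assets as an array of rows.
    enrichedSchema =
        Schema.builder()
            .addRowField("article", articleSchema)
            .addArrayField("assets", FieldType.row(assetSchema))
            .build();
  }

  Schema schema() {
    return enrichedSchema;
  }

  // Converts each Asset with the registry's toRow function and nests the results.
  Row toRow(Row articleRow, List<Asset> assets) {
    List<Row> assetRows = assets.stream().map(assetToRow::apply).collect(Collectors.toList());
    return Row.withSchema(enrichedSchema).addValue(articleRow).addValue(assetRows).build();
  }

  // Demo with a hypothetical one-field article schema.
  static Row demo() {
    Schema articleSchema = Schema.builder().addStringField("title").build();
    Row articleRow = Row.withSchema(articleSchema).addValue("Example").build();
    ManualEnrichedRow converter = new ManualEnrichedRow(articleSchema);
    return converter.toRow(
        articleRow,
        Arrays.asList(new Asset("a1", "https://example.com/1"), new Asset("a2", "https://example.com/2")));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

`schema()` returns the combined schema, which is what a manually defined output schema for BigQueryIO would need; building the converter once per DoFn instance rather than per element is likely cheaper, since the registry lookups are repeated otherwise.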