A bit more context: I started with the Beam documentation and tried
JavaFieldSchema and JavaBeanSchema first; when those didn't work, I dug
deeper and tried to implement the conversion methods myself.

What I also tried is the following class definition:

@DefaultSchema(JavaFieldSchema.class)
public class EnrichedArticle implements Serializable {

  // ArticleEnvelope is generated from Protobuf
  @Nullable public ArticleProto.ArticleEnvelope article;
  // Asset is a Java POJO
  @Nullable public List<Asset> assets;

  @SchemaCreate
  public EnrichedArticle() {}

  @SchemaCreate
  public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    this.article = article;
    this.assets = assets;
  }
}

This throws the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
...
Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
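
From the stack trace, the generated SchemaUserTypeCreator apparently cannot
call the non-public constructor of the generated proto class. One workaround
I was considering is registering the proto schema provider explicitly, so
that the nested field goes through ProtoMessageSchema instead of the
generated creator. An untested sketch of what I mean (I am assuming this is
how registerSchemaProvider is meant to be used):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;

Pipeline pipeline = Pipeline.create();
// Untested idea: tell the schema registry to handle the proto type with
// ProtoMessageSchema, so schema inference for the ArticleEnvelope field
// does not try to construct it via its (inaccessible) constructor.
pipeline
    .getSchemaRegistry()
    .registerSchemaProvider(ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());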


On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
wrote:

> Hi Brian,
>
> Thank you for your response.
>
> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
> and my constructor with @SchemaCreate, I get the following exception:
>
> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
> at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>
> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
> make the fields private, and generate getters/setters, I get a
> StackOverflowError:
>
> java.lang.StackOverflowError
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
> at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
> at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
> at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
> at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
> at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
> at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
> [...]
>
> 2.1 When I make the fields public, the pipeline executes, but the
> PCollection does not have a schema associated with it, which causes the
> next pipeline step (BigQueryIO) to fail.
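>
> For reference, the failing step is essentially the following (simplified
> sketch; the table name is made up, and I am assuming useBeamSchema() here,
> which is the part that needs a schema on the input PCollection):
>
> BigQueryIO.<EnrichedArticle>write()
>     .useBeamSchema() // fails when the input PCollection has no schema
>     .to("my-project:my_dataset.articles");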
>
> I want to try AutoValue as well, but that requires some more changes to my
> code.
>
> - I tried supplying the ProtoMessageSchema().toRowFunction and
> ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
> pipeline.
> - I tried writing my own toRow/fromRow/getSchema functions for
> EnrichedArticle and supplying them to the pipeline (see the sketch below).
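>
> The second attempt looks roughly like this (untested sketch; schema, toRow
> and fromRow stand for my hand-written Schema and conversion functions):
>
> PCollection<EnrichedArticle> enriched = ...;
> enriched.setSchema(
>     schema,                                   // hand-written Schema
>     TypeDescriptor.of(EnrichedArticle.class),
>     toRow,                                    // SerializableFunction<EnrichedArticle, Row>
>     fromRow);                                 // SerializableFunction<Row, EnrichedArticle>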
>
> Where can I put the breakpoints to get a better understanding of what is
> happening here?
>
>
>
> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi Tobias,
>>
>> You should be able to annotate the EnrichedArticle class with an
>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>> need to make some tweaks to the class though to be compatible with the
>> built-in schema providers: you could make the members public and use
>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
>> it into an AutoValue and use AutoValueSchema.
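>>
>> For instance, the AutoValue variant could look roughly like this
>> (untested sketch):
>>
>> @DefaultSchema(AutoValueSchema.class)
>> @AutoValue
>> public abstract class EnrichedArticle {
>>   public abstract ArticleProto.ArticleEnvelope getArticle();
>>   public abstract List<Asset> getAssets();
>> }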
>>
>> Once you do that you should be able to convert a
>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>
>> Brian
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
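>>
>> For example, something along these lines (untested sketch):
>>
>> // With a schema inferred for EnrichedArticle, Convert.toRows turns the
>> // PCollection<EnrichedArticle> into a PCollection<Row>.
>> PCollection<EnrichedArticle> enriched = ...;
>> PCollection<Row> rows = enriched.apply(Convert.toRows());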
>>
>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> I have the following class definition:
>>>
>>> public class EnrichedArticle implements Serializable {
>>>
>>>   // ArticleEnvelope is generated via Protobuf
>>>   private ArticleProto.ArticleEnvelope article;
>>>   // Asset is a Java POJO
>>>   private List<Asset> assets;
>>>
>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>> List<Asset> assets) {
>>>     this.article = article;
>>>     this.assets = assets;
>>>   }
>>> }
>>>
>>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>>> my pipeline. Transforming the article to a Row object is straightforward:
>>>
>>> First I get the toRow() function for it via the helper:
>>>
>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>       ArticleProto.ArticleEnvelope.class));
>>>
>>> Then I just apply that function to the article field.
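>>>
>>> Roughly like this (sketch; field access simplified):
>>>
>>> SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleToRow =
>>>     new ProtoMessageSchema()
>>>         .toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>> Row articleRow = articleToRow.apply(enrichedArticle.article);
>>>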
>>> However, I don't know how I can manually transform my list of assets (a
>>> simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>>> in my EnrichedArticle container/composition class. What's the
>>> recommended way of doing this?
