Hi Brian,

Thank you for your response.

1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class) and
my constructor with @SchemaCreate, I get the following exception:

Caused by: java.lang.IllegalAccessError: tried to access method
ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
from class
ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
  at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
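For reference, the setup from point 1 looks roughly like this (a sketch based on the class definition from my earlier message below; ArticleProto.ArticleEnvelope is the Protobuf-generated class and Asset is the POJO):

```java
import java.io.Serializable;
import java.util.List;

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Sketch of the annotated class from point 1. ArticleProto.ArticleEnvelope
// and Asset are the classes from the earlier message in this thread.
@DefaultSchema(JavaFieldSchema.class)
public class EnrichedArticle implements Serializable {

  private final ArticleProto.ArticleEnvelope article;
  private final List<Asset> assets;

  @SchemaCreate
  public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    this.article = article;
    this.assets = assets;
  }
}
```

The IllegalAccessError above is thrown by the creator class that Beam generates for this annotated type.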

2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
make the fields private, and generate getters/setters, I get a
StackOverflowError:

java.lang.StackOverflowError
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
  at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
  at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
  at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
  at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
  at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
  at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
[...]

2.1 When I make the fields public, the pipeline executes, but the
PCollection does not have a schema associated with it, which causes the
next pipeline step (BigQueryIO) to fail.
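One workaround I could imagine for 2.1 is attaching the schema to the PCollection explicitly via setSchema, along these lines (a sketch only; enrichedArticleSchema, toRowFn, and fromRowFn are placeholder names for things I would still have to build):

```java
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Sketch: attach a schema by hand so the downstream BigQueryIO step sees
// one. enrichedArticleSchema, toRowFn and fromRowFn are placeholders:
// a manually built Schema, a SerializableFunction<EnrichedArticle, Row>,
// and a SerializableFunction<Row, EnrichedArticle> respectively.
PCollection<EnrichedArticle> enriched = input.apply(new EnrichArticlesTransform());
enriched.setSchema(
    enrichedArticleSchema,
    TypeDescriptor.of(EnrichedArticle.class),
    toRowFn,
    fromRowFn);
```

(EnrichArticlesTransform here stands in for my actual enrichment step.)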

I want to try AutoValue as well, but that requires some more changes to my
code.

- I tried supplying new ProtoMessageSchema().toRowFunction(...)
and new ProtoMessageSchema().schemaFor(...) for the Protobuf conversion to
the pipeline
- I tried writing my own toRow/fromRow/getSchema functions for the
EnrichedArticle and supplying those to the pipeline
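Concretely, the hand-written variant from the second bullet looked roughly like this (the schema composition and the assetToRow helper are illustrative, not my exact code):

```java
import java.util.stream.Collectors;

import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

// Row function and schema for the proto part, via the helper from below.
SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleToRow =
    new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
Schema articleSchema =
    new ProtoMessageSchema().schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));

// assetSchema / assetToRow would come from the schema inferred for the
// Asset POJO, e.g. via the pipeline's SchemaRegistry (placeholder names).
Schema enrichedSchema =
    Schema.builder()
        .addRowField("article", articleSchema)
        .addArrayField("assets", Schema.FieldType.row(assetSchema))
        .build();

SerializableFunction<EnrichedArticle, Row> enrichedToRow =
    ea ->
        Row.withSchema(enrichedSchema)
            .addValue(articleToRow.apply(ea.getArticle()))
            .addValue(
                ea.getAssets().stream().map(assetToRow::apply).collect(Collectors.toList()))
            .build();
```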

Where can I put the breakpoints to get a better understanding of what is
happening here?



On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Tobias,
>
> You should be able to annotate the EnrichedArticle class with an
> @DefaultSchema annotation and Beam will infer a schema for it. You would
> need to make some tweaks to the class though to be compatible with the
> built-in schema providers: you could make the members public and use
> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
> it into an AutoValue and use AutoValueSchema.
>
> Once you do that you should be able to convert a
> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>
> Brian
>
> [1]
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>
> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> I have the following class definition:
>>
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated via Protobuf
>>   private ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   private List<Asset> assets;
>>
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>> List<Asset> assets) {
>>     this.article = article;
>>     this.assets = assets;
>>   }
>> }
>>
>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>> my pipeline. Transforming the article to a Row object is straightforward:
>>
>> First I get the toRow() function for it via the helper:
>>
>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>       ArticleProto.ArticleEnvelope.class));
>>
>> Then I just apply that function to the article field.
>> However I don't know how I can manually transform my list of assets (a
>> simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>>
>> in my EnrichedArticle container/composition class. What's the recommended
>> way of doing this?
>>
>>
>>
>>
