Hi Brian,

Thank you for your response.
1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class) and my constructor with @SchemaCreate, I get the following exception:

Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
        at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)

2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class), make the fields private, and generate getters/setters, I get a StackOverflowError:

java.lang.StackOverflowError
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
        at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
        at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
        at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
        at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
        at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
        at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
        [...]

2.1 When I make the fields public, the pipeline executes, but the PCollection does not have a schema associated with it, which causes the next pipeline step (BigQueryIO) to fail. I want to try AutoValue as well, but that requires some more changes to my code.

- I tried supplying ProtoMessageSchema().toRowFunction and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the pipeline.
- I tried writing my own toRow/fromRow/getSchema functions for the EnrichedArticle and supplying them to the pipeline.

Where can I put the breakpoints to get a better understanding of what is happening here?

On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Tobias,
>
> You should be able to annotate the EnrichedArticle class with an
> @DefaultSchema annotation and Beam will infer a schema for it. You would
> need to make some tweaks to the class though to be compatible with the
> built-in schema providers: you could make the members public and use
> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
> it into an AutoValue and use AutoValueSchema.
>
> Once you do that you should be able to convert a
> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>
> Brian
>
> [1] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>
> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
>> I have the following class definition:
>>
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated via Protobuf
>>   private ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   private List<Asset> assets;
>>
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>     this.article = article;
>>     this.assets = assets;
>>   }
>> }
>>
>> I am trying to generate a SerializableFunction<EnrichedArticle, Row> and
>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>> my pipeline. Transforming the article to a Row object is straightforward:
>>
>> First I get the toRow() function for it via the helper:
>>
>> new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> Then I just apply that function to the article field.
>> However, I don't know how I can manually transform my list of assets (a
>> simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>> in my EnrichedArticle container/composition class. What's the recommended
>> way of doing this?
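A side note on error 1 above: the stack trace says the bytecode-generated SchemaUserTypeCreator tried to call ArticleEnvelope.<init>(), and protobuf-generated message classes keep their constructors private, so a separate class cannot invoke them. The Beam codegen hits this as an IllegalAccessError; the reflective analogue below throws IllegalAccessException, but it is the same access rule. This is a minimal pure-JDK sketch with made-up class names, not Beam code:

```java
import java.lang.reflect.Constructor;

// Stand-in for a protobuf-generated message: the constructor is private,
// so only code inside this class itself may call `new ProtoLikeMessage()`.
class ProtoLikeMessage {
    private ProtoLikeMessage() {}
}

public class PrivateCtorDemo {
    // Mimics what a generated creator class attempts: constructing the
    // message from *outside* the class. Returns the observed failure mode.
    static String tryCreate() {
        try {
            Constructor<ProtoLikeMessage> ctor =
                ProtoLikeMessage.class.getDeclaredConstructor();
            ctor.newInstance();  // access check fails: we are a different class
            return "created";
        } catch (IllegalAccessException e) {
            return "IllegalAccessException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate());  // prints "IllegalAccessException"
    }
}
```

This suggests @SchemaCreate on EnrichedArticle's constructor is not the failing part; the failure happens when schema inference recurses into the ArticleEnvelope member.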
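For the manual toRow/getSchema route mentioned above, nested schemas can be composed by hand: give EnrichedArticle a schema with a row field for the article and an array-of-rows field for the assets, then build the Row accordingly. The sketch below uses placeholder field names ("id", "url") rather than the real proto/POJO fields; in practice the two nested schemas would come from ProtoMessageSchema and JavaFieldSchema inference instead of being built inline:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class EnrichedArticleRowSketch {
    // Placeholder schemas standing in for the inferred ArticleEnvelope
    // and Asset schemas.
    static final Schema ASSET = Schema.builder().addStringField("url").build();
    static final Schema ARTICLE = Schema.builder().addStringField("id").build();
    static final Schema ENRICHED = Schema.builder()
        .addRowField("article", ARTICLE)              // nested row field
        .addArrayField("assets", FieldType.row(ASSET)) // list of nested rows
        .build();

    // Hypothetical manual toRow: wrap each member as a Row, then nest them.
    static Row toRow(String articleId, List<String> assetUrls) {
        Row article = Row.withSchema(ARTICLE).addValue(articleId).build();
        List<Row> assets = assetUrls.stream()
            .map(u -> Row.withSchema(ASSET).addValue(u).build())
            .collect(Collectors.toList());
        return Row.withSchema(ENRICHED)
            .addValue(article)
            .addValue(assets)
            .build();
    }

    public static void main(String[] args) {
        Row r = toRow("a-1", Arrays.asList("https://example.test/img.png"));
        System.out.println(r.getRow("article").getString("id"));
    }
}
```

Wrapping toRow in a SerializableFunction<EnrichedArticle, Row> and pairing it with ENRICHED should then satisfy BigQueryIO's withSchema-style requirements, though Convert.toRows on an annotated class remains the less manual path.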