If you use an Avro schema, you should also store the data in Avro format. Everything else is going to be a hack. If you really want to proceed with the hack, you'll either need to use aliases in your Avro reader schema or change the headers of the CSV file to match the field names in Avro.
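For reference, this is roughly what field aliases look like in an Avro schema. It is a sketch only, reusing the field names from the schema quoted further down in this thread, and not a verified fix; which spelling belongs in "name" versus "aliases" depends on which schema acts as the reader during resolution:

{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    { "name": "emp_id", "type": ["null", "long"], "aliases": ["empId"] },
    { "name": "Name", "type": ["null", "string"], "aliases": ["name"] }
  ]
}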
Best,
Alexander

On Thu, 26 Oct 2023 at 17:56, Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com> wrote:

Hi Alexander,

Thanks for the reply. Actually, I have a system where data travels in the form of user-defined, AVRO-schema-generated objects.

*Sample code:*

static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory) throws Exception {
    // This is the AVRO-generated Java object having fields emp_id and Name
    Class<EmployeeTest> recordClazz = EmployeeTest.class;

    CsvSchema.Builder builder = CsvSchema.builder()
            .setUseHeader(true)
            .setReorderColumns(true)
            .setColumnSeparator(',')
            .setEscapeChar('"')
            .setLineSeparator(System.lineSeparator())
            .setQuoteChar('"')
            .setArrayElementSeparator(";")
            .setNullValue("");

    CsvReaderFormat<EmployeeTest> csvFormat =
            CsvReaderFormat.forSchema(builder.build(), TypeInformation.of(recordClazz));

    FileSource.FileSourceBuilder<EmployeeTest> fileSourceBuilder =
            FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                    .monitorContinuously(Duration.ofSeconds(30));
    fileSourceBuilder.setFileEnumerator(
            (FileEnumerator.Provider) () -> new NonSplittingRecursiveEnumerator(new DefaultFileFilter()));
    FileSource<EmployeeTest> source = fileSourceBuilder.build();

    final DataStreamSource<EmployeeTest> file = env.fromSource(source,
            WatermarkStrategy.forMonotonousTimestamps()
                    .withTimestampAssigner(new WatermarkAssigner((Object input) -> System.currentTimeMillis())),
            "FileSource");
    file.print();
}

Regards,
Kirti Dhar

*From:* Alexander Fedulov <alexander.fedu...@gmail.com>
*Sent:* 26 October 2023 20:59
*To:* Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
*Cc:* user@flink.apache.org
*Subject:* Re: CSV Decoder with AVRO schema generated Object

Hi Kirti,

What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of the code that you are trying to execute.

To be honest, combining CSV with AVRO-generated classes sounds rather strange and you might want to reconsider your approach. As for a quick fix, using aliases in your reader schema might help [1].

[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases

Best,
Alexander Fedulov

On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Team,

I am using the Flink CSV decoder with an AVSC-generated Java object and am facing an issue when a field name contains an underscore (_) or starts with an upper-case letter.

*Sample Schema:*

{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    {
      "name": "emp_id",
      "type": ["null", "long"]
    },
    {
      "name": "Name",
      "type": ["null", "string"]
    }
  ]
}

Generated Java object getters/setters:

public void setEmpId(java.lang.Long value) {
    this.emp_id = value;
}

...

public java.lang.CharSequence getName() {
    return Name;
}

*Input record:*

emp_id,Name
1,peter

*Exception:*

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"])

I have also found an old JIRA regarding this: https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!
Regards,
Kirti Dhar
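An illustrative, untested sketch of one possible workaround suggested by the exception above: declare the CSV columns explicitly with the property names Jackson reports as known ("empId", "name") instead of taking them from the "emp_id,Name" header row. EmployeeTest is the Avro-generated class from this thread (package/import omitted); the shaded Jackson import path is inferred from the exception's package prefix, and the column types and skip-header behavior are assumptions, not a confirmed fix.

import java.time.Duration;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class EmployeeCsvSourceSketch {

    static FileSource<EmployeeTest> buildSource(Path dataDirectory) {
        // Declare the columns with the Jackson-visible bean property names
        // rather than relying on the header row; skip the header line itself.
        CsvSchema schema = CsvSchema.builder()
                .addColumn("empId", CsvSchema.ColumnType.NUMBER)
                .addColumn("name", CsvSchema.ColumnType.STRING)
                .setColumnSeparator(',')
                .setSkipFirstDataRow(true)
                .build();

        CsvReaderFormat<EmployeeTest> csvFormat =
                CsvReaderFormat.forSchema(schema, TypeInformation.of(EmployeeTest.class));

        return FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                .monitorContinuously(Duration.ofSeconds(30))
                .build();
    }
}

The trade-off is that the column order is now fixed in code rather than read from the file, so reordered CSV columns would silently map to the wrong fields.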