Hi Alexander, Thanks for reply. Actually I have a system where data travels in form of user defined, AVRO schema generated objects.
Sample code: static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory) throws Exception { Class recordClazz = EmployeeTest.class; // This is AVRO generated java object having fields emp_id and Name CsvSchema.Builder builder = CsvSchema.builder().setUseHeader(true).setReorderColumns(true).setColumnSeparator(','). setEscapeChar('"').setLineSeparator(System.lineSeparator()).setQuoteChar('"').setArrayElementSeparator(";"). setNullValue(""); CsvReaderFormat<Object> csvFormat = CsvReaderFormat.forSchema(CsvSchema.builder().build(), TypeInformation.of(recordClazz)); FileSource.FileSourceBuilder fileSourceBuilder = FileSource.forRecordStreamFormat(csvFormat, dataDirectory).monitorContinuously(Duration.ofSeconds(30)); fileSourceBuilder.setFileEnumerator((FileEnumerator.Provider) () -> new NonSplittingRecursiveEnumerator(new DefaultFileFilter())); FileSource source = fileSourceBuilder.build(); final DataStreamSource<EmployeeTest> file = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner(new WatermarkAssigner((Object input) -> System.currentTimeMillis())),"FileSource"); file.print(); } Regards, Kirti Dhar From: Alexander Fedulov <alexander.fedu...@gmail.com> Sent: 26 October 2023 20:59 To: Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com> Cc: user@flink.apache.org Subject: Re: CSV Decoder with AVRO schema generated Object Hi Kirti, What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of the code that you are trying to execute. To be honest, combining CSV with AVRO-generated classes sounds rather strange and you might want to reconsider your approach. As for a quick fix, using aliases in your reader schema might help [1] [1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases Best, Alexander Fedulov On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user <user@flink.apache.org<mailto:user@flink.apache.org>> wrote: Hi Team, I am using Flink CSV Decoder with AVSC generated java Object and facing issue if the field name contains underscore(_) or fieldname starts with Capital case. Sample Schema: { "namespace": "avro.employee", "type": "record", "name": "EmployeeTest", "fields": [ { "name": "emp_id", "type": ["null","long"] }, { "name": "Name", "type": ["null","string"] } ] } Generated Java Object getters/setters: public void setEmpId(java.lang.Long value) { this.emp_id = value; } …………………………………………………………………………………………………………. …………………………………………………………………………………………………………. public java.lang.CharSequence getName() { return Name; } Input record: emp_id,Name 1,peter Exception: Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"]) I have also found an old JIRA regarding this: https://issues.apache.org/jira/browse/FLINK-2874 Any help would be appreciated! Regards, Kirti Dhar