Hi Alexander,

Thanks for the reply.
Actually, I have a system where data travels in the form of user-defined, Avro-schema-generated objects.

Sample code:

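import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.DefaultFileFilter;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory) throws Exception {
    // EmployeeTest is the Avro-generated Java class with the fields emp_id and Name.
    CsvSchema schema = CsvSchema.builder()
            .setUseHeader(true)
            .setReorderColumns(true)
            .setColumnSeparator(',')
            .setEscapeChar('"')
            .setLineSeparator(System.lineSeparator())
            .setQuoteChar('"')
            .setArrayElementSeparator(";")
            .setNullValue("")
            .build();

    // Use the configured schema; passing a fresh CsvSchema.builder().build() would discard all settings above.
    CsvReaderFormat<EmployeeTest> csvFormat =
            CsvReaderFormat.forSchema(schema, TypeInformation.of(EmployeeTest.class));

    FileSource<EmployeeTest> source = FileSource
            .forRecordStreamFormat(csvFormat, dataDirectory)
            .monitorContinuously(Duration.ofSeconds(30))
            .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(new DefaultFileFilter()))
            .build();

    DataStreamSource<EmployeeTest> file = env.fromSource(
            source,
            // Processing-time timestamps, as produced by the original WatermarkAssigner lambda.
            WatermarkStrategy.<EmployeeTest>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()),
            "FileSource");
    file.print();
}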


Regards,
Kirti Dhar

From: Alexander Fedulov <alexander.fedu...@gmail.com>
Sent: 26 October 2023 20:59
To: Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: CSV Decoder with AVRO schema generated Object

Hi Kirti,

What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of 
the code that you are trying to execute.

To be honest, combining CSV with AVRO-generated classes sounds rather strange 
and you might want to reconsider your approach.
As for a quick fix, using aliases in your reader schema might help [1].

[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases

Best,
Alexander Fedulov

On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Team,

I am using the Flink CSV Decoder with an AVSC-generated Java object and am facing an issue
when a field name contains an underscore (_) or starts with a capital letter.
Sample Schema:

{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    {
      "name": "emp_id",
      "type": ["null","long"]
    },
    {
      "name": "Name",
      "type": ["null","string"]
    }
    ]
}

Generated Java object getters/setters:

public void setEmpId(java.lang.Long value) {
  this.emp_id = value;
}

// ...

public java.lang.CharSequence getName() {
  return Name;
}
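
The mismatch can be reproduced without Flink. Avro's code generator camel-cases the field name into the accessor name (emp_id becomes setEmpId, as shown above), and Jackson, which the shaded class names in the exception show is doing the CSV binding, derives the property name from that accessor by dropping the get/set prefix and lower-casing the leading capital. A minimal JDK-only sketch of both rules; avroAccessorSuffix and beanPropertyName are hypothetical, simplified helpers, not the actual Avro or Jackson code:

```java
import java.util.List;

public class AccessorNaming {

    // Simplified take on Avro's accessor naming: drop underscores and upper-case
    // the first letter plus every letter that follows an underscore
    // ("emp_id" -> "EmpId", "Name" -> "Name").
    public static String avroAccessorSuffix(String fieldName) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = true;
        for (char c : fieldName.toCharArray()) {
            if (c == '_') {
                upperNext = true;
            } else {
                sb.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return sb.toString();
    }

    // Simplified take on Jackson's default bean naming: strip the "get"/"set"
    // prefix, then lower-case the leading run of upper-case letters
    // ("setEmpId" -> "empId", "getName" -> "name").
    public static String beanPropertyName(String accessor) {
        if (accessor.length() <= 3 || !(accessor.startsWith("get") || accessor.startsWith("set"))) {
            throw new IllegalArgumentException("Not a bean accessor: " + accessor);
        }
        StringBuilder sb = new StringBuilder(accessor.substring(3));
        for (int i = 0; i < sb.length() && Character.isUpperCase(sb.charAt(i)); i++) {
            sb.setCharAt(i, Character.toLowerCase(sb.charAt(i)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (String field : List.of("emp_id", "Name")) {
            String setter = "set" + avroAccessorSuffix(field);
            String property = beanPropertyName(setter);
            // The CSV header carries the Avro field name; Jackson only knows the property name.
            System.out.println(field + " -> " + setter + " -> " + property
                    + (field.equals(property) ? " (match)" : " (mismatch)"));
        }
    }
}
```

Running main prints "emp_id -> setEmpId -> empId (mismatch)" and "Name -> setName -> name (mismatch)", which lines up with the two known properties listed in the exception below.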


Input record:
emp_id,Name
1,peter

Exception:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"])
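
If the header cannot be changed, one way around the naming rules is to skip bean introspection and bind each column explicitly by its header name. The sketch below is JDK-only and assumption-heavy: EmployeeStub is a hypothetical stand-in for the Avro-generated EmployeeTest (same setter signatures), COLUMNS and bind are illustrative names, and the naive comma split assumes no quoted fields. How to wire this into a Flink job (for example a custom format) is left open.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class ExplicitCsvMapping {

    // Hypothetical stand-in for the Avro-generated EmployeeTest, with the same setter signatures.
    public static class EmployeeStub {
        private Long empId;
        private CharSequence name;
        public void setEmpId(Long value) { this.empId = value; }
        public void setName(CharSequence value) { this.name = value; }
        public Long getEmpId() { return empId; }
        public CharSequence getName() { return name; }
    }

    // Keyed by the column names exactly as they appear in the CSV header,
    // so no bean-naming convention is involved.
    static final Map<String, BiConsumer<EmployeeStub, String>> COLUMNS = new LinkedHashMap<>();
    static {
        // An empty cell becomes null, mirroring setNullValue("") in the CsvSchema.
        COLUMNS.put("emp_id", (e, v) -> e.setEmpId(v.isEmpty() ? null : Long.valueOf(v)));
        COLUMNS.put("Name", (e, v) -> e.setName(v.isEmpty() ? null : v));
    }

    // Binds one data row to a new object, resolving each cell through the header,
    // so the column order in the file does not matter.
    public static EmployeeStub bind(String[] header, String[] row) {
        EmployeeStub e = new EmployeeStub();
        for (int i = 0; i < header.length; i++) {
            BiConsumer<EmployeeStub, String> setter = COLUMNS.get(header[i]);
            if (setter == null) {
                throw new IllegalArgumentException("Unknown column: " + header[i]);
            }
            setter.accept(e, i < row.length ? row[i] : "");
        }
        return e;
    }

    public static void main(String[] args) {
        // Naive comma split: assumes no quoted fields or embedded separators.
        String[] header = "emp_id,Name".split(",", -1);
        String[] row = "1,peter".split(",", -1);
        EmployeeStub e = bind(header, row);
        System.out.println(e.getEmpId() + " " + e.getName()); // prints "1 peter"
    }
}
```

Because every cell is resolved through the header, a file with the columns in a different order (e.g. Name,emp_id) binds the same way, which is what setReorderColumns(true) was meant to provide.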

I have also found an old JIRA regarding this: 
https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!

Regards,
Kirti Dhar



