Hi Shameet, The reason should be that it adds quotes around string data according to the length by default. You could disable the quotes using option csv-disable-quote-character [1]. However, there are still no options to configure it to always add quotes around string data. If that's your requirement, you could create a JIRA.
Regards, Dian [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/csv/#csv-disable-quote-character On Sun, Apr 10, 2022 at 4:53 AM Shameet Doshi < shameet.do...@firstperformance.com> wrote: > Hello, > > I am using the table api in pyflink to generate a csv . What i noticed is > that its conditionally adding quotes around the data. What I want is quotes > around all the data > csv is being created in s3 > > e.g in output below the data in last column was not quoted > "transaction_idgeorge.bl...@reqres.in",card_hash,transaction_id > "no3500957594177george.bl...@reqres.in > ","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177 > "no3500957594178george.bl...@reqres.in > ","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178 > > code > > def create_source_table(table_name, input_path): > return """ CREATE TABLE {0} ( > transaction_id VARCHAR(100), > card_hash VARCHAR(100) > > ) with ( > 'connector' = 'filesystem', > 'format' = 'csv', > 'path' = '{1}' > ) """.format( > table_name, input_path) > > def create_sink_table(table_name, bucket_name): > return """ CREATE TABLE {0} ( > transaction_id VARCHAR(100), > card_hash VARCHAR(100), > brand_id VARCHAR(100) > ) > with ( > 'connector'='filesystem', > 'path'='{1}', > 'format'='csv' > ) """.format( > table_name, bucket_name) > > > # 2. Creates a source table from a Kinesis Data Stream > table_env.execute_sql( > create_source_table( > input_table_name, input_file > ) > ) > > table_env.execute_sql( > create_sink_table( > out_table_name, output_bucket_name > ) > ) > > table_env.register_function("addme1", addme1) > > > source_table = table_env.from_path(input_table_name) > source_table.select(addme1(source_table.transaction_id),source_table.card_hash, > source_table.transaction_id.alias('brand_id')).execute_insert( > out_table_name).wait() > > > apache-flink version - 1.13 > python 3.8 > > thanks > > shameet > > > > > NOTICE: This communication may contain information which is confidential > to First Performance Corporation (FPC). If you are not the intended > recipient of this communication, please delete this email, destroy all > copies, and alert the sender. If you are the intended recipient of this > communication, you should not copy, disclose or distribute this > communication without the authority of FPC. Any views expressed in this > communication are those of the individual sender, except where the sender > specifically states them to be the views of FPC. Except as required by law, > FPC does not represent, warrant or guarantee that the integrity of this > communication has been maintained nor that the communication is free of > errors, harmful code, interception or interference.