Hi Shameet,

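This is most likely because, by default, the CSV format decides whether to
quote a string based on its length: long strings are quoted without their
content being checked, while short strings are quoted only when they contain
certain special characters (for example the field delimiter, the quote
character, or a line break). You can disable quoting entirely with the option
'csv.disable-quote-character' [1]. However, there is currently no option to
always add quotes around string data. If that is your requirement, you could
create a JIRA issue for it.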
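To make the length-based rule concrete, here is a simplified Python model of it. It is only an illustration: the 24-character threshold is an assumption based on my reading of the Jackson CSV encoder that Flink's csv format builds on (it is not a documented Flink setting), the set of special characters is reduced to the obvious ones, and the names MAX_QUOTE_CHECK, needs_quotes and render_row are hypothetical.

```python
# Simplified model of the default quoting behavior described above.
# ASSUMPTION: the 24-character threshold reflects our reading of the
# Jackson CSV encoder used by Flink's csv format; it is illustrative only.
MAX_QUOTE_CHECK = 24  # assumed threshold (hypothetical name)


def needs_quotes(value: str, delimiter: str = ",", quote: str = '"') -> bool:
    """Return True if this model would write the string field with quotes."""
    # Long strings are quoted without inspecting their content.
    if len(value) > MAX_QUOTE_CHECK:
        return True
    # Short strings are quoted only if they contain a character that would
    # otherwise break the row (simplified: the real check covers more chars).
    return any(ch in value for ch in (delimiter, quote, "\n", "\r"))


def render_row(fields, delimiter=","):
    """Render one CSV row with the model above; embedded quotes are doubled."""
    out = []
    for field in fields:
        if needs_quotes(field, delimiter):
            out.append('"' + field.replace('"', '""') + '"')
        else:
            out.append(field)
    return delimiter.join(out)


# The card_hash and brand_id values from the output in the question.
row = render_row([
    "bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",
    "NO3500957594177",
])
print(row)
```

With the card_hash and brand_id values from your output, the model reproduces the observed pattern: the 64-character hash is quoted, while the 15-character NO3500957594177 is not.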

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/csv/#csv-disable-quote-character
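Applied to the sink table from the code in the question, the option documented in [1] could be set as in the sketch below. The disable_quotes flag, the table name out_table and the S3 path are hypothetical. Also note that with quoting disabled, values containing the delimiter or a line break (the first column in your output appears to contain one) would be written unquoted and could break the row structure.

```python
def create_sink_table(table_name, bucket_name, disable_quotes=False):
    """Build the CSV sink DDL; optionally turn off quoting entirely."""
    # 'csv.disable-quote-character' is the csv format option from [1].
    # disable_quotes is a hypothetical helper flag, not a Flink parameter.
    extra = ",\n    'csv.disable-quote-character' = 'true'" if disable_quotes else ""
    return """CREATE TABLE {0} (
    transaction_id VARCHAR(100),
    card_hash VARCHAR(100),
    brand_id VARCHAR(100)
) WITH (
    'connector' = 'filesystem',
    'path' = '{1}',
    'format' = 'csv'{2}
)""".format(table_name, bucket_name, extra)


# Hypothetical table name and S3 path, for illustration only.
ddl = create_sink_table("out_table", "s3://my-bucket/output/", disable_quotes=True)
print(ddl)
```

The resulting string would be passed to table_env.execute_sql(...) exactly as the original create_sink_table output is.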

On Sun, Apr 10, 2022 at 4:53 AM Shameet Doshi <shameet.do...@firstperformance.com> wrote:

> Hello,
>
> I am using the Table API in PyFlink to generate a CSV file. What I noticed
> is that it's conditionally adding quotes around the data. What I want is
> quotes around all the data.
> The CSV is being created in S3.
>
> E.g., in the output below, the data in the last column was not quoted:
> "transaction_idgeorge.bl...@reqres.in",card_hash,transaction_id
> "no3500957594177george.bl...@reqres.in
> ","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
> "no3500957594178george.bl...@reqres.in
> ","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
>
> Code:
>
> def create_source_table(table_name, input_path):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '{1}'
>     ) """.format(table_name, input_path)
>
> def create_sink_table(table_name, bucket_name):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100),
>         brand_id VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'path' = '{1}',
>         'format' = 'csv'
>     ) """.format(table_name, bucket_name)
>
>
> # 2. Creates a source table from the input CSV file (filesystem connector)
> table_env.execute_sql(create_source_table(input_table_name, input_file))
>
> # Creates the CSV sink table that writes to the S3 path
> table_env.execute_sql(create_sink_table(out_table_name, output_bucket_name))
>
> table_env.register_function("addme1", addme1)
>
> source_table = table_env.from_path(input_table_name)
> source_table.select(
>     addme1(source_table.transaction_id),
>     source_table.card_hash,
>     source_table.transaction_id.alias('brand_id'),
> ).execute_insert(out_table_name).wait()
>
>
> apache-flink version: 1.13
> Python version: 3.8
>
> Thanks,
>
> Shameet
