Hi Karim,
when I tried to reproduce your code, I got an exception caused by the
table being named 'table'. After renaming it and completing the job with
some input, I did see the CSV file being written. Also, the job crashed
when the file 1.txt already existed, most likely because the sink is
created with WriteMode.NO_OVERWRITE.

The code I used (running Flink 1.5-SNAPSHOT):

  /**
   * Reproduction job: registers a small in-memory stream as the table
   * "mytable", selects columns A and C from it via SQL and emits the
   * result through a CsvTableSink.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // the second tuple field (the Long) is used as the ascending timestamp
    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val result: Table = tableEnv.sqlQuery( """
                                       |SELECT
                                       | A,C
                                       | FROM mytable
                                     """.stripMargin)

    val path = "file:///tmp/test/1.txt"
    // NOTE: with NO_OVERWRITE the job was observed to crash when the
    // target file already existed, so remove it before re-running.
    val sink: TableSink[Row] = new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE)

    result.writeToSink(sink)

    // nothing runs until the job is submitted here
    env.execute("this job")
  }

  /**
   * Creates a DataStream from a small, hard-coded collection of
   * (Int, Long, String) tuples to serve as input for the job.
   *
   * @param env the streaming execution environment used to create the stream
   * @return the stream obtained from `env.fromCollection` over the six tuples below
   */
  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    // immutable literal instead of appending to a mutable list element by element
    val data: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "Hi"),
      (2, 2L, "Hello"),
      (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"),
      (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker")
    )
    env.fromCollection(data)
  }


Nico

On 16/03/18 22:50, karim amer wrote:
> Hi There,
> 
>  I am trying to write a CSVsink to disk but it's not getting written. I
> think the file is getting overwritten or truncated once The Stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how can i fix this ?
> 
> 
> tableEnv.registerDataStream("table", watermarkedStream)
> 
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
> 
> 
> 
> val result: Table = results
> 
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
> 
> result.writeToSink(sink)
> 
> env.execute("this job")
> 
> 
> 
> 
> Thanks

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to