Hi Karim, when I was trying to reproduce your code, I got an exception with the name 'table' being used - by replacing it and completing the job with some input, I did see the csv file popping up. Also, the job was crashing when the file 1.txt already existed.
The code I used (running Flink 1.5-SNAPSHOT): def main(args: Array[String]) { // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env) .assignAscendingTimestamps(_._2) tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C) val results = tableEnv.sqlQuery( """ |SELECT | A,C | FROM mytable """.stripMargin) val result: Table = results val path = "file:///tmp/test/1.txt" val sink :TableSink[Row]= new CsvTableSink( path, // output path fieldDelim = "|", // optional: delimit files by '|' numFiles = 1, // optional: write to a single file writeMode = WriteMode.NO_OVERWRITE) result.writeToSink(sink) env.execute("this job") } def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = { val data = new mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, "Hi")) data.+=((2, 2L, "Hello")) data.+=((3, 2L, "Hello world")) data.+=((4, 3L, "Hello world, how are you?")) data.+=((5, 3L, "I am fine.")) data.+=((6, 3L, "Luke Skywalker")) env.fromCollection(data) } Nico On 16/03/18 22:50, karim amer wrote: > Hi There, > > I am trying to write a CSVsink to disk but it's not getting written. I > think the file is getting overwritten or truncated once The Stream > process finishes. Does anyone know why the file is getting overwritten > or truncated and how can i fix this ? > > > tableEnv.registerDataStream("table", watermarkedStream) > > val results = tableEnv.sqlQuery( """ > |SELECT > | A > | FROM table > """.stripMargin) > > > > val result: Table = results > > val path = "file:///path/test/1.txt" > val sink :TableSink[Row]= new CsvTableSink( > path, // output path > fieldDelim = "|", // optional: delimit files by '|' > numFiles = 1, // optional: write to a single file > writeMode = WriteMode.NO_OVERWRITE) > > result.writeToSink(sink) > > env.execute("this job") > > > > > Thanks
signature.asc
Description: OpenPGP digital signature