Re: CsvSink

2018-03-22 Thread Fabian Hueske
> >>>>>>>> >>>>>>>> >>>>>>>> tableEnv.registerDataStream("CDRS", watermarkedStream) >>>>>>>> val results = tableEnv.sqlQuery( """ >>>>>>>>

Re: CsvSink

2018-03-20 Thread karim amer
FROM CDRS >>>>>>>""".stripMargin) >>>>>>> >>>>>>> >>>>>>> val result: Table = results >>>>>>> >>>>>>> val path = "file:///Users/test/1.tx

Re: CsvSink

2018-03-20 Thread karim amer
t;>>> numFiles = 1, // optional: write to a single >>>>>> file >>>>>> writeMode = WriteMode.OVERWRITE) >>>>>> >>>>>> result.writeToSink(sink) >>>>

Re: CsvSink

2018-03-20 Thread karim amer
gt;>>>> >>>>> >>>>> env.execute("this job") >>>>> >>>>> } >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> On Mon, Mar 19, 2018 at 9:13

Re: CsvSink

2018-03-20 Thread karim amer
sv file popping up. Also, the job was >>>>> crashing when the file 1.txt already existed. >>>>> >>>>> The code I used (running Flink 1.5-SNAPSHOT): >>>>> >>>>> def main(args: Array[String]) { >>>>> // se

Re: CsvSink

2018-03-20 Thread karim amer
ring]) { >>>> // set up the streaming execution environment >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> val tableEnv = TableEnvironment.getTableEnvironment(env) >>>> >>>> val stream: DataStream[(

Re: CsvSink

2018-03-20 Thread karim amer
; >>> val results = tableEnv.sqlQuery( """ >>>|SELECT >>> | A,C >>>| FROM mytable >>>

Re: CsvSink

2018-03-20 Thread Fabian Hueske
> val path = "file:///tmp/test/1.txt" >> val sink :TableSink[Row]= new CsvTableSink( >> path, // output path >> fieldDelim = "|", // optional: delimit files by '|' >> nu

Re: CsvSink

2018-03-19 Thread karim amer
gt; > env.execute("this job") > } > > def get3TupleDataStream(env: StreamExecutionEnvironment): > DataStream[(Int, Long, String)] = { > val data = new mutable.MutableList[(Int, Long, String)] > data.+=((1, 1L, "Hi")) > data.+=((2, 2L, "Hello

Re: CsvSink

2018-03-19 Thread Nico Kruber
how are you?")) data.+=((5, 3L, "I am fine.")) data.+=((6, 3L, "Luke Skywalker")) env.fromCollection(data) } Nico On 16/03/18 22:50, karim amer wrote: > Hi There, > >  I am trying to write a CSVsink to disk but it's not getting written. I

CsvSink

2018-03-16 Thread karim amer
Hi There, I am trying to write a CSVsink to disk but it's not getting written. I think the file is getting overwritten or truncated once The Stream process finishes. Does anyone know why the file is getting overwritten or truncated and how can i fix this ? tableEnv.registerDataStream(&