>
>>>>>>>>
>>>>>>>>
>>>>>>>> tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>>> val results = tableEnv.sqlQuery( """
>>>>>>>>
FROM CDRS
>>>>>>>""".stripMargin)
>>>>>>>
>>>>>>>
>>>>>>> val result: Table = results
>>>>>>>
>>>>>>> val path = "file:///Users/test/1.txt"
>>>>>> numFiles = 1, // optional: write to a single file
>>>>>> writeMode = WriteMode.OVERWRITE)
>>>>>>
>>>>>> result.writeToSink(sink)
>>>>
>>>>>
>>>>>
>>>>> env.execute("this job")
>>>>>
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 19, 2018 at 9:13
>>>>> csv file popping up. Also, the job was
>>>>> crashing when the file 1.txt already existed.
>>>>>
>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>
>>>>> def main(args: Array[String]) {
>>>>> // set up the streaming execution environment
>>>> def main(args: Array[String]) {
>>>> // set up the streaming execution environment
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>> val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>>> val results = tableEnv.sqlQuery( """
>>>|SELECT
>>> | A,C
>>>| FROM mytable
>>>
> val path = "file:///tmp/test/1.txt"
>> val sink :TableSink[Row]= new CsvTableSink(
>> path, // output path
>> fieldDelim = "|", // optional: delimit files by '|'
>> numFiles = 1, // optional: write to a single file
>
> env.execute("this job")
> }
>
> def get3TupleDataStream(env: StreamExecutionEnvironment):
> DataStream[(Int, Long, String)] = {
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
data.+=((5, 3L, "I am fine."))
data.+=((6, 3L, "Luke Skywalker"))
env.fromCollection(data)
}
Nico
On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
> I am trying to write a CSVsink to disk but it's not getting written. I
Hi There,
I am trying to write a CSVsink to disk but it's not getting written. I
think the file is getting overwritten or truncated once The Stream process
finishes. Does anyone know why the file is getting overwritten or truncated
and how can i fix this ?
tableEnv.registerDataStream("mytable", stream)
11 matches
Mail list logo