When Flink is running on a cluster, `DataStream#print()` writes to the
TaskManagers' standard output, which ends up in the `.out` files in the
log directory.
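
As a rough illustration, here is a minimal sketch of printing two stages of
a job. The class name `PrintDebugSketch`, the `enrich` helper and the
`fromElements` source are assumptions made for the example; they stand in
for the Kafka source and the `StatefulSessionCalculator` from the original
job, which are not reproduced here.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintDebugSketch {

    // Hypothetical enrichment step: maps each record to its length.
    static Integer enrich(String value) {
        return value.length();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source used in the original job.
        DataStream<String> stream = env.fromElements("a", "bb", "ccc");
        // The identifier is prepended to every printed record.
        stream.print("stream");

        DataStream<Integer> enriched = stream.map(PrintDebugSketch::enrich);
        enriched.print("enriched");

        // Nothing is printed until the job is actually executed.
        env.execute("print-debug-sketch");
    }
}
```

Run from the IDE, the records show up in the console; on a cluster they go
to the TaskManager `.out` files. Note that `result` in the original code is
a `WindowedStream`, which has no `print()` method; a window function (for
example `reduce` or `process`) has to be applied first, and the resulting
`DataStream` can then be printed.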

Regards,
David

On Tue, Nov 24, 2020 at 6:03 AM Pankaj Chand <pankajchanda...@gmail.com>
wrote:

> Please correct me if I am wrong. `DataStream#print()` only prints to the
> screen when running from the IDE, but does not work (print to the screen)
> when running on a cluster (even a local cluster).
>
> Thanks,
>
> Pankaj
>
> On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Simone,
>>
>> I'd suggest trying out the `DataStream#print()` function to start, but
>> there are a few other easy-to-integrate sinks for testing that you can
>> check out in the docs [1].
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
>>
>> On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin <cavalla...@hotmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> In my code I have a DataStream that I would like to inspect. I need to
>>> see what I'm getting after each transformation to check whether the data
>>> I'm working with makes sense. How can I print to the console, or write to
>>> a file (csv, txt), the variables "stream", "enriched" and "result"?
>>>
>>> I have tried several approaches, but none of them gave me the data.
>>>
>>> Thanks!
>>>
>>>
>>>     FlinkKafkaConsumer<Event> kafkaData =
>>>             new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>>     WatermarkStrategy<Event> wmStrategy =
>>>             WatermarkStrategy
>>>                     .<Event>forMonotonousTimestamps()
>>>                     .withIdleness(Duration.ofMinutes(1))
>>>                     .withTimestampAssigner((event, timestamp) -> {
>>>                         return event.get_Time();
>>>                     });
>>>     DataStream<Event> stream = env.addSource(
>>>             kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>>
>>>     DataStream<Tuple2<Event, Long>> enriched = stream
>>>             .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>>>             .map(new StatefulSessionCalculator());
>>>
>>>     WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
>>>             .keyBy(new MyKeySelector())
>>>             .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
>>>
>>
