Hi Simone,

If you are just executing DataStream pipelines locally in your IDE while prototyping, you should be able to use `DataStream#print()`, which simply prints to standard out [1] (the output might be hidden between the log messages).
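A minimal sketch of what that could look like, assuming a local run from the IDE. The class name `PrintExample`, the `fromElements` source and the `map` step are hypothetical stand-ins for your Kafka source and `StatefulSessionCalculator`; only the `print(...)` calls are the point here.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stand-in for the Kafka source in the original job.
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Hypothetical stand-in for the keyBy(...).map(...) enrichment step.
        DataStream<Tuple2<String, Integer>> enriched = stream
                .map(s -> Tuple2.of(s, s.length()))
                // Lambdas lose generic type information, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // print() adds a sink that writes each record to standard out.
        // The optional identifier is prepended to every printed line,
        // which helps to tell the two streams apart.
        stream.print("stream");
        enriched.print("enriched");

        // Nothing runs (and nothing is printed) until execute() is called.
        env.execute("print-example");
    }
}
```

Each record should show up as its own line in the Run/Console window of the IDE, prefixed with the identifier. Note that `result` in your snippet is a `WindowedStream`, which has no `print()` method; you would first have to apply a window function (for example `reduce`, `aggregate` or `process`) to get a `DataStream` back and print that.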

For debugging locally, you can also just set breakpoints in your functions, for example in `StatefulSessionCalculator`, and run the job in the debug mode of your IDE.

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java


On 24.11.20 11:09, Simone Cavallarin wrote:
I tried to use `DataStream#print()`, but I don't quite understand how to implement it. Could you please give me an example? I'm using IntelliJ, so all I need is to see the data on my screen.

Thanks

------------------------------------------------------------------------
*From:* David Anderson <da...@alpinegizmo.com>
*Sent:* 24 November 2020 10:01
*To:* Pankaj Chand <pankajchanda...@gmail.com>
*Cc:* Austin Cawley-Edwards <austin.caw...@gmail.com>; Simone Cavallarin <cavalla...@hotmail.com>; user@flink.apache.org <user@flink.apache.org>
*Subject:* Re: Print on screen DataStream content
When Flink is running on a cluster, `DataStream#print()` prints to files in the log directory.

Regards,
David

On Tue, Nov 24, 2020 at 6:03 AM Pankaj Chand <pankajchanda...@gmail.com> wrote:

    Please correct me if I am wrong. `DataStream#print()` only prints to
    the screen when running from the IDE, but does not work (print to
    the screen) when running on a cluster (even a local cluster).

    Thanks,

    Pankaj

    On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards
    <austin.caw...@gmail.com> wrote:

        Hey Simone,

        I'd suggest trying out the `DataStream#print()` function to
        start, but there are a few other easy-to-integrate sinks for
        testing that you can check out in the docs here [1].

        Best,
        Austin

        [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks

        On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin
        <cavalla...@hotmail.com> wrote:

            Hi All,

            In my code I have a DataStream that I would like to inspect.
            I need to understand what I'm getting from each
            transformation to check whether the data I'm working on makes
            sense. How can I print to the console, or write to a file (csv,
            txt), the variables "stream", "enriched" and "result"?

            I have tried different approaches but have not managed to get the data.

            Thanks!

            FlinkKafkaConsumer<Event> kafkaData =
                    new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
            WatermarkStrategy<Event> wmStrategy =
                    WatermarkStrategy
                            .<Event>forMonotonousTimestamps()
                            .withIdleness(Duration.ofMinutes(1))
                            .withTimestampAssigner((event, timestamp) -> {
                                return event.get_Time();
                            });
            DataStream<Event> stream = env.addSource(
                    kafkaData.assignTimestampsAndWatermarks(wmStrategy));

            DataStream<Tuple2<Event, Long>> enriched = stream
                    .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
                    .map(new StatefulSessionCalculator());

            WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                    .keyBy(new MyKeySelector())
                    .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));

