> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g12me...@gmail.com> wrote:
> 
> Hi,
> This is my Flink app code:
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Parse each record as JSON and map it to a tuple
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> {
>             // Parse the JSON
>             JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
>             return new Tuple3<>(jsonNode.get("TICKER").asText(),
>                     jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
>         }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
> 
> SingleOutputStreamOperator<String> fiveMinSum =
>         inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
>         .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
>         .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1))) // Sliding window definition
>         .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
> 
> 
> When I run the job, the Flink dashboard shows data coming in, but no records
> are being sent. Even though I added print() as a sink, I do not see any
> messages in stdout on the TaskManager console. There are no exceptions in
> the dashboard either.
> 
> How can I debug this? Any input on how to solve it would be appreciated.
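
One thing that may be worth checking is event time. An event-time window only emits once the watermark reaches the window's last timestamp (its end minus 1 ms), so if the watermark never advances, the job reads data but `print()` and the S3 sink stay silent with no exception. Possible causes, none of which can be confirmed from the code shown: `TimestampExtractor` does not emit watermarks, `EVENT_TIME` is in epoch seconds rather than milliseconds, or one source partition is idle and holds the watermark back. The standalone sketch below uses no Flink classes; `SlidingWindowCheck`, `windowsFor`, and `wouldFire` are hypothetical names, and it assumes a window offset of 0 and non-negative millisecond timestamps. It mirrors how a 3-minute window sliding by 1 minute is assigned and when it would fire.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, no Flink dependency. All names here are hypothetical.
public class SlidingWindowCheck {

    // Returns {start, end} pairs (end exclusive) for every sliding window that
    // contains timestampMs, assuming offset 0 and a non-negative timestamp.
    public static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestampMs - (timestampMs + slideMs) % slideMs;
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    // An event-time window fires once the watermark reaches end - 1.
    public static boolean wouldFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = 3 * 60_000L;   // Time.minutes(3)
        long slide = 60_000L;      // Time.minutes(1)
        long eventTime = 150_000L; // example EVENT_TIME in milliseconds
        long watermark = 170_000L; // example current watermark
        for (long[] w : windowsFor(eventTime, size, slide)) {
            System.out.println("[" + w[0] + ", " + w[1] + ") fires="
                    + wouldFire(w[1], watermark));
        }
    }
}
```

To see what the real job is doing, compare the "Watermarks" tab of the window operator in the dashboard against the `EVENT_TIME` values in your records. If no watermark is shown, or it lags far behind the event timestamps, the windows will not fire.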
