> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g12me...@gmail.com> wrote:
>
> Hi
> This is my flink app code
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Map it to a json
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> {
> // Parse the JSON
> JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
> return new Tuple3<>(jsonNode.get("TICKER").asText(),
> jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
> }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
>
> SingleOutputStreamOperator<String> fiveMinSum =
> inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
> .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
> .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1)))
> // Sliding window definition
> .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
>
>
> On execution in flink dashboard, I see data coming in but records are not
> being sent. Even though I added print as a sink, I do not see any messages in
> stdout in flink task manager console. How can I debug this?
> There are no exceptions as well in the dashboard.
>
> Any inputs on how to solve this?