I'm trying to sink two Window Streams to the same Kinesis Sink. When I do this, no results are making it to the sink (code below). If I remove one of the windows from the Job, results do get published. Adding another stream to the sink seems to void both.
How can I have results from both Window Streams go to the same sink? Thanks public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ObjectMapper jsonParser = new ObjectMapper(); DataStream<String> inputStream = createKinesisSource(env); FlinkKinesisProducer<String> kinesisSink = createKinesisSink(); WindowedStream oneMinStream = inputStream .map(value -> jsonParser.readValue(value, JsonNode.class)) .keyBy(node -> node.get("accountId")) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))); oneMinStream .aggregate(new LoginAggregator("k1m")) .addSink(kinesisSink); WindowedStream twoMinStream = inputStream .map(value -> jsonParser.readValue(value, JsonNode.class)) .keyBy(node -> node.get("accountId")) .window(TumblingProcessingTimeWindows.of(Time.minutes(2))); twoMinStream .aggregate(new LoginAggregator("k2m")) .addSink(kinesisSink); try { env.execute("Flink Kinesis Streaming Sink Job"); } catch (Exception e) { LOG.error("failed"); LOG.error(e.getLocalizedMessage()); LOG.error(e.getStackTrace().toString()); throw e; } } private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static FlinkKinesisProducer<String> createKinesisSink() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition(UUID.randomUUID().toString()); return sink; }