I am trying the code below to create multiple DataStream objects and store them in a map.

    for (EventConfig eventConfig : eventTypesList) {
        LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
        String key = eventConfig.getEvent_name();

        // One Parquet sink per event type, bucketed by event time
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records that belong to this event type
        DataStream<GenericRecord> stream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
            if (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
                return true;
            }
            return false;
        });

        Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
        streamMap.put(key, tuple2);
    }

    DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
    searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));

I am getting a NullPointerException when trying to get the stream from the map value at:

    DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

My understanding is that this happens because the map is local to main and is not broadcast to the tasks. If I want to do this, how should I go about it? Please help me resolve this.

--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>
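
P.S. To rule out a simple missing key, here is a minimal standalone sketch (plain Java, no Flink) of how the lookup could be guarded. It relies only on the fact that Map.get returns null for an absent key, so calling getField(0) on that result would throw a NullPointerException. The class name StreamMapLookupSketch, the helper require, and the placeholder string values are hypothetical and not part of the job above. Whether a missing key is the actual cause here is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamMapLookupSketch {

    // Hypothetical helper: returns the value stored under the key, or fails with
    // a message that names the missing key and lists the keys that are present.
    public static <V> V require(Map<String, V> map, String key) {
        V value = map.get(key); // Map.get returns null when the key is absent
        if (value == null) {
            throw new IllegalStateException(
                    "No entry for key '" + key + "'; available keys: " + map.keySet());
        }
        return value;
    }

    public static void main(String[] args) {
        // Placeholder values stand in for the (stream, sink) tuples.
        Map<String, String> streamMap = new HashMap<>();
        streamMap.put("search_list_keyless", "stream and sink placeholder");

        // Present key: the value is returned.
        System.out.println(require(streamMap, "search_list_keyless"));

        // Absent key (here it differs only in case): fails with a readable message
        // instead of a NullPointerException further down the line.
        try {
            require(streamMap, "SEARCH_LIST_KEYLESS");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Using such a check in place of the bare streamMap.get(...) call would show whether the value of SEARCH_LIST_KEYLESS matches any of the event names that were actually put into the map.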