As I understand from code, streamMap is a Java map, not Scala. So you can get NPE while unreferencing the value you got from it.
Also, the approach looks a bit strange. Can you describe what are you trying to achieve? Regards, Roman On Mon, Feb 24, 2020 at 5:47 PM aj <ajainje...@gmail.com> wrote: > > I am trying below piece of code to create multiple datastreams object and > store in map. > > for (EventConfig eventConfig : eventTypesList) { > LOGGER.info("creating a stream for ", > eventConfig.getEvent_name()); > String key = eventConfig.getEvent_name(); > final StreamingFileSink<GenericRecord> sink = > StreamingFileSink.forBulkFormat > (path, > ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject()))) > .withBucketAssigner(new EventTimeBucketAssigner()) > .build(); > > DataStream<GenericRecord> stream = > dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> { > if > (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) > { > return true; > } > return false; > }); > > Tuple2<DataStream<GenericRecord>, > StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink); > streamMap.put(key, tuple2); > } > > DataStream<GenericRecord> searchStream = > streamMap.get(SEARCH_LIST_KEYLESS).getField(0); > searchStream.map(new > Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1)); > > > I am getting Nullpointer Exception when trying to get the stream from map > value at : > > > *DataStream<GenericRecord> searchStream = > streamMap.get(SEARCH_LIST_KEYLESS).getField(0);* > > As per my understanding, this is due to the map is local to main and not > broadcast to tasks. > If I want to do this how should I do, please help me to resolve this? > > > > -- > Thanks & Regards, > Anuj Jain > > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> >