I am trying the code below to create multiple DataStream objects and
store them in a map.

for (EventConfig eventConfig : eventTypesList) {
            String key = eventConfig.getEvent_name();
            // "{}" placeholder added so the event name is actually logged (assumes an SLF4J-style logger)
            LOGGER.info("creating a stream for {}", key);

            // One Parquet sink per event type, using that event's Avro schema
            final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                    .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            // Keep only the records whose event name matches this event type
            DataStream<GenericRecord> stream = dataStream.filter(
                    (FilterFunction<GenericRecord>) genericRecord ->
                            genericRecord.get(EVENT_NAME).toString().equals(key));

            Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
            streamMap.put(key, tuple2);
        }

        DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
        searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));


I am getting a NullPointerException when trying to get the stream from the
map value at:


*DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*

As per my understanding, this is because the map is local to main and is not
broadcast to the tasks.
If I want to do this, how should I go about it? Please help me resolve this.
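
One thing that can be checked without Flink is whether the key used in the
lookup was ever put into the map. The sketch below is only an illustration
under assumptions: it treats streamMap as an ordinary java.util.HashMap, uses
a hypothetical Pair class in place of the Tuple2 of stream and sink, and uses
made-up key strings. It shows that a chained call on map.get(key) throws a
NullPointerException for a missing key, and how an explicit check could report
the known keys instead.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamMapLookup {

    // Hypothetical stand-in for Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>>
    static final class Pair {
        final String stream;
        final String sink;

        Pair(String stream, String sink) {
            this.stream = stream;
            this.sink = sink;
        }
    }

    // Mirrors the loop in the question: one entry per configured event name
    static Map<String, Pair> buildMap(String... eventNames) {
        Map<String, Pair> map = new HashMap<>();
        for (String name : eventNames) {
            map.put(name, new Pair("stream:" + name, "sink:" + name));
        }
        return map;
    }

    // Fails with a descriptive message instead of a bare NullPointerException
    static Pair lookup(Map<String, Pair> map, String key) {
        Pair pair = map.get(key);
        if (pair == null) {
            throw new IllegalStateException(
                    "No stream registered for key '" + key + "'; known keys: " + map.keySet());
        }
        return pair;
    }

    public static void main(String[] args) {
        // Event names here are made up for the illustration
        Map<String, Pair> streamMap = buildMap("search_list_keyless", "search_list_keyed");

        // Key present: the lookup succeeds
        System.out.println(lookup(streamMap, "search_list_keyless").stream);

        // Key absent (here it differs only in case): the unguarded chained call fails
        try {
            String s = streamMap.get("SEARCH_LIST_KEYLESS").stream;
            System.out.println(s);
        } catch (NullPointerException e) {
            System.out.println("Unguarded lookup threw NullPointerException");
        }
    }
}
```

If the guarded lookup reports that the key is missing, the value of
SEARCH_LIST_KEYLESS could be compared against the event names coming from
eventTypesList.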



-- 
Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>
