I am trying below piece of code to create multiple datastreams object and
store in map.
for (EventConfig eventConfig : eventTypesList) {
LOGGER.info("creating a stream for ",
eventConfig.getEvent_name());
String key = eventConfig.getEvent_name();
final StreamingFileSink<GenericRecord> sink =
StreamingFileSink.forBulkFormat
(path,
ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
DataStream<GenericRecord> stream =
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if
(genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
{
return true;
}
return false;
});
Tuple2<DataStream<GenericRecord>,
StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
streamMap.put(key, tuple2);
}
DataStream<GenericRecord> searchStream =
streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
searchStream.map(new
Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
I am getting Nullpointer Exception when trying to get the stream from map
value at :
*DataStream<GenericRecord> searchStream =
streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
As per my understanding, this is due to the map is local to main and not
broadcast to tasks.
If I want to do this how should I do, please help me to resolve this?
--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>