As I understand from code, streamMap is a Java map, not Scala. So you can
get NPE while unreferencing the value you got from it.

Also, the approach looks a bit strange.
Can you describe what are you trying to achieve?

Regards,
Roman


On Mon, Feb 24, 2020 at 5:47 PM aj <ajainje...@gmail.com> wrote:

>
> I am trying below piece of code to create multiple datastreams object and
> store in map.
>
> for (EventConfig eventConfig : eventTypesList) {
>             LOGGER.info("creating a stream for ",
> eventConfig.getEvent_name());
>             String key = eventConfig.getEvent_name();
>             final StreamingFileSink<GenericRecord> sink =
> StreamingFileSink.forBulkFormat
>                     (path,
> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
>                     .withBucketAssigner(new EventTimeBucketAssigner())
>                     .build();
>
>             DataStream<GenericRecord> stream =
> dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
>                 if
> (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
> {
>                     return true;
>                 }
>                 return false;
>             });
>
>             Tuple2<DataStream<GenericRecord>,
> StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
>             streamMap.put(key, tuple2);
>         }
>
>         DataStream<GenericRecord> searchStream =
> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
>         searchStream.map(new
> Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
>
>
> I am getting Nullpointer Exception when trying to get the stream from map
> value at :
>
>
> *DataStream<GenericRecord> searchStream =
> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
>
> As per my understanding, this is due to the map is local to main and not
> broadcast to tasks.
> If I want to do this how should I do, please help me to resolve this?
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to