Hi Anuj, is that piece of code in your first mail in the same main? Then at this point, nothing has been executed in Flink.
So we are looking at a normal Java programming error that you can easily debug and unit test. Most likely, there is no event config for *SEARCH_LIST_KEYLESS* or there is a spelling error. On Thu, Feb 27, 2020 at 9:23 AM aj <ajainje...@gmail.com> wrote: > Hi Khachatryan, > > This is the use case to create multiple streams: > > I have a use case where multiple types of Avro records are coming in > single Kafka topic as we are suing TopicRecordNameStrategy for the subject > in the schema registry. Now I have written a consumer to read that topic > and build a Datastream of GenericRecord. Now I can not sink this stream to > hdfs/s3 in parquet format as this stream contains different types of schema > records. So I am filtering different records for each type by applying a > filter and creating different streams and then sinking each stream > separately. > > So can you please help me create multiple dynamic streams with the code > that I shared. How to resolve this issue? > > On Tue, Feb 25, 2020 at 10:46 PM Khachatryan Roman < > khachatryan.ro...@gmail.com> wrote: > >> As I understand from code, streamMap is a Java map, not Scala. So you can >> get NPE while unreferencing the value you got from it. >> >> Also, the approach looks a bit strange. >> Can you describe what are you trying to achieve? >> >> Regards, >> Roman >> >> >> On Mon, Feb 24, 2020 at 5:47 PM aj <ajainje...@gmail.com> wrote: >> >>> >>> I am trying below piece of code to create multiple datastreams object >>> and store in map. >>> >>> for (EventConfig eventConfig : eventTypesList) { >>> LOGGER.info("creating a stream for ", >>> eventConfig.getEvent_name()); >>> String key = eventConfig.getEvent_name(); >>> final StreamingFileSink<GenericRecord> sink = >>> StreamingFileSink.forBulkFormat >>> (path, >>> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject()))) >>> .withBucketAssigner(new EventTimeBucketAssigner()) >>> .build(); >>> >>> DataStream<GenericRecord> stream = >>> dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> { >>> if >>> (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) >>> { >>> return true; >>> } >>> return false; >>> }); >>> >>> Tuple2<DataStream<GenericRecord>, >>> StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink); >>> streamMap.put(key, tuple2); >>> } >>> >>> DataStream<GenericRecord> searchStream = >>> streamMap.get(SEARCH_LIST_KEYLESS).getField(0); >>> searchStream.map(new >>> Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1)); >>> >>> >>> I am getting Nullpointer Exception when trying to get the stream from >>> map value at : >>> >>> >>> *DataStream<GenericRecord> searchStream = >>> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);* >>> >>> As per my understanding, this is due to the map is local to main and not >>> broadcast to tasks. >>> If I want to do this how should I do, please help me to resolve this? >>> >>> >>> >>> -- >>> Thanks & Regards, >>> Anuj Jain >>> >>> >>> >>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>> >> > > -- > Thanks & Regards, > Anuj Jain > Mob. : +91- 8588817877 > Skype : anuj.jain07 > <http://www.oracle.com/> > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> >