Hello All,

I am receiving a set of events in Avro format on different topics. I want
to consume these and write to s3 in parquet format.
I have written a below job that creates a different stream for each event
and fetches it schema from the confluent schema registry to create a
parquet sink for an event.
This is working fine but the only problem I am facing is whenever a new
event start coming I have to change in the YAML config and restart the job
every time. Is there any way I do not have to restart the job and it start
consuming a new set of events.


YAML config :

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject:
"search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject:
"bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject:
"analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
      event_name: "keyless_bike_unlock"
      schema_subject:
"analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
      topic: "analytics-keyless"


checkPointInterval: 1200000

topics: ["search_list_keyless","bike_search_details","analytics-keyless"]





*Sink code :*

  YamlReader reader = new YamlReader(topologyConfig);
    EventTopologyConfig eventTopologyConfig =
reader.read(EventTopologyConfig.class);

    long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
        topics = eventTopologyConfig.getTopics();

                List<EventConfig> eventTypesList =
eventTopologyConfig.getEventsType();

        CachedSchemaRegistryClient registryClient = new
CachedSchemaRegistryClient(schemaRegistryUrl, 1000);


        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

        DataStream<GenericRecord> dataStream =
streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

        try {
        for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for ", eventConfig.getEvent_name());

final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
        (path, 
ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
registryClient)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

        DataStream<GenericRecord> outStream =
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
        if (genericRecord != null &&
genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
{
        return true;
        }
        return false;
        });
        
outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);

        }
        } catch (Exception e) {
        e.printStackTrace();
        }




-- 
Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to