Hi Arvid,

Would it be possible to implement a BucketAssigner that, for example, periodically loads its configuration from an external source and, depending on the event type, decides on a different sub-folder?
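Something along these lines is what I have in mind (an untested sketch; the class name, the "event_name" field, the refresh interval, and fetchMappingFromExternalSource() are all placeholders):

import java.util.Collections;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Re-loads an event-name -> sub-folder mapping at most once per refresh
// interval and routes each record accordingly.
public class ConfigRefreshingBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final long REFRESH_INTERVAL_MS = 60_000L;

    private transient Map<String, String> eventToFolder;
    private transient long lastRefreshTime;

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        long now = context.currentProcessingTime();
        if (eventToFolder == null || now - lastRefreshTime > REFRESH_INTERVAL_MS) {
            eventToFolder = fetchMappingFromExternalSource();
            lastRefreshTime = now;
        }
        String eventName = String.valueOf(element.get("event_name"));
        // fall back to a per-event default folder for unknown event types
        return eventToFolder.getOrDefault(eventName, "events/" + eventName);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    private Map<String, String> fetchMappingFromExternalSource() {
        // placeholder: load and parse the current config here (DB, S3 object, HTTP, ...)
        return Collections.emptyMap();
    }
}

That way the topology never changes; only the bucket paths do, so a new event type would not require a restart.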
Thanks,
Rafi

On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> There is currently no way to dynamically change the topology. It would be
> good to know why your current approach is not working (is the restart
> taking too long? Are the changes too frequent?)
>
> So some ideas:
> - Have some kind of submitter that restarts Flink automatically on config
> change (assumes that restart time is not the issue).
> - Your job could periodically check for a change and then fail. However,
> you need to fail in a way that the topology is rebuilt. I guess it's close
> to a fatal error, and then the driver handles that error and restarts.
> - Rewrite the job in such a way that you have only one sink in your job
> graph which demultiplexes the events to several internal sinks. Then you
> could simply add a new sink whenever a new event type occurs (see the
> sketch after this list).
>
> The first option is the easiest and the last option the most versatile
> (it could even have different sink types mixed).
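>
> To illustrate the last idea, a very rough sketch (not production-ready:
> these writers live outside Flink's checkpointing, so exactly-once is lost;
> the class name and path layout are made up for illustration):
>
> import java.io.IOException;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
>
> // Single sink that fans records out to one Parquet writer per event type.
> public class DemultiplexingParquetSink extends RichSinkFunction<GenericRecord> {
>
>     private final String basePath;
>     private transient Map<String, ParquetWriter<GenericRecord>> writers;
>
>     public DemultiplexingParquetSink(String basePath) {
>         this.basePath = basePath;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         writers = new HashMap<>();
>     }
>
>     @Override
>     public void invoke(GenericRecord record, Context context) throws Exception {
>         String eventName = String.valueOf(record.get("event_name"));
>         ParquetWriter<GenericRecord> writer = writers.get(eventName);
>         if (writer == null) {
>             // first record of a new event type: open a writer under its own
>             // folder, with no topology change and no restart
>             writer = AvroParquetWriter.<GenericRecord>builder(
>                             new Path(basePath + "/" + eventName + "/part-" + System.currentTimeMillis()))
>                     .withSchema(record.getSchema())
>                     .build();
>             writers.put(eventName, writer);
>         }
>         writer.write(record);
>     }
>
>     @Override
>     public void close() throws IOException {
>         for (ParquetWriter<GenericRecord> writer : writers.values()) {
>             writer.close();
>         }
>     }
> }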
>
> On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote:
>
>> I am stuck on this. Please give some suggestions.
>>
>> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
>>
>>> Please help with this. Any suggestions?
>>>
>>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I am receiving a set of events in Avro format on different topics. I
>>>> want to consume these and write them to S3 in Parquet format.
>>>> I have written the job below: it creates a separate stream for each
>>>> event and fetches its schema from the Confluent schema registry to
>>>> create a Parquet sink for that event.
>>>> This is working fine, but the only problem I am facing is that whenever
>>>> a new event type starts coming in, I have to change the YAML config and
>>>> restart the job. Is there any way to avoid restarting the job so that it
>>>> starts consuming the new set of events automatically?
>>>>
>>>> YAML config:
>>>>
>>>> !com.bounce.config.EventTopologyConfig
>>>> eventsType:
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "search_list_keyless"
>>>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>>     topic: "search_list_keyless"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "bike_search_details"
>>>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>>     topic: "bike_search_details"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "keyless_bike_lock"
>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>>     topic: "analytics-keyless"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "keyless_bike_unlock"
>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>>>     topic: "analytics-keyless"
>>>>
>>>> checkPointInterval: 1200000
>>>>
>>>> topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]
>>>>
>>>> Sink code:
>>>>
>>>> YamlReader reader = new YamlReader(topologyConfig);
>>>> EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>>>>
>>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>>>> topics = eventTopologyConfig.getTopics();
>>>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>>>
>>>> CachedSchemaRegistryClient registryClient =
>>>>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>>
>>>> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>         properties);
>>>>
>>>> DataStream<GenericRecord> dataStream =
>>>>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>>>
>>>> try {
>>>>     for (EventConfig eventConfig : eventTypesList) {
>>>>
>>>>         // "{}" placeholder added so the event name actually gets logged
>>>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>>>
>>>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>                 .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>>>                         SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>>>>                 .withBucketAssigner(new EventTimeBucketAssigner())
>>>>                 .build();
>>>>
>>>>         // keep only the records belonging to this event type
>>>>         DataStream<GenericRecord> outStream =
>>>>                 dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
>>>>                         genericRecord != null
>>>>                                 && genericRecord.get(EVENT_NAME).toString()
>>>>                                         .equals(eventConfig.getEvent_name()));
>>>>
>>>>         outStream.addSink(sink)
>>>>                 .name(eventConfig.getSink_id())
>>>>                 .setParallelism(parallelism);
>>>>     }
>>>> } catch (Exception e) {
>>>>     e.printStackTrace();
>>>> }
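>>>>
>>>> EventTimeBucketAssigner is a small custom assigner, roughly along these
>>>> lines (simplified sketch; the "event_ts" timestamp field is illustrative):
>>>>
>>>> import java.time.Instant;
>>>> import java.time.ZoneOffset;
>>>> import java.time.format.DateTimeFormatter;
>>>>
>>>> import org.apache.avro.generic.GenericRecord;
>>>> import org.apache.flink.core.io.SimpleVersionedSerializer;
>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
>>>>
>>>> public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {
>>>>
>>>>     private static final DateTimeFormatter DATE_FORMAT =
>>>>             DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
>>>>
>>>>     @Override
>>>>     public String getBucketId(GenericRecord element, Context context) {
>>>>         // bucket path: <event_name>/dt=<event date>
>>>>         String eventName = String.valueOf(element.get("event_name"));
>>>>         long eventTime = (long) element.get("event_ts");
>>>>         return eventName + "/dt=" + DATE_FORMAT.format(Instant.ofEpochMilli(eventTime));
>>>>     }
>>>>
>>>>     @Override
>>>>     public SimpleVersionedSerializer<String> getSerializer() {
>>>>         return SimpleVersionedStringSerializer.INSTANCE;
>>>>     }
>>>> }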
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob.: +91-8588817877
>>> Skype: anuj.jain07
>
> --
> Arvid Heise | Senior Java Developer
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany