Thanks, Arvide for detailed answers. - Have some kind submitter that restarts flink automatically on config change (assumes that restart time is not the issue). Yes, that can be written but that not solve the problem completely because I want to avoid job restart itself. Every time I restart I also have to restart from the last checkpoint in S3.
- Your job could periodically check for a change and then fail. However, you need to fail in a way that the topology is rebuilt. I guess it's close to a fatal error and then the driver handles that error and restart. Again same reason as first also not sure how would it be implemented. Rewrite the job in such a way that you have only one sink in your job graph which demultiplexes the event to several internal sinks. Then you could simply add a new sink whenever a new event occurs. -Please provide me some idea I want to implement this, how it can be possible. @Arvid Heise <ar...@ververica.com> jJuujIstdfdf I am just thinking can I use a broadcast state where this config rule which I keeping in YAML can be a push in Kafka itself. Because I just need event name and Avro schema subject mapping mainly. Please correct me if I am thinking in the wrong direction. On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote: > Hi Arvid, > > Would it be possible to implement a BucketAssigner that for example loads > the configuration periodically from an external source and according to the > event type decides on a different sub-folder? > > Thanks, > Rafi > > > On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <ar...@ververica.com> wrote: > >> Hi Anuj, >> >> There is currently no way to dynamically change the topology. It would be >> good to know why your current approach is not working (restart taking too >> long? Too frequent changes?) >> >> So some ideas: >> - Have some kind submitter that restarts flink automatically on config >> change (assumes that restart time is not the issue). >> - Your job could periodically check for a change and then fail. However, >> you need to fail in a way that the topology is rebuilt. I guess it's close >> to a fatal error and then the driver handles that error and restart. >> - Rewrite the job in such a way that you have only one sink in your job >> graph which demultiplexes the event to several internal sinks. Then you >> could simply add a new sink whenever a new event occurs. >> >> The first option is the easiest and the last option the most versatile >> (could even have different sink types mixed). >> >> On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote: >> >>> I am stuck on this . Please give some suggestions. >>> >>> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote: >>> >>>> please help with this. Any suggestions. >>>> >>>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote: >>>> >>>>> Hello All, >>>>> >>>>> I am receiving a set of events in Avro format on different topics. I >>>>> want to consume these and write to s3 in parquet format. >>>>> I have written a below job that creates a different stream for each >>>>> event and fetches it schema from the confluent schema registry to create a >>>>> parquet sink for an event. >>>>> This is working fine but the only problem I am facing is whenever a >>>>> new event start coming I have to change in the YAML config and restart the >>>>> job every time. Is there any way I do not have to restart the job and it >>>>> start consuming a new set of events. >>>>> >>>>> >>>>> YAML config : >>>>> >>>>> !com.bounce.config.EventTopologyConfig >>>>> eventsType: >>>>> - !com.bounce.config.EventConfig >>>>> event_name: "search_list_keyless" >>>>> schema_subject: >>>>> "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless" >>>>> topic: "search_list_keyless" >>>>> >>>>> - !com.bounce.config.EventConfig >>>>> event_name: "bike_search_details" >>>>> schema_subject: >>>>> "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details" >>>>> topic: "bike_search_details" >>>>> >>>>> - !com.bounce.config.EventConfig >>>>> event_name: "keyless_bike_lock" >>>>> schema_subject: >>>>> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock" >>>>> topic: "analytics-keyless" >>>>> >>>>> - !com.bounce.config.EventConfig >>>>> event_name: "keyless_bike_unlock" >>>>> schema_subject: >>>>> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock" >>>>> topic: "analytics-keyless" >>>>> >>>>> >>>>> checkPointInterval: 1200000 >>>>> >>>>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"] >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> *Sink code :* >>>>> >>>>> YamlReader reader = new YamlReader(topologyConfig); >>>>> EventTopologyConfig eventTopologyConfig = >>>>> reader.read(EventTopologyConfig.class); >>>>> >>>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval(); >>>>> topics = eventTopologyConfig.getTopics(); >>>>> >>>>> List<EventConfig> eventTypesList = >>>>> eventTopologyConfig.getEventsType(); >>>>> >>>>> CachedSchemaRegistryClient registryClient = new >>>>> CachedSchemaRegistryClient(schemaRegistryUrl, 1000); >>>>> >>>>> >>>>> FlinkKafkaConsumer flinkKafkaConsumer = new >>>>> FlinkKafkaConsumer(topics, >>>>> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>> properties); >>>>> >>>>> DataStream<GenericRecord> dataStream = >>>>> streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source"); >>>>> >>>>> try { >>>>> for (EventConfig eventConfig : eventTypesList) { >>>>> >>>>> LOG.info("creating a stream for ", eventConfig.getEvent_name()); >>>>> >>>>> final StreamingFileSink<GenericRecord> sink = >>>>> StreamingFileSink.forBulkFormat >>>>> (path, >>>>> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(), >>>>> registryClient))) >>>>> .withBucketAssigner(new EventTimeBucketAssigner()) >>>>> .build(); >>>>> >>>>> DataStream<GenericRecord> outStream = >>>>> dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> { >>>>> if (genericRecord != null && >>>>> genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) >>>>> { >>>>> return true; >>>>> } >>>>> return false; >>>>> }); >>>>> >>>>> outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism); >>>>> >>>>> } >>>>> } catch (Exception e) { >>>>> e.printStackTrace(); >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Thanks & Regards, >>>>> Anuj Jain >>>>> >>>>> >>>>> >>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>>> >>>> >>>> >>>> -- >>>> Thanks & Regards, >>>> Anuj Jain >>>> Mob. : +91- 8588817877 >>>> Skype : anuj.jain07 >>>> <http://www.oracle.com/> >>>> >>>> >>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>> >>> >> >> -- >> >> Arvid Heise | Senior Java Developer >> >> <https://www.ververica.com/> >> >> Follow us @VervericaData >> >> -- >> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> Conference >> >> Stream Processing | Event Driven | Real Time >> >> -- >> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> >> -- >> Ververica GmbH >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >> (Toni) Cheng >> > -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 <http://www.oracle.com/> <http://www.cse.iitm.ac.in/%7Eanujjain/>