Hi Arvid,

Would it be possible to implement a BucketAssigner that, for example, loads
the configuration periodically from an external source and chooses a
different sub-folder according to the event type?
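
To make the question concrete, here is a minimal sketch of the lookup I have in mind, in plain Java with no Flink dependency. The class name, the Supplier-based config source and the injectable clock are all hypothetical. In a real job this logic would presumably sit inside a custom BucketAssigner's getBucketId(element, context), and the resolver would have to be serializable or created lazily on the task. Note that it only covers the folder decision: the Parquet writer in the original job is still built from one schema per sink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Resolves an event type to a sub-folder, re-reading the mapping from an
 * external source at most once per refresh interval.
 * Hypothetical helper for illustration; this is not a Flink class.
 */
final class ReloadingFolderResolver {
    private final Supplier<Map<String, String>> source; // assumption: reads a file or calls a config service
    private final long refreshIntervalMillis;
    private final LongSupplier clock;                    // injectable so the refresh logic can be tested
    private Map<String, String> folders = new HashMap<>();
    private long lastLoadMillis;
    private boolean loaded;

    ReloadingFolderResolver(Supplier<Map<String, String>> source,
                            long refreshIntervalMillis,
                            LongSupplier clock) {
        this.source = source;
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.clock = clock;
    }

    /** Returns the configured sub-folder, or the event type itself if none is configured. */
    String folderFor(String eventType) {
        long now = clock.getAsLong();
        if (!loaded || now - lastLoadMillis >= refreshIntervalMillis) {
            // Take a snapshot so later changes in the source only show up on the next refresh.
            folders = new HashMap<>(source.get());
            lastLoadMillis = now;
            loaded = true;
        }
        return folders.getOrDefault(eventType, eventType);
    }
}
```

The reload happens lazily on the calling thread, so a slow config source would stall record processing; a background refresh would avoid that at the cost of more moving parts.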

Thanks,
Rafi


On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> There is currently no way to dynamically change the topology. It would be
> good to know why your current approach is not working (restart taking too
> long? Too frequent changes?)
>
> So some ideas:
> - Have some kind of submitter that restarts Flink automatically on config
> change (assumes that restart time is not the issue).
> - Your job could periodically check for a change and then fail. However,
> you need to fail in a way that the topology is rebuilt. I guess it's close
> to a fatal error, and then the driver handles that error and restarts.
> - Rewrite the job in such a way that you have only one sink in your job
> graph which demultiplexes the event to several internal sinks. Then you
> could simply add a new sink whenever a new event occurs.
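
For the second idea, the check itself could look roughly like the sketch below, in plain Java with no Flink dependency. The class names and the idea of comparing a version string (a checksum or a version field) are assumptions for illustration. Whether the resulting failure actually leads to a rebuilt topology depends on how the job is submitted and on its restart strategy, which this sketch does not address.

```java
import java.util.function.Supplier;

/** Thrown when the external config no longer matches the one the job was built from. */
final class ConfigChangedException extends RuntimeException {
    ConfigChangedException(String message) {
        super(message);
    }
}

/** Remembers the config version seen at start-up and fails once it changes. Hypothetical helper. */
final class ConfigChangeGuard {
    private final Supplier<String> versionSource; // assumption: file checksum or a version field
    private final String versionAtStartup;

    ConfigChangeGuard(Supplier<String> versionSource) {
        this.versionSource = versionSource;
        this.versionAtStartup = versionSource.get();
    }

    /** Call periodically; throws if the config differs from the one read at start-up. */
    void check() {
        String current = versionSource.get();
        if (!versionAtStartup.equals(current)) {
            throw new ConfigChangedException(
                    "config changed from " + versionAtStartup + " to " + current);
        }
    }
}
```
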
>
> The first option is the easiest and the last option the most versatile
> (could even have different sink types mixed).
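
The core of the last option, stripped of everything Flink-specific, might be sketched as follows. The class and its two function parameters are hypothetical names; in a real job this would be a custom sink that owns the internal writers, and it would have to handle checkpointing and file commits itself or delegate them, which is left out here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Routes each event to an internal sink chosen by event type and creates
 * that sink lazily the first time the type is seen. Hypothetical helper.
 */
final class DemultiplexingSink<T> {
    private final Function<T, String> eventTypeOf;           // extracts the routing key from an event
    private final Function<String, Consumer<T>> sinkFactory; // builds the internal sink for a new type
    private final Map<String, Consumer<T>> sinks = new HashMap<>();

    DemultiplexingSink(Function<T, String> eventTypeOf,
                       Function<String, Consumer<T>> sinkFactory) {
        this.eventTypeOf = eventTypeOf;
        this.sinkFactory = sinkFactory;
    }

    /** Writes the event to the sink for its type, creating the sink if needed. */
    void write(T event) {
        String type = eventTypeOf.apply(event);
        sinks.computeIfAbsent(type, sinkFactory).accept(event);
    }

    /** Number of internal sinks created so far. */
    int openSinks() {
        return sinks.size();
    }
}
```

Because the sink factory receives the event type, it could look up the schema for that type on first use, so a new event type would not require a change to the job graph.
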
>
> On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote:
>
>> I am stuck on this. Please give some suggestions.
>>
>> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
>>
>>> Please help with this. Any suggestions?
>>>
>>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I am receiving a set of events in Avro format on different topics. I
>>>> want to consume these and write them to S3 in Parquet format.
>>>> I have written the job below, which creates a separate stream for each
>>>> event and fetches its schema from the Confluent Schema Registry to create a
>>>> Parquet sink for that event.
>>>> This is working fine, but the only problem I am facing is that whenever a new
>>>> event starts coming in, I have to change the YAML config and restart the job
>>>> every time. Is there any way to avoid restarting the job and have it start
>>>> consuming the new set of events?
>>>>
>>>>
>>>> YAML config :
>>>>
>>>> !com.bounce.config.EventTopologyConfig
>>>> eventsType:
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "search_list_keyless"
>>>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>>     topic: "search_list_keyless"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "bike_search_details"
>>>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>>     topic: "bike_search_details"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "keyless_bike_lock"
>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>>     topic: "analytics-keyless"
>>>>
>>>>   - !com.bounce.config.EventConfig
>>>>     event_name: "keyless_bike_unlock"
>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>>>     topic: "analytics-keyless"
>>>>
>>>>
>>>> checkPointInterval: 1200000
>>>>
>>>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *Sink code :*
>>>>
>>>> YamlReader reader = new YamlReader(topologyConfig);
>>>> EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>>>>
>>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>>>> topics = eventTopologyConfig.getTopics();
>>>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>>>
>>>> CachedSchemaRegistryClient registryClient =
>>>>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>>
>>>> // One Kafka source reads all configured topics.
>>>> FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
>>>>         topics,
>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>         properties);
>>>>
>>>> DataStream<GenericRecord> dataStream =
>>>>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>>>
>>>> try {
>>>>     for (EventConfig eventConfig : eventTypesList) {
>>>>         // Capture only the name so the filter lambda does not hold the whole config object.
>>>>         final String eventName = eventConfig.getEvent_name();
>>>>         LOG.info("creating a stream for {}", eventName);
>>>>
>>>>         // One Parquet sink per event type, using the schema from the registry.
>>>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>                 .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>>>                         SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>>>>                 .withBucketAssigner(new EventTimeBucketAssigner())
>>>>                 .build();
>>>>
>>>>         // Keep only the records that belong to this event type.
>>>>         DataStream<GenericRecord> outStream = dataStream.filter(
>>>>                 (FilterFunction<GenericRecord>) genericRecord ->
>>>>                         genericRecord != null
>>>>                                 && genericRecord.get(EVENT_NAME).toString().equals(eventName));
>>>>
>>>>         outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>>>>     }
>>>> } catch (Exception e) {
>>>>     LOG.error("Failed to build the sink topology", e);
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>>
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>>
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
