I think @brat is right. I didn't know about the Kafka property 'auto.create.topics.enable'. You can pass the property to the Kafka producer; that should work.

Best,
Leonard Xu
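
For readers following along: the Kafka documentation linked in the quoted message lists 'auto.create.topics.enable' under the broker configs, so it is normally set on the broker side. A minimal sketch of a broker `server.properties` fragment is shown here. The numeric values are illustrative assumptions, not taken from this thread. Auto-created topics get the broker's default partition count and replication factor, which relates to the point about custom topic configurations raised in the quoted messages.

```properties
# Let the broker create a topic the first time a client produces to
# (or requests metadata for) a topic that does not exist yet.
auto.create.topics.enable=true

# Defaults applied to auto-created topics (illustrative values).
num.partitions=3
default.replication.factor=2
```
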
> On June 1, 2020, at 18:33, satya brat <bratsatya...@gmail.com> wrote:
>
> Prasanna,
> You might want to check the Kafka broker configs, where
> 'auto.create.topics.enable' helps with creating a new topic whenever a
> message is published to a nonexistent topic.
> https://kafka.apache.org/documentation/#brokerconfigs
>
> I am not too sure about the pitfalls, if any.
>
> On Mon, Jun 1, 2020 at 3:20 PM Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, kumar
>
> Sorry for missing the original question. I think we cannot create topics
> dynamically at present. Creating a topic should belong to the control flow
> rather than the data flow, and from my understanding the user may have some
> custom configurations for the topic. Maybe you need to implement the logic
> to check/create/manage topics in your custom SinkFunction so that topics
> can be created dynamically at runtime.
>
> Best,
> Leonard Xu
>
>> On June 1, 2020, at 17:02, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>
>> Leonard,
>>
>> Thanks for the reply; I will look into those options.
>> But as for the original question, could we create a topic dynamically when
>> required?
>>
>> Prasanna.
>>
>> On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu <xbjt...@gmail.com> wrote:
>> Hi, kumar
>>
>> Flink supports consuming from and producing to multiple Kafka topics [1].
>> In your case you can implement KeyedSerializationSchema (legacy interface)
>> or KafkaSerializationSchema [2] so that one producer instance can send
>> data to multiple topics. There is an ITCase you can reference [3].
>>
>> Best,
>> Leonard Xu
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>>
>>> On June 1, 2020, at 15:35, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I have a use case where I read events from a single Kafka stream
>>> comprising JSON messages.
>>>
>>> The requirement is to split the stream into multiple output streams based
>>> on some criteria, say the type of event, or the type and the customer
>>> associated with the event.
>>>
>>> We could achieve the splitting of the stream using side outputs, as I
>>> have seen in the documentation.
>>>
>>> Our business environment is such that new event types could start flowing
>>> in. Would the Flink Kafka producer create the topics dynamically based on
>>> the incoming events? I did not see any documentation saying that it
>>> could.
>>>
>>> Or should the topics always be pre-created by running a script
>>> separately? (Not a good, scalable practice in our case.)
>>>
>>> Thanks,
>>> Prasanna.
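
The original question asks about routing events to topics by event type, or by type and customer. One piece of that, deriving a valid topic name per event, can be sketched in plain Java as follows. This is only an illustration: the class `TopicNames`, its methods, and the `<prefix>.<type>.<customer>` naming scheme are hypothetical and do not come from Flink or from this thread. The only Kafka facts relied on are that topic names may contain ASCII letters, digits, '.', '_' and '-', and may be at most 249 characters long. In a KafkaSerializationSchema, a name produced like this could be used as the topic of the returned ProducerRecord.

```java
import java.util.Locale;

/** Hypothetical helper: derives a Kafka topic name from an event's type and customer. */
final class TopicNames {
    // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-',
    // and are limited to 249 characters.
    private static final int MAX_TOPIC_LENGTH = 249;

    private TopicNames() {}

    /**
     * Lower-cases a value (a naming choice made here, not a Kafka requirement)
     * and replaces every run of illegal characters with a single '-'.
     */
    static String sanitize(String value) {
        String cleaned = value.trim()
                .toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9._-]+", "-");
        return cleaned.isEmpty() ? "unknown" : cleaned;
    }

    /** Builds "prefix.type", or "prefix.type.customer" when customer is non-null. */
    static String topicFor(String prefix, String eventType, String customer) {
        StringBuilder topic = new StringBuilder(sanitize(prefix))
                .append('.')
                .append(sanitize(eventType));
        if (customer != null) {
            topic.append('.').append(sanitize(customer));
        }
        if (topic.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException(
                    "Topic name too long: " + topic.length() + " characters");
        }
        return topic.toString();
    }

    public static void main(String[] args) {
        System.out.println(topicFor("events", "OrderCreated", null));        // events.ordercreated
        System.out.println(topicFor("events", "OrderCreated", "Acme Corp")); // events.ordercreated.acme-corp
    }
}
```

Keeping the name derivation in one small, deterministic function makes it easy to pre-create or audit topics with the same logic, whether or not broker-side auto-creation is enabled.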