Hi Curt,

Upon rechecking the code, you actually don't set the topic through
KafkaContextAware but just directly on the KafkaRecord returned by the
KafkaSerializationSchema.

Sorry for the confusion

Arvid

On Thu, Jun 24, 2021 at 8:36 AM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> getTargetTopic can really be used as Curt needs it. So it would be good to
> add to PyFlink as well.
>
> However, I'm a bit skeptical that Kafka can really handle that model well.
> It's usually encouraged to use rather fewer, larger topics and you'd rather
> use partitions here instead of topics. There is a huge overhead on managing
> topics and even some hard limits that can be reached quicker than it
> initially appears.
> Of course, if you want to use ACLs, afaik you cannot define them on
> partitions, so you probably do not have any choice but to do it your way
> (or consider an alternative architecture).
> It's probably also fine if you just have THE one table that you need to
> divide, but it's not a pattern that scales to more tables or to an
> ever-growing number of tenants. Partitions scale much much better.
>
> Also from painful past experiences, you should absolutely create all
> topics beforehand on your production env and really use getTargetTopic just
> for routing.
>
> On Thu, Jun 24, 2021 at 8:10 AM Dian Fu <dian0511...@gmail.com> wrote:
>
>> OK, got it. Then it seems that split streams is also not quite suitable
>> to address your requirements as you still need to iterate over each of the
>> side output corresponding to each topic.
>>
>> Regarding to getTargetTopic [1],  per my understanding, it’s not designed
>> to dynamically assign the topic for an element according to the
>> documentation. However, I’m not 100% sure about this and maybe I missed
>> something.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java#L48
>>
>> Regards,
>> Dian
>>
>> 2021年6月24日 上午11:45,Curt Buechter <tricksho...@gmail.com> 写道:
>>
>> Hi Dian,
>> Thanks for the reply.
>> I don't think a filter function makes sense here. I have 2,000 tenants in
>> the source database, and I want all records for a single tenant in a
>> tenant-specific topic. So, with a filter function, if I understand it
>> correctly, I would need 2,000 different filters, which isn't very practical.
>>
>> An example:
>> source_topic(tenant_id, first_name, last_name)
>>
>> destination:
>> tenant1.sink_topic (first_name, last_name)
>> tenant2.sink_topic (first_name, last_name)
>> ...
>> tenant2000.sink_topic (first_name, last_name)
>>
>> On 2021/06/24 03:18:36, Dian Fu <d...@gmail.com> wrote:
>> > You are right that split is still not supported. Does it make sense for
>> you to split the stream using a filter function? There is some overhead
>> compared the built-in stream.split as you need to provide a filter function
>> for each sub-stream and so a record will evaluated multiple times.>
>> >
>> > > 2021年6月24日 上午3:08,Curt Buechter <tr...@gmail.com> 写道:>
>> > > >
>> > > Hi,>
>> > > New PyFlink user here. Loving it so far. The first major problem I've
>> run into is that I cannot create a Kafka Producer with dynamic topics. I
>> see that this has been available for quite some time in Java with Keyed
>> Serialization using the getTargetTopic method. Another way to do this in
>> Java may be with stream.split(), and adding a different sink for the split
>> streams. Stream splitting is also not available in PyFlink.>
>> > > Am I missing anything? Has anyone implemented this before in PyFlink,
>> or know of a way to make it happen?>
>> > > The use case here is that I'm using a CDC Debezium connector to
>> populate kafka topics from a multi-tenant database, and I'm trying to use
>> PyFlink to split the records into a different topic for each tenant.>
>> > > >
>> > > Thanks>
>> >
>> >
>>
>>
>>

Reply via email to