Hi,

the reason the new schema feels a bit weird is that it implements a new paradigm inside a FlinkKafkaProducer that still follows a somewhat older one. In the old paradigm, the partitioning and topic were configured on the sink, which made them fixed for all produced records. The new schema allows setting the partition and topic dynamically, which, as I said, doesn't play well with the old way.

As Chesnay said, you can either pass the topic to the schema to make it fixed for all records, or encode it in the data and then forward it to the record.
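
For what it's worth, here is a minimal sketch of the fixed-topic variant. The class name, the String element type, and the surrounding details are illustrative assumptions, not Flink API:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FixedTopicSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public FixedTopicSchema(String topic) {
        // The same topic string the user already configures on the producer.
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Topic comes from the constructor; omitting the partition argument
        // leaves the choice to Kafka's partitioner.
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}

You would then construct the sink along the lines of new FlinkKafkaProducer<>(topic, new FixedTopicSchema(topic), properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE), passing the topic twice, as Chesnay notes below.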

Best,
Aljoscha

On 23.01.20 11:35, Chesnay Schepler wrote:
That's a fair question; the interface is indeed weird in this regard and does have some issues.

From what I can tell you have 2 options:
a) have the user pass the topic to the serialization schema constructor, which in practice would be identical to the topic they pass to the producer.
b) additionally implement KafkaContextAware, and have the schema determine the topic _somehow_.

Both options are quite "eh" from a usability perspective:
a) requires the user to pass the same information around to multiple places;
b) is inefficient, since the topic has to be determined twice per record (once when KafkaContextAware#getTargetTopic is called, once again when serialize is called), and can maybe(?) result in subtle issues if the two calls determine different topics.
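
To illustrate b), a rough sketch; the nested Event type that carries its own destination topic is made up for the example:

import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingSchema implements
        KafkaSerializationSchema<RoutingSchema.Event>,
        KafkaContextAware<RoutingSchema.Event> {

    // Hypothetical element type that knows its own destination topic.
    public static class Event {
        public String topic;
        public byte[] payload;
    }

    @Override
    public String getTargetTopic(Event element) {
        // First determination: called by the producer before serialization.
        return element.topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event element, Long timestamp) {
        // Second determination: must agree with getTargetTopic, otherwise
        // the subtle issues mentioned above can appear.
        return new ProducerRecord<>(element.topic, element.payload);
    }
}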

The Table API version of the sink handles this better, since its serialization schema only returns a byte array, not a producer record.

You could also use one of the deprecated constructors that accept a "SerializationSchema" or "KeyedSerializationSchema", which handle this case better.
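
For completeness, a sketch of that older route; the String element type and the null fallbacks are my assumptions:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class LegacySchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        return null; // no key, let Kafka pick the partition
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // Returning null falls back to the default topic configured on the producer.
        return null;
    }
}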

I've CC'd Aljoscha, who was involved in the introduction of the current iteration of the schema.

On 23/01/2020 03:13, Jason Kania wrote:
Thanks for responding.

I am aware of where the topic is used. What I do not see is how to set the topic within the class that implements the KafkaSerializationSchema.serialize(T classObject, Long timestamp) method.

The method must create and return a value of type ProducerRecord<byte[], byte[]>, but all the constructors for ProducerRecord expect "String topic" as the first argument. The topic is not passed to the method, so the question is: where is the implementation of the class supposed to get the topic?

On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães <speeddra...@gmail.com> wrote:


Hi Jason,

The topic is passed to the *FlinkKafkaConsumer* constructor, followed by the *KafkaDeserializationSchema* and then the *Properties*.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord] {
  // implement deserialize(...), isEndOfStream(...) and getProducedType here
}



On Thu, Jan 23, 2020 at 1:20 AM Jason Kania <jason.ka...@ymail.com> wrote:

    Hello,

    I was looking for documentation in 1.9.1 on how to create
    implementations of the KafkaSerializationSchema and
    KafkaDeserializationSchema interfaces. I have created
    implementations in the past for the SerializationSchema and
    DeserializationSchema interfaces. Unfortunately, I can find no
    examples, and the code contains no documentation for this purpose;
    some information appears to be missing.

    Can someone please answer the following:

    1) When creating a ProducerRecord with the
    KafkaSerializationSchema.serialize() method, how is the topic
    String supposed to be obtained by the implementing class? All of
    the constructors require that the topic be specified, but the
    topic is not passed in. Is there another interface that should be
    implemented to get the topic or a callback? Or is it expected
    that the topic be fixed in the interface's implementation
    class? Some of the constructors also ask for a partition. Again,
    where is this information expected to come from?

    2) The interfaces specify that a ConsumerRecord<byte[], byte[]> is
    received and a ProducerRecord<byte[], byte[]> is to be generated.
    What do the two byte arrays in the type definitions refer to?

    Thanks,

    Jason


