Hi,
The reason the new schema feels a bit weird is that it implements a new
paradigm in a FlinkKafkaProducer that still follows a somewhat older
paradigm. In the old paradigm, the partitioning and the topic were configured
on the sink, which made them fixed for all produced records. The new
schema allows setting the partition and topic dynamically per record, which, as I
said, doesn't play well with the old way.
As Chesnay said, you can either pass the topic to the schema, which makes it
fixed for all records, or encode it in the data and then copy it into the
ProducerRecord that the schema returns.
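A minimal sketch of the second approach (topic encoded in the data), in Java. `RoutedEvent` and `RoutingSchema` are hypothetical names. So that the sketch compiles without Flink or Kafka on the classpath, `ProducerRecord` and `KafkaSerializationSchema` are local stand-ins that mirror only the parts of the real signatures discussed in this thread; in a real job you would delete them and import `org.apache.kafka.clients.producer.ProducerRecord` and `org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema` instead.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.clients.producer.ProducerRecord (subset only).
final class ProducerRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;

    ProducerRecord(String topic, V value) {
        this(topic, null, value);
    }

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    K key() { return key; }
    V value() { return value; }
}

// Local stand-in for Flink's KafkaSerializationSchema<T> as of 1.9.
interface KafkaSerializationSchema<T> extends Serializable {
    ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp);
}

// Hypothetical element type that carries its own destination topic.
final class RoutedEvent {
    final String topic;
    final String payload;

    RoutedEvent(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

// Copies the topic out of each element into the record it produces,
// so different elements can go to different topics.
final class RoutingSchema implements KafkaSerializationSchema<RoutedEvent> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(RoutedEvent element, Long timestamp) {
        byte[] value = element.payload.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(element.topic, value);
    }
}
```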
Best,
Aljoscha
On 23.01.20 11:35, Chesnay Schepler wrote:
That's a fair question; the interface is indeed weird in this regard and
does have some issues.
From what I can tell, you have two options:
a) have the user pass the topic to the serialization schema constructor,
which in practice would be identical to the topic they pass to the
producer.
b) additionally implement KafkaContextAware, and have the schema
determine the topic _somehow_.
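A minimal sketch of option a), in Java: the topic is a constructor argument stored in a field and used for every record. `FixedTopicSchema` is a hypothetical name, and `ProducerRecord`/`KafkaSerializationSchema` are local stand-ins mirroring the subset of the Kafka and Flink 1.9 signatures discussed here (delete them and import the real types in an actual job).

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.clients.producer.ProducerRecord (subset only).
final class ProducerRecord<K, V> {
    private final String topic;
    private final V value;

    ProducerRecord(String topic, V value) {
        this.topic = topic;
        this.value = value;
    }

    String topic() { return topic; }
    V value() { return value; }
}

// Local stand-in for Flink's KafkaSerializationSchema<T> as of 1.9.
interface KafkaSerializationSchema<T> extends Serializable {
    ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp);
}

// Option a): the topic is fixed when the schema is constructed.
// The field must be serializable because Flink serializes the schema with the sink.
final class FixedTopicSchema implements KafkaSerializationSchema<String> {
    private final String topic;

    FixedTopicSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // The timestamp argument is ignored in this sketch.
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a real job the same string then typically goes to the producer as well, e.g. `new FlinkKafkaProducer<>(topic, new FixedTopicSchema(topic), props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)` (assuming the 1.9 constructor that takes a default topic, a `KafkaSerializationSchema`, `Properties` and a `Semantic`), which is the duplication described here.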
Both options are quite "eh" from a usability perspective:
a) requires the user to pass the same information around to multiple
places;
b) is inefficient since the topic has to be determined twice per record
(once when KafkaContextAware#getTargetTopic is called, and again when
serialize is called), and it might result in subtle issues if the two calls
determine different topics.
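A sketch of option b), in Java, assuming the topic can be derived deterministically from the element. Both `getTargetTopic` and `serialize` delegate to one helper, so the two per-record calls cannot pick different topics, although the work is still done twice. `LevelRoutingSchema`, `topicFor` and the topic names are hypothetical; `ProducerRecord`, `KafkaSerializationSchema` and `KafkaContextAware` are local stand-ins mirroring the subsets of the Kafka and Flink 1.9 types used here.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.clients.producer.ProducerRecord (subset only).
final class ProducerRecord<K, V> {
    private final String topic;
    private final V value;

    ProducerRecord(String topic, V value) {
        this.topic = topic;
        this.value = value;
    }

    String topic() { return topic; }
    V value() { return value; }
}

// Local stand-in for Flink's KafkaSerializationSchema<T> as of 1.9.
interface KafkaSerializationSchema<T> extends Serializable {
    ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp);
}

// Local stand-in for Flink's KafkaContextAware<T>; only getTargetTopic is abstract.
interface KafkaContextAware<T> {
    default void setParallelInstanceId(int parallelInstanceId) {}
    default void setNumParallelInstances(int numParallelInstances) {}
    default void setPartitions(int[] partitions) {}
    String getTargetTopic(T element);
}

// Routes lines starting with "ERROR" to a separate topic (hypothetical rule).
final class LevelRoutingSchema
        implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

    // Single source of truth for the routing decision; it must be deterministic,
    // because the producer calls getTargetTopic and serialize separately per record.
    private static String topicFor(String element) {
        return element.startsWith("ERROR") ? "errors" : "events";
    }

    @Override
    public String getTargetTopic(String element) {
        return topicFor(element);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topicFor(element), element.getBytes(StandardCharsets.UTF_8));
    }
}
```

Keeping `topicFor` pure and cheap limits both of the drawbacks listed for b).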
The Table API version of the sink handles this better since its
serialization schema only returns a byte array, not a producer record.
You could also use one of the deprecated constructors that accept a
"SerializationSchema" or "KeyedSerializationSchema" which handle this
case better.
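For comparison, a sketch of the deprecated route, in Java. With a `KeyedSerializationSchema` the schema only produces key and value bytes, and `getTargetTopic` may return `null`, in which case the producer falls back to the default topic passed to its constructor. `PlainStringSchema` is a hypothetical name and `KeyedSerializationSchema` here is a local stand-in mirroring the Flink interface's three methods.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for Flink's (deprecated) KeyedSerializationSchema<T>.
interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    // Returning null means "use the producer's default topic".
    String getTargetTopic(T element);
}

// Writes each string as an unkeyed UTF-8 value to the producer's default topic.
final class PlainStringSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return null; // no key: the record is written without one
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // defer to the topic given to the producer
    }
}
```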
I've CC'd Aljoscha, who was involved in the introduction of the current
iteration of the schema.
On 23/01/2020 03:13, Jason Kania wrote:
Thanks for responding.
I am aware of where the topic is used. What I do not see is how to set
the topic within the class that implements the
KafkaSerializationSchema.serialize(T classObject, Long timestamp)
method.
The method must create and return a value of type
ProducerRecord<byte[], byte[]>, but all the constructors for
ProducerRecord expect "String topic" as the first argument. The topic is
not passed to the method, so where is the implementing class supposed to
get it from?
On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães <speeddra...@gmail.com> wrote:
Hi Jason,
The topic is passed to the *FlinkKafkaConsumer* constructor, followed by the
*KafkaDeserializationSchema* and then the *Properties*.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord] {
  ...
}
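On the consuming side, a sketch of a `KafkaDeserializationSchema` in Java (rather than the Scala of the snippet above). It also shows what the two `byte[]` type parameters of `ConsumerRecord<byte[], byte[]>` are: the record's raw key bytes and raw value bytes as stored in Kafka, where the key may be `null`. `KeyedMessage` and `StringPairDeserializer` are hypothetical names. `ConsumerRecord` and `KafkaDeserializationSchema` are local stand-ins mirroring a subset of the Kafka and Flink 1.9 types; the real Flink interface also requires `getProducedType()` (inherited from `ResultTypeQueryable`), which the stand-in omits.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.clients.consumer.ConsumerRecord (subset only).
final class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final K key;
    private final V value;

    ConsumerRecord(String topic, int partition, long offset, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    K key() { return key; }
    V value() { return value; }
}

// Local stand-in for Flink's KafkaDeserializationSchema<T>, minus getProducedType().
interface KafkaDeserializationSchema<T> extends Serializable {
    boolean isEndOfStream(T nextElement);
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}

// Hypothetical output type: the decoded key and value plus the source topic.
final class KeyedMessage {
    final String topic;
    final String key;   // null when the record was produced without a key
    final String value;

    KeyedMessage(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Decodes both byte arrays as UTF-8 strings.
final class StringPairDeserializer implements KafkaDeserializationSchema<KeyedMessage> {
    @Override
    public boolean isEndOfStream(KeyedMessage nextElement) {
        return false; // treat the topic as an unbounded stream
    }

    @Override
    public KeyedMessage deserialize(ConsumerRecord<byte[], byte[]> record) {
        byte[] k = record.key();    // raw key bytes (may be null)
        byte[] v = record.value();  // raw value bytes
        String key = k == null ? null : new String(k, StandardCharsets.UTF_8);
        String value = v == null ? null : new String(v, StandardCharsets.UTF_8);
        return new KeyedMessage(record.topic(), key, value);
    }
}
```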
On Thu, Jan 23, 2020 at 1:20 AM Jason Kania <jason.ka...@ymail.com> wrote:
Hello,
I was looking for documentation in 1.9.1 on how to create
implementations of the KafkaSerializationSchema and
KafkaDeserializationSchema interfaces. I have created
implementations of the SerializationSchema and
DeserializationSchema interfaces in the past. Unfortunately, I can find no
examples, the code contains no documentation for this purpose, and
some information appears to be missing.
Can someone please answer the following:
1) When creating a ProducerRecord in the
KafkaSerializationSchema.serialize() method, how is the implementing
class supposed to obtain the topic String? All of
the constructors require that the topic be specified, but the
topic is not passed in. Is there another interface that should be
implemented to get the topic or to receive a callback? Or is it expected
that the topic is fixed in the interface's implementation
class? Some of the constructors also ask for a partition. Again,
where is this information expected to come from?
2) The interfaces specify that a ConsumerRecord<byte[], byte[]> is
received and a ProducerRecord<byte[], byte[]> is to be generated.
What do the two byte arrays in the type definitions refer to?
Thanks,
Jason