Hi Oran,
as you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a String.
In the mapper you can do whatever you want with an invalid object: log it,
discard it, write an "error" JSON string, send it to a side output stream,
and so on.
In the documentation we have an example of how to implement deserialization
from bytes to Jackson ObjectNode objects (JSONKeyValueDeserializationSchema):
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
However, there is no example for the other direction, i.e. serializing an
ObjectNode back into a String.
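A minimal sketch of such a mapper (hedged: the validity check, the logger name
and the field name "value" are placeholders, and depending on your setup the
ObjectNode import may need to point at Flink's shaded Jackson instead):

import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

def toJsonStrings(input: DataStream[ObjectNode]): DataStream[String] =
  input.flatMap { (node: ObjectNode, out: Collector[String]) =>
    // Decide what "valid" means for your records; this check is only an example.
    if (node.hasNonNull("value")) {
      out.collect(node.toString) // ObjectNode.toString renders the JSON text
    } else {
      // Invalid object: log it, emit an error record, or send it to a side output.
      LoggerFactory.getLogger("toJsonStrings").warn(s"Dropping invalid record: $node")
    }
  }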
> On 1 May 2018, at 16:30, Wouter Zorgdrager wrote:
>
> So, I'm still struggling with this issue. I dived a bit more into the
> problem and I'm pretty sure that the problem is that I have to (implicitly)
> pass the SchemaFor and RecordTo classes to my serialization schema
> (otherwise I can't make it generic). However, those classes aren't
> serializable, but of course I can't annotate them transient nor make them a
> lazy val.
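For context on why those instances have to be serializable at all: a Scala
context bound is only sugar for implicit constructor parameters, so the derived
type-class instances become fields of the schema object and travel with it when
Flink serializes the job graph. A two-line illustration (MySchema is a
hypothetical name):

import com.sksamuel.avro4s.SchemaFor

// `class MySchema[T: SchemaFor]` is shorthand for the declaration below, so the
// derived SchemaFor[T] instance is stored as a field. When Flink ships the
// schema to the task managers it must serialize that field as well; if the
// instance is not java.io.Serializable, job submission fails.
class MySchema[T](implicit val schemaFor: SchemaFor[T])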
> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
> Sent: Wednesday, April 25, 2018 7:17 AM
> To: user@flink.apache.org
> Subject: KafkaProducer with generic (Avro) serialization schema
>
> Dear reader,
>
> I'm currently working on writing a KafkaProducer which is able to serialize
> a generic type using avro4s. However, this serialization schema is not
> serializable itself. Here is my code for this:
>
> The serialization schema:
>
> class AvroSerializationSchema[IN : SchemaFor : FromRecord]
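A fuller sketch of what such a generic schema can look like, using avro4s to
turn a case class into Avro binary. This is not taken from the original post:
the extra ToRecord bound, the avro4s 1.x-style calls (RecordFormat,
SchemaFor.apply()) and the serialize body are assumptions, and the exact method
names differ between avro4s versions. It also reproduces exactly the problem
discussed in this thread, because the derived instances are captured as
constructor fields:

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{FromRecord, RecordFormat, SchemaFor, ToRecord}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.flink.api.common.serialization.SerializationSchema

class AvroSerializationSchema[IN: SchemaFor: ToRecord: FromRecord]
    extends SerializationSchema[IN] {

  // Both values below are built from the implicitly captured type-class
  // instances; holding them as fields is what makes the schema itself
  // non-serializable, as described in the follow-up message above.
  private val format = RecordFormat[IN]                   // case class <-> GenericRecord
  private val schema = implicitly[SchemaFor[IN]].apply()  // the generated Avro schema (1.x API)

  override def serialize(element: IN): Array[Byte] = {
    val record: GenericRecord = format.to(element)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }
}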
> ... to write to Kafka, however I am getting this error. Not sure why, as
> I've already implemented the interfaces.
>
> Caused by: java.io.NotSerializableException:
> com.sy.flink.test.Tuple2Serializerr$1
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> And the class implements the following:
>
> public class Tuple2Serializerr implements
>     DeserializationSchema<...>,
>     SerializationSchema<...> {
>
> And called like this:
>
> FlinkKafkaProducer010<...> myProducer = new FlinkKafkaProducer010<...>(
>     "10.22.4.15:9092",        // broker list
>     "my-topic",               // target topic
>     new Tuple2Serializerr()); // serialization schema
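The "$1" in com.sy.flink.test.Tuple2Serializerr$1 is the tell-tale name of an
anonymous inner class declared inside Tuple2Serializerr. Anonymous classes
typically keep a reference to their enclosing instance and do not implement
java.io.Serializable unless you make them, so when Flink serializes the
producer, that inner object is what fails. The usual fix is to keep the schema
a self-contained top-level class with only serializable (or transient) fields.
A sketch of such a schema, written in Scala for consistency with the rest of
this page; the (String, String) element type and the comma-separated encoding
are placeholders, since the original element types are not visible here:

import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// Top-level class, no anonymous inner classes, no non-serializable fields:
// Flink can ship it to the task managers without complaint.
class Tuple2Serializer
    extends SerializationSchema[(String, String)]
    with DeserializationSchema[(String, String)] {

  override def serialize(element: (String, String)): Array[Byte] =
    s"${element._1},${element._2}".getBytes(StandardCharsets.UTF_8)

  // Assumes the payload was produced by serialize above ("key,value").
  override def deserialize(message: Array[Byte]): (String, String) = {
    val Array(key, value) = new String(message, StandardCharsets.UTF_8).split(",", 2)
    (key, value)
  }

  override def isEndOfStream(nextElement: (String, String)): Boolean = false

  override def getProducedType: TypeInformation[(String, String)] =
    createTypeInformation[(String, String)]
}

With that in place, the call from the quoted post keeps the same shape:
new FlinkKafkaProducer010[(String, String)]("10.22.4.15:9092", "my-topic",
new Tuple2Serializer()).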