Thanks, that works!
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13725.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.
In getProducedType(), replace the implementation with:
return new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
TypeExtractor.getForClass(CustomObject.class));
On 13.06.2017 17:18, AndreaKinn wrote:
Can I ask you to help me? I am trying to implement a CustomDeserializer.
My Kafka messages are composed of KeyedMessages where both the key and the
message are strings.
I created a new class named CustomObject to manage the message string
because it's more complex than a simple string.
public class CustomDeseria
You have to create your own implementation that deserializes the byte
arrays into whatever type you want to use.
On 13.06.2017 13:19, AndreaKinn wrote:
But KeyedDeserializationSchema has just 2 implementations:
TypeInformationKeyValueSerializationSchema
JSONKeyValueDeserializationSchema
The first gives me this error:
06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
java.io.EOFException at
org.apache.flink.runtime.util.DataInput
Have you tried implementing a KeyedDeserializationSchema?
This receives both the message and key as byte arrays, which you could then
deserialize as strings and return them in a Tuple2.
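The decoding step described above could look roughly like the sketch below. It is kept free of Flink dependencies so it runs on its own: KeyedStringDecoder and decode are hypothetical names, Map.Entry stands in for Flink's Tuple2, and UTF-8 is an assumption, since the thread does not say which encoding the producer uses. In an actual job the same logic would go inside the deserialize method of your KeyedDeserializationSchema implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
public class KeyedStringDecoder {

    /**
     * Decodes the raw key and message bytes of a Kafka record as strings.
     * Assumes UTF-8. A record may carry no key (or no value), so null
     * byte arrays are mapped to the empty string instead of failing.
     */
    public static Map.Entry<String, String> decode(byte[] messageKey, byte[] message) {
        String key = (messageKey == null)
                ? ""
                : new String(messageKey, StandardCharsets.UTF_8);
        String value = (message == null)
                ? ""
                : new String(message, StandardCharsets.UTF_8);
        // In Flink this pair would be returned as a Tuple2<String, String>.
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        byte[] rawKey = "sensor-1".getBytes(StandardCharsets.UTF_8);
        byte[] rawMessage = "some payload".getBytes(StandardCharsets.UTF_8);

        Map.Entry<String, String> record = decode(rawKey, rawMessage);
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```

Mapping a missing key to an empty string is only one option; returning null or skipping the record may fit better, depending on how the key is used downstream.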
On 13.06.2017 12:36, AndreaKinn wrote:
Hi,
I already spent two days trying to get simple messages from Kafka without
success.
I have a Kafka producer written in JavaScript:
KeyedMessage = kafka.KeyedMessage;
keyed_message = new KeyedMessage(key, string_to_sent);
payload = [{topics: topic, messages: keyed_message }];
And I want to re