Hi Dawid,

I am a little worried about the code because of the ByteBuffer and the endianness. Do I really need to worry about them and determine them too?
Or was it just convenient to use ByteBuffer and the endianness here?

Regards
Hans

public Event deserialize(byte[] message) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
    int address = buffer.getInt(0);
    int typeOrdinal = buffer.getInt(4);
    return new Event(EventType.values()[typeOrdinal], address);
}

On Fri, 4 Feb 2022 at 14:42, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi,
>
> You can use DeserializationSchema with KafkaSource as well. You can pass
> it to the KafkaSource.builder():
>
>     KafkaSource.<...>builder()
>         .setDeserializer(...)
>
> You can also take a look at the StateMachineExample[1], where KafkaSource
> is used.
>
> BTW, have you tried looking at Table API? It would abstract quite a few
> things for you, e.g. translation of what I presume is a CSV format[2] in
> your case.
>
> Best,
>
> Dawid
>
> [1] https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104
>
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
>
> On 03/02/2022 16:56, HG wrote:
>
> Hello
>
> Most examples available still use the FlinkKafkaConsumer unfortunately.
> I need to consume events from Kafka.
> The format is Long,Timestamp,String,String.
>
> Do I need to create a custom deserializer?
>
> What also confuses me is
>
>     KafkaSource*<String>* source = KafkaSource....
>
> How does it relate to the deserializer?
> Is there a kind of <Row> type or is <String> fine even if the message is a
> composite of Long,String...?
>
> Regards Hans
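
To make the ByteBuffer/endianness question concrete, here is a minimal sketch in plain Java (no Flink dependency) that contrasts the two cases. It assumes the Long,Timestamp,String,String messages arrive as comma-separated UTF-8 text with no quoted or escaped commas; the thread does not confirm the encoding or the timestamp format, so the timestamp is kept as a string. The names PayloadDecodingSketch, LogEvent, parseText and parseBinary are hypothetical. In Flink, logic like parseText would sit inside the deserialize(byte[] message) method of a DeserializationSchema.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class PayloadDecodingSketch {

    /** Hypothetical holder for one Long,Timestamp,String,String message. */
    public static final class LogEvent {
        public final long id;
        public final String timestamp; // kept as text: the format is not given in the thread
        public final String first;
        public final String second;

        public LogEvent(long id, String timestamp, String first, String second) {
            this.id = id;
            this.timestamp = timestamp;
            this.first = first;
            this.second = second;
        }
    }

    /**
     * Text payload (assumed UTF-8, comma-separated): only the character
     * encoding matters here, so no ByteBuffer or byte order is involved.
     */
    public static LogEvent parseText(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] fields = line.split(",", -1); // naive split: no quoted commas
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields, got " + fields.length);
        }
        return new LogEvent(Long.parseLong(fields[0].trim()), fields[1], fields[2], fields[3]);
    }

    /**
     * Binary payload of two 4-byte ints, as in the quoted deserialize method:
     * the reader must use the same byte order the producer wrote with.
     */
    public static int[] parseBinary(byte[] message, ByteOrder order) {
        ByteBuffer buffer = ByteBuffer.wrap(message).order(order);
        return new int[] {buffer.getInt(0), buffer.getInt(4)};
    }

    public static void main(String[] args) {
        byte[] text = "42,2022-02-03 16:56:00,foo,bar".getBytes(StandardCharsets.UTF_8);
        LogEvent event = parseText(text);
        System.out.println(event.id + " | " + event.timestamp + " | " + event.first + " | " + event.second);

        // Write two ints in little-endian order, then read them back both ways.
        byte[] raw = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putInt(7).putInt(1).array();
        int[] matching = parseBinary(raw, ByteOrder.LITTLE_ENDIAN);
        int[] mismatched = parseBinary(raw, ByteOrder.BIG_ENDIAN);
        System.out.println("matching order:   " + matching[0] + ", " + matching[1]);
        System.out.println("mismatched order: " + mismatched[0] + ", " + mismatched[1]);
    }
}
```

If the assumption of a text payload holds, byte order never comes into play; it only matters when the producer writes raw binary numbers, in which case the reader has to use whatever order the producer used.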