I’m working on a Kafka Connect connector that reads a MySQL binlog to provide 
near real-time change data capture, and I also plan connectors for other 
DBMSes. The problem is that I’m not able to map all of the MySQL data types — 
or even all of the standard JDBC types — to Kafka Connect Schemas without 
resorting to complex Schemas that radically increase the footprint of messages.

Specifically, I’d like my connectors to be able to use the following “logical” 
types:

- Bits: A set of bits of arbitrary length, corresponding to java.util.BitSet. 
See [1] for code.
- IsoTime: An ISO 8601 time that includes the time zone offset, corresponding 
to Java 8’s java.time.OffsetTime, which represents a time with an offset from 
UTC/Greenwich and has a well-defined ordering, making it more suitable for 
persistent storage. See [2] for code.
- IsoTimestamp: An ISO 8601 timestamp that includes the time zone offset, 
corresponding to Java 8’s java.time.OffsetDateTime, which represents an instant 
with an offset from UTC/Greenwich and has a well-defined ordering, making it 
more suitable for persistent storage. See [3] for code.
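To make the list concrete, here is a minimal sketch of how these types could be carried by single primitives, assuming Bits travels as BYTES and the ISO types as STRING values. The class and method names are hypothetical and the encodings are illustrative, not necessarily what the code in [1] to [3] does. IsoTime would follow the same pattern using OffsetTime and DateTimeFormatter.ISO_OFFSET_TIME.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.BitSet;

// Hypothetical helpers: illustrative encodings only, not the Debezium code.
final class LogicalTypeEncodings {

    private LogicalTypeEncodings() {
    }

    // Bits -> BYTES. toByteArray() is little-endian: bit n is stored in
    // bytes[n / 8] at position n % 8. Trailing clear bits are NOT kept, so a
    // fixed bit length would have to travel separately (e.g. as a parameter).
    static byte[] bitsToBytes(BitSet bits) {
        return bits.toByteArray();
    }

    // BYTES -> Bits.
    static BitSet bytesToBits(byte[] bytes) {
        return BitSet.valueOf(bytes);
    }

    // IsoTimestamp -> STRING, e.g. "2016-01-20T10:15:30+01:00". The offset is
    // part of the string, so both the instant and the offset survive.
    static String isoTimestampToString(OffsetDateTime timestamp) {
        return timestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    // STRING -> IsoTimestamp.
    static OffsetDateTime stringToIsoTimestamp(String value) {
        return OffsetDateTime.parse(value, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }
}
```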

These are very similar to the four built-in logical types (Decimal, Date, Time, 
and Timestamp). Each logical type is essentially an alias for a single primitive 
type (BYTES for Decimal, INT32 for Date and Time, INT64 for Timestamp), but using 
it within a Schema adds semantics that the bare primitive would not carry.
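For comparison, a stdlib-only sketch of how two of the built-ins ride on their primitives. It mirrors, but does not call, org.apache.kafka.connect.data.Decimal and org.apache.kafka.connect.data.Date; BuiltInEncodings and its methods are hypothetical names. The Decimal case shows the "extra semantics" point: the bytes alone are meaningless without the scale that the Schema supplies.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of built-in logical types mapped onto primitives.
final class BuiltInEncodings {

    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);

    private BuiltInEncodings() {
    }

    // Decimal -> BYTES: the big-endian two's-complement unscaled value. The
    // scale is not in the bytes; it travels as a Schema parameter.
    static byte[] decimalToBytes(BigDecimal value, int scale) {
        if (value.scale() != scale) {
            throw new IllegalArgumentException("Expected scale " + scale + " but got " + value.scale());
        }
        return value.unscaledValue().toByteArray();
    }

    // BYTES -> Decimal, using the scale taken from the Schema.
    static BigDecimal bytesToDecimal(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    // Date -> INT32: whole days since the Unix epoch; the Date must fall
    // exactly on midnight UTC.
    static int dateToInt(Date date) {
        long millis = date.getTime();
        if (millis % MILLIS_PER_DAY != 0) {
            throw new IllegalArgumentException("Date has a non-zero time component: " + date);
        }
        return (int) (millis / MILLIS_PER_DAY);
    }

    // INT32 -> Date.
    static Date intToDate(int days) {
        return new Date(days * MILLIS_PER_DAY);
    }
}
```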

Unfortunately, Kafka Connect does not currently support custom logical types. 
Sure, you can create them, but neither the JsonConverter nor any of the other 
Converters will know how to serialize or deserialize them.

One option is for Kafka Connect to add these, but this is sort of a 
never-ending battle. And, since Kafka is not yet on Java 8, supporting 
OffsetTime and OffsetDateTime would be problematic.

Perhaps a better option is to support custom logical types, where each logical 
type must be based upon a single primitive type and must define a class that 
knows how to convert values of the logical type to and from that primitive type. 
The Converters, once modified, could look for the referenced class and use its 
serdes logic as needed.
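One possible shape for that per-type class, sketched as an interface plus an IsoTime implementation. LogicalTypeSerde, IsoTimeSerde and the logical name "io.debezium.data.IsoTime" are all hypothetical (the name simply follows the class path in [2]). Only the implementation needs Java 8; the interface itself compiles on Java 7, which would keep the Java 8 types out of Kafka itself.

```java
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;

// Hypothetical SPI; nothing like it exists in Kafka Connect today.
// L is the logical (Java) type, P the Java type of the underlying primitive.
interface LogicalTypeSerde<L, P> {

    // Unique name recorded in the Schema, e.g. a fully qualified class name.
    String logicalName();

    // Name of the single underlying primitive Schema type, e.g. "STRING".
    String primitiveType();

    // Called by a Converter before it serializes the primitive value.
    P toPrimitive(L logicalValue);

    // Called by a Converter after it deserializes the primitive value.
    L fromPrimitive(P primitiveValue);
}

// Example implementation: the proposed IsoTime type carried as a STRING.
final class IsoTimeSerde implements LogicalTypeSerde<OffsetTime, String> {

    @Override
    public String logicalName() {
        return "io.debezium.data.IsoTime"; // assumed name, following [2]
    }

    @Override
    public String primitiveType() {
        return "STRING";
    }

    @Override
    public String toPrimitive(OffsetTime logicalValue) {
        return logicalValue.format(DateTimeFormatter.ISO_OFFSET_TIME);
    }

    @Override
    public OffsetTime fromPrimitive(String primitiveValue) {
        return OffsetTime.parse(primitiveValue, DateTimeFormatter.ISO_OFFSET_TIME);
    }
}
```

Fixing one primitive per logical type is what would let a consumer without IsoTimeSerde still read the value as a plain String.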

A few points:

1) Any source connector that produces records with these logical types must 
have the logical type’s class available on the classpath. That doesn’t seem 
a difficult requirement to satisfy.

2) Any consumer or sink connector that consumes records with these values needs 
the logical type’s class in order to work with them. This doesn’t seem too 
burdensome, especially if the logical type classes are packaged in separate 
JARs. However, if a consumer doesn’t have the logical type class, its local 
Converter would simply deserialize the value to the corresponding primitive 
(e.g., byte[], int, long, float, String). Is that sufficient if the consumer 
or sink connector is simply passing the value along?

3) There are a couple of ways the logical type’s Schema object could reference 
its class. The four built-ins use the convention that the Schema name is the 
fully qualified name of the class, though I suspect this is largely just a 
technique to guarantee a unique name. However, there is currently no interface 
or base class for logical types, so something would have to change to allow 
easy invocation of the serdes methods. An alternative might be to add to 
“Schema” an optional “serdes” field that references the name of the class that 
implements a serdes interface; this is probably cleaner, though it does make 
the Schema object more verbose.
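A minimal sketch of the Converter side of points 2 and 3, assuming the Schema carries the serde’s class name (either as its name or in the proposed “serdes” field). The class is loaded reflectively and cached; if it is not on the consumer’s classpath, the primitive value is passed through unchanged. SerdeResolver, Decoder and BitsDecoder are hypothetical names, not Kafka APIs.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical Converter-side lookup of a logical type's serde class.
final class SerdeResolver {

    // Minimal stand-in for a logical-type serde (decode direction only).
    interface Decoder {
        Object fromPrimitive(Object primitiveValue);
    }

    // Example decoder for the proposed Bits type carried as BYTES.
    public static final class BitsDecoder implements Decoder {
        @Override
        public Object fromPrimitive(Object primitiveValue) {
            return BitSet.valueOf((byte[]) primitiveValue);
        }
    }

    // Successfully resolved decoders, keyed by class name. Misses are not
    // cached, so a missing class is looked up again on every call.
    private final Map<String, Decoder> cache = new ConcurrentHashMap<>();

    Object decode(String serdeClassName, Object primitiveValue) {
        Decoder decoder = cache.get(serdeClassName);
        if (decoder == null) {
            try {
                Class<? extends Decoder> cls =
                        Class.forName(serdeClassName).asSubclass(Decoder.class);
                decoder = cls.getDeclaredConstructor().newInstance();
                cache.put(serdeClassName, decoder);
            } catch (ClassNotFoundException e) {
                // Class not on this classpath: hand back the raw primitive (point 2).
                return primitiveValue;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + serdeClassName, e);
            }
        }
        return decoder.fromPrimitive(primitiveValue);
    }
}
```

Only a missing class triggers the pass-through; any other reflective failure is treated as a configuration error rather than silently ignored.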


Thoughts?

Randall Hauch

[1] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/Bits.java
[2] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTime.java
[3] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTimestamp.java