I’m working on a Kafka Connect connector that reads a MySQL binlog to provide near real-time change data capture, and I also plan connectors for other DBMSes. The problem is that I’m not able to map all of the MySQL data types — or even all of the standard JDBC types — to Kafka Connect Schemas without resorting to complex Schemas that radically increase the footprint of messages.
Specifically, I’d like my connectors to be able to use the following “logical” types:

- Bits: A set of bits of arbitrary length, corresponding to java.util.BitSet. See [1] for code.
- IsoTime: An ISO8601 time that includes the time zone, corresponding to Java 8’s java.time.OffsetTime, which represents a time with an offset from UTC/Greenwich and has a well-defined ordering, making it more suitable for persistent storage. See [2] for code.
- IsoTimestamp: An ISO8601 timestamp that includes the time zone, corresponding to Java 8’s java.time.OffsetDateTime, which represents an instant with an offset from UTC/Greenwich and has a well-defined ordering, making it more suitable for persistent storage. See [3] for code.

These are very similar to the 4 built-in logical types (Decimal, Date, Time, and Timestamp). Logical types are much akin to aliases for a primitive type (typically BYTES), and their use within a Schema conveys semantics that would not be there by just using the corresponding primitive.

Unfortunately, Kafka Connect is not currently able to support custom logical types. Sure, you can create them, but neither the JsonConverter nor any of the other Converters will know how to serialize or deserialize them. One option is for Kafka Connect to add these built in, but that is sort of a never-ending battle. And, since Kafka is not yet on Java 8, supporting OffsetTime and OffsetDateTime would be problematic.

Perhaps a better option is to support custom logical types, where each logical type must be based upon a single primitive type and must define a class that knows how to serialize and deserialize the logical type to and from that primitive type. The Converters, once modified, could look up the referenced class and use its serdes logic as needed. A couple of points:

1) Any source connector that produces a record with these logical types would obviously have to have the logical type’s class available on the classpath. That doesn’t seem a difficult requirement to satisfy.

2) Any consumer or sink connector that consumes records with these values needs the logical type’s class on its classpath in order to work with the values. This doesn’t seem too horrible, especially if the logical type classes are packaged in separate JARs. However, if the consumer doesn’t have the logical type class, then its local Converter would just deserialize to the corresponding primitive value (e.g., byte[], int, long, float, String, etc.). Is this sufficient if the consumer or sink connector is simply passing the value along?

3) There are a couple of ways the logical type’s Schema object could reference its class. The 4 built-ins use the convention that the Schema name corresponds to the fully-qualified name of the class, though I suspect this is largely just a technique to guarantee a unique name. However, at this time there is no interface or base class for logical types, so something would have to change to allow easy invocation of the serdes methods. An alternative might be to add to Schema an optional “serdes” field that references the name of a class implementing a serdes interface; this is probably cleaner, though it does increase the verbosity of the Schema object.

Thoughts?

Randall Hauch

[1] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/Bits.java
[2] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTime.java
[3] https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTimestamp.java
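P.S. To make the serdes-class proposal concrete, here is a minimal sketch of what such an interface and a Bits implementation might look like. All of the names here (LogicalTypeSerdes, BitsSerdes) are hypothetical, not existing Kafka Connect API; the sketch deliberately uses only the JDK, which also sidesteps the Java 8 question for this particular type:

```java
import java.util.BitSet;

public class LogicalTypeSketch {

    /**
     * Hypothetical interface a custom logical type would implement so that
     * Converters can invoke its serdes logic. T is the Java type the
     * connector works with; the primitive form here is byte[] (i.e., the
     * BYTES primitive type the logical type is based upon).
     */
    public interface LogicalTypeSerdes<T> {
        /** Convert the logical value to its primitive representation. */
        byte[] toPrimitive(T value);

        /** Reconstruct the logical value from its primitive representation. */
        T fromPrimitive(byte[] bytes);
    }

    /** Hypothetical serdes for the Bits logical type, backed by BYTES. */
    public static class BitsSerdes implements LogicalTypeSerdes<BitSet> {
        @Override
        public byte[] toPrimitive(BitSet value) {
            return value.toByteArray();
        }

        @Override
        public BitSet fromPrimitive(byte[] bytes) {
            return BitSet.valueOf(bytes);
        }
    }

    public static void main(String[] args) {
        BitsSerdes serdes = new BitsSerdes();
        BitSet bits = new BitSet();
        bits.set(0);
        bits.set(10);
        // What a Converter would write into the message, and read back out.
        byte[] primitive = serdes.toPrimitive(bits);
        BitSet roundTripped = serdes.fromPrimitive(primitive);
        System.out.println(bits.equals(roundTripped)); // prints "true"
    }
}
```

A modified Converter could locate such a class (via the Schema name or a dedicated field, per point 3) and call these two methods, while a consumer without the class on its classpath would simply see the raw byte[].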
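P.P.S. For point 3’s alternative, a Schema carrying the optional “serdes” field might be serialized by the JsonConverter as something like the following. The “serdes” field name and the BitsSerdes class name are purely illustrative, not existing code:

```json
{
  "type": "bytes",
  "optional": false,
  "name": "io.debezium.data.Bits",
  "version": 1,
  "serdes": "io.debezium.data.BitsSerdes"
}
```

The cost is the extra field on every such Schema, but it removes the need to overload the name-as-class-name convention that the 4 built-ins rely on.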