[ https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710216#comment-14710216 ]
Ewen Cheslack-Postava commented on KAFKA-2367:
----------------------------------------------

I posted an initial patch. It covers roughly the functionality of Avro's APIs. The actual data API code (not tests) clocks in at < 600 lines, substantially smaller than the code from Avro (which didn't even have GenericRecord support included...). Overall the code (including tests) shrunk by 1200 lines and now has more functionality. The API is broader than Avro's in some ways (more primitive types to cover types available in other serialization formats, support for typed keys in maps, etc.); in other ways it currently omits some things (enums, unions). Some notes and thoughts:

* I have not defined any logical types yet, but they can be implemented for any type and identified by the name + version fields. I think most of the effort here is in the specification. The implementation is just providing the schema definitions, and occasionally including conversion utilities for built-in Java types that correspond to the type and are likely to be used commonly.

* The version field is a bit weird - it is a byte[]. This lets you, e.g., use a network-byte-order integer directly if you want to. I considered just requiring a string instead. The overhead of encoding to a string seems pretty minimal for the versioning schemes I can think of (incrementing integers, hashes), but the byte[] seemed like the right way to handle it. One thing the documentation for version numbers doesn't currently define is how ordering works. I'm not sure versions are going to be very useful without this (besides uniquely identifying the schema, it would be nice to know whether a schema is newer or older than another one), but I'm not sure everyone will have a versioning scheme that allows comparing versions without contacting an external service.

* I provided support for byte[] and ByteBuffer.
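For reference, the equals/hashCode behavior that makes this trade-off matter, shown with plain JDK classes (no Copycat types involved):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class BytesEquality {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        // Arrays inherit identity-based equals/hashCode from Object,
        // so logically equal contents still compare unequal.
        System.out.println(a.equals(b));             // false
        System.out.println(Arrays.equals(a, b));     // true

        // Nesting makes it worse: List.equals delegates to element
        // equals, so lists holding equal byte[] contents are not equal.
        System.out.println(List.of(a).equals(List.of(b))); // false

        // ByteBuffer defines content-based equals/hashCode, so it
        // composes correctly inside lists and map keys.
        ByteBuffer x = ByteBuffer.wrap(new byte[]{1, 2, 3});
        ByteBuffer y = ByteBuffer.wrap(new byte[]{1, 2, 3});
        System.out.println(x.equals(y));                   // true
        System.out.println(List.of(x).equals(List.of(y))); // true
    }
}
```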
byte[] is really annoying since equals/hashCode don't work for it; the situation is made worse because it can be nested, arbitrarily deeply, in lists and maps. I haven't tried to address the equals/hashCode problems in those cases. I think it's better to recommend using ByteBuffer when you need to test for equality or compute hashcodes. However, this also needs to be respected by deserializers/converters.

* Made schemas explicit in the Copycat APIs. Previously schemas were inferred for primitive types. I think making this explicit might be a bit more effort for connector developers, but it's ultimately the right thing to do. There are too many weird edge cases you can get into without explicitly specifying the schema, especially with list/map types (if the collection is empty, what are the key/value schemas?) and optional values (if you encounter a null, what's the schema?).

* Caching of schema conversions. In the JSON implementation we're including, we're probably being pretty wasteful right now since every record has to translate both the schema and the data to JSON. We should definitely be doing some caching here. I think an LRU using an IdentityHashMap should be fine. However, this does assume that connectors are good about reusing schemas (defining them up front, or, if they are dynamic, keeping their own cache of schemas and detecting when they can be reused).

* Many serialization formats don't support unions natively. They require a separate struct that contains optional fields for all the types (and might provide syntactic sugar, but it has no impact on the actual encoding). When you make nullability part of every type, I think the need for unions pretty much disappears -- creating the appropriate struct is trivial.

* I didn't include an IndexedRecord-like interface.
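On the caching point above: IdentityHashMap by itself provides no access ordering or eviction, so one minimal JDK-only sketch (class and method names are mine, not from the patch) combines LinkedHashMap's access order with an identity wrapper for keys:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy identity-keyed LRU cache: keys compare by reference, as in an
// IdentityHashMap, while LinkedHashMap(accessOrder=true) supplies the
// LRU eviction. Illustrative only, not code from the patch.
public class IdentityLruCache<K, V> {
    // Wrapper forcing reference equality and identity hashing for keys.
    private static final class IdKey<K> {
        final K key;
        IdKey(K key) { this.key = key; }
        @Override public boolean equals(Object o) {
            return o instanceof IdKey && ((IdKey<?>) o).key == key;
        }
        @Override public int hashCode() { return System.identityHashCode(key); }
    }

    private final Map<IdKey<K>, V> map;

    public IdentityLruCache(int maxEntries) {
        // accessOrder=true moves entries to the back on get(), so the
        // eldest entry is always the least recently used one.
        this.map = new LinkedHashMap<IdKey<K>, V>(16, 0.75f, true) {
            @Override protected boolean removeEldestEntry(Map.Entry<IdKey<K>, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public V get(K key) { return map.get(new IdKey<>(key)); }
    public void put(K key, V value) { map.put(new IdKey<>(key), value); }
    public int size() { return map.size(); }

    public static void main(String[] args) {
        IdentityLruCache<String, Integer> cache = new IdentityLruCache<>(2);
        String s1 = new String("schema");
        String s2 = new String("schema"); // equal contents, different identity
        cache.put(s1, 1);
        System.out.println(cache.get(s2)); // null: identity lookup, not equals
        System.out.println(cache.get(s1)); // 1
    }
}
```

The identity lookup is exactly why this only pays off if connectors reuse schema instances rather than rebuilding an equal schema per record.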
I did allow setting a field by passing in the Field object instead of the field name, which avoids a hash lookup of the field name and has basically the same effect as setting via an index. I didn't want to expose something like put(int fieldNumber, Object value) because the use of an array internally in Struct is really an implementation detail to avoid object allocations.

* The fluent API for SchemaBuilder is different from how Avro's works. It prefers a single class for building schemas and doesn't directly handle nesting (you can just run another SchemaBuilder inline in the calls to the parent). This actually is a substantial difference -- in Copycat the expectation is that most schemas are created dynamically (most connectors do *not* have fixed schemas to work with), whereas Avro's seems targeted at more static use cases (not surprisingly). Avro's more complex implementation can catch certain types of errors at compile time (there are lots of different *Builder classes with various abilities/restrictions), but for Copycat this doesn't help much and just makes the code more verbose. See, e.g., https://github.com/confluentinc/copycat/blob/master/avro/src/main/java/io/confluent/copycat/avro/AvroData.java where a lot of the code paths are basically the same and the code could be a lot more concise; when I tried to do that, I was unable to get the different types from Avro's SchemaBuilder classes to work out properly. Since a lot of schema building in Copycat is performed incrementally and can't *always* be chained nicely without a lot of code duplication, having just a single type gives you more flexibility in how you structure your code. The cost is that there are some runtime exceptions you might need to be careful to catch.

* There is only a single API for Struct and struct building, i.e. Structs are mutable and there is no separate StructBuilder.
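As a self-contained toy model of this design (deliberately *not* the actual Copycat classes -- a miniature Struct/Field sketched from scratch), a mutable Struct that is its own builder, chains via `this`, has an explicit validate() step, and applies defaults on get() rather than on put():

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a single mutable Struct class with no separate
// StructBuilder, fluent puts, a validate() checkpoint, and
// "getter-style" default values.
public class FluentStructDemo {
    static final class Field {
        final String name;
        final boolean optional;
        final Object defaultValue;
        Field(String name, boolean optional, Object defaultValue) {
            this.name = name; this.optional = optional; this.defaultValue = defaultValue;
        }
    }

    static final class Struct {
        private final Map<String, Field> schema = new HashMap<>();
        private final Map<String, Object> values = new HashMap<>();

        Struct field(Field f) { schema.put(f.name, f); return this; }

        // Fluent setter: returns this so puts can be chained.
        Struct put(String name, Object value) { values.put(name, value); return this; }

        // "Getter default": the default applies only when the stored
        // value would otherwise come back null.
        Object get(String name) {
            Object v = values.get(name);
            return v != null ? v : schema.get(name).defaultValue;
        }

        // Explicit validation point instead of a build() step.
        Struct validate() {
            for (Field f : schema.values()) {
                if (!f.optional && get(f.name) == null)
                    throw new IllegalStateException("missing required field: " + f.name);
            }
            return this;
        }
    }

    public static void main(String[] args) {
        Struct s = new Struct()
                .field(new Field("id", false, null))
                .field(new Field("rate", true, 10))
                .put("id", 42)
                .validate();
        System.out.println(s.get("id"));   // 42
        System.out.println(s.get("rate")); // 10, the getter default
        s.put("rate", 7);
        System.out.println(s.get("rate")); // 7
    }
}
```

Note the getter-default consequence in this toy: with a non-null default, get() cannot distinguish an explicitly stored null from an unset field -- it returns the default either way.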
This keeps the API simpler and more concise, and there are clear points in the Copycat runtime where we can call validate() on the data if we want to. It also means that Structs are "fluent", i.e. they consistently return this for chaining. This may seem an odd choice given that I did create separate Schema and SchemaBuilder classes, but I think that makes sense since you want schemas to be immutable once you've created them (which we don't quite enforce fully, due to returning a mutable list of fields, but it's pretty close).

* The combination of optional/nullable and default is a bit tricky, and I think the semantics vary across the different serializers. What do you do when you have an optional field and a default? If you use null (or the value not being serialized) to indicate the value hasn't been set, then you can't *explicitly* set the field to null. If you do something more complicated, you have to track more state and create more objects when setting field values (something like Option<Object> rather than just Object). The result is that if you actually want to be able to encode nulls, you cannot provide a non-null default value. In other words, think of the default value as a *getter* default value, not a *putter* default value: the default is used only when a get() call would otherwise return null. I think I've gotten this right, but it's something to watch out for when reviewing.

> Add Copycat runtime data API
> ----------------------------
>
>                 Key: KAFKA-2367
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2367
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: copycat
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>             Fix For: 0.8.3
>
>
> Design the API used for runtime data in Copycat. This API is used to
> construct schemas and records that Copycat processes.
> This needs to be a
> fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to
> support complex, varied data types that may be input from/output to many data
> systems.
> This issue should also address the serialization interfaces used
> within Copycat, which translate the runtime data into serialized byte[] form.
> It is important that these be considered together because the data format can
> be used in multiple ways (records, partition IDs, partition offsets), so it
> and the corresponding serializers must be sufficient for all these use cases.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)