Hi Brian, Currently Java's JDBCIO does not do special cross-language things. A DATETIME field type appears because the Row contains an Instant object. And the sdk will always encode the Instant object using InstantCoder. This is done for both Java pipelines and cross-language pipelines. To use millis_instant in JDBCIO and to avoid break Instant used elsewhere (like watermarks) I will need to change the type of timestamp returned by JDBC read from Instant to millis_instant.
We could make the name of the logical type to be "beam:logical_type:millis_instant" which is backed by a big endian int 64. Best, Yi On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette <bhule...@google.com> wrote: > Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't > simulate TimestampCoder with a logical type. > > I think the ideal end state would be to have a well-defined > beam:logical_type:millis_instant that we use for cross-language (when > appropriate), and never use DATETIME at cross-language boundaries. Would it > be possible to add millis_instant, and use that for JDBC read/write instead > of DATETIME? > > Separately we could consider how to resolve the conflicting definitions of > beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that > without breaking pipeline update. > > Brian > > > On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev <dev@beam.apache.org> wrote: > >> Hi Cham, >> >> Thanks for the comments. >> >> >>> >>>> >>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in >>>> implementation it will use BigEndianLongCoder to encode/decode the stream. >>>> >>>> >>> Is this to be compatible with the current Java implementation ? And we >>> have to update other SDKs to use big endian coders when encoding/decoding >>> the "beam:logical_type:instant:v1" logical type ? >>> >>> >> Yes, and the proposal is aimed to keep the Java SDK change minimal; we >> have to update other SDKs to make it work. Currently python and go sdk does >> not implement "beam:logical_type:datetime:v1" (will >> be "beam:logical_type:instant:v1") at all. >> >> >>> >>> >>>> For the second step ii, the problem is that there is a primitive type >>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32, >>>> INT64... are all backed by VarInt (and there is ongoing work to use fixed >>>> size big endian to encode INT8, INT16 ( >>>> https://github.com/apache/beam/issues/19815)). Ideally I would think >>>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT) >>>> primitive type is backed by VarInt. But this may be a more substantial >>>> change for the current code base. >>>> >>> >>> I'm a bit confused by this. Did you mean that there's *no* primitive >>> type backed by a fixed length integer coder ? Also, by primitive, I'm >>> assuming you mean Beam Schema types here. >>> >>> >> Yes I mean Beam Schema types here. The proto for datetime(instant) >> logical type is constructed here: >> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202 >> It is represented by an INT64 atomic type. In cross-language case, >> another SDK receives proto and decodes the stream according to the proto. >> Currently I do not see an atomic type that will be decoded using a >> fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with >> VarInt. >> >> As a workaround in the PR (#22561), in python's RowCoder I explicitly set >> the coder for "beam:logical_type:datetime:v1" (will >> be "beam:logical_type:instant:v1") to be TimestampCoder. I do not find a >> way to keep the logic contained in the logical type implementation, e.g. in >> to_language_type and to_representation_type method. To do this I will need >> an atomic type that is decoded using the BigEndianLong coder. >> Please point out if I was wrong. >> >> Best, >> Yi >> >