[ https://issues.apache.org/jira/browse/FLINK-11021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703290#comment-16703290 ]
Christina commented on FLINK-11021:
-----------------------------------

Looks like the root cause of this issue is that Kryo added Java 8 time support in 4.0.0 (merged PR [https://github.com/EsotericSoftware/kryo/pull/395/files]), whereas Flink 1.6.2 uses Kryo version 2.24.0. Are there plans to update the Kryo dependency in a future version of Flink? We'd be interested in having built-in support for these standard library Java 8 classes. For now, we have implemented our own Kryo serializers for these classes as a workaround.

> ZoneOffset objects don't appear to be serialized correctly
> ----------------------------------------------------------
>
>                 Key: FLINK-11021
>                 URL: https://issues.apache.org/jira/browse/FLINK-11021
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.6.2
>         Environment: Scala 2.11.11, OpenJDK 1.8.0_192-b12
>            Reporter: Christina
>            Priority: Major
>         Attachments: TimezoneStreamProcessor.scala
>
>
> In Flink 1.6.2 ZoneOffset objects are not being serialized correctly: they
> are turned into `null` when serialized in a Flink job. I've attached a basic
> sample job that illustrates the problem along with a few sbt console commands
> (below) that also exhibit the problem.
>
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import java.time._
> import java.time.temporal.ChronoUnit
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> val UTCZoneId = ZoneId.of("UTC")
> val UTCZoneOffset = ZoneOffset.UTC
> case class MyTime(
>   timestamp: ZonedDateTime,
>   zoneOffset: ZoneOffset
> ) extends Serializable
> val now = MyTime(ZonedDateTime.now(UTCZoneId).truncatedTo(ChronoUnit.MILLIS),
>   UTCZoneOffset)
> val mytimeTypeInfo =
>   org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[MyTime])
> val ser = mytimeTypeInfo.createSerializer(env.getConfig)
> val out = new org.apache.flink.core.memory.ByteArrayDataOutputView()
> val serialized = ser.serialize(now, out)
> val bytes = out.toByteArray
> val in = new org.apache.flink.core.memory.ByteArrayDataInputView(bytes)
> val deserialized = ser.deserialize(in)
> println(s"""
> before serialization: $now
> after serialization: $deserialized
> """){code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)