[ 
https://issues.apache.org/jira/browse/FLINK-11021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703290#comment-16703290
 ] 

Christina commented on FLINK-11021:
-----------------------------------

Looks like the root cause of this issue is that kryo added Java 8 time support 
in 4.0.0 (merged PR: https://github.com/EsotericSoftware/kryo/pull/395/files). 
Flink 1.6.2 uses kryo version 2.24.0. 

Are there plans to update the kryo dependency in a future version of Flink? 
We'd be interested in built-in support for these standard-library Java 8 
classes. For now we have implemented our own kryo serializers for these classes 
as a workaround.
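
For anyone hitting the same problem, here is a minimal sketch of what such a workaround serializer might look like for ZoneOffset. It is not the exact code we run: the names ZoneOffsetSerializer and JavaTimeKryoRegistration are made up for illustration, and encoding the offset as its total seconds is just one reasonable choice. It assumes the kryo 2.24.0 Serializer API that Flink 1.6.2 ships with.

```scala
import java.time.ZoneOffset

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.common.ExecutionConfig

// Hypothetical serializer: stores a ZoneOffset as its total offset in seconds.
class ZoneOffsetSerializer extends Serializer[ZoneOffset] {

  override def write(kryo: Kryo, output: Output, offset: ZoneOffset): Unit =
    output.writeInt(offset.getTotalSeconds)

  override def read(kryo: Kryo, input: Input, tpe: Class[ZoneOffset]): ZoneOffset =
    ZoneOffset.ofTotalSeconds(input.readInt())
}

// Hypothetical helper that tells Flink's Kryo fallback to use the serializer above.
object JavaTimeKryoRegistration {
  def register(config: ExecutionConfig): Unit =
    config.registerTypeWithKryoSerializer(classOf[ZoneOffset], classOf[ZoneOffsetSerializer])
}
```

Calling JavaTimeKryoRegistration.register(env.getConfig) before creating the serializer in the sample job should make Kryo use this class for ZoneOffset fields. The other java.time classes (for example ZonedDateTime) would presumably need serializers of their own along the same lines.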

> ZoneOffset objects don't appear to be serialized correctly
> ----------------------------------------------------------
>
>                 Key: FLINK-11021
>                 URL: https://issues.apache.org/jira/browse/FLINK-11021
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.6.2
>         Environment: Scala 2.11.11, OpenJDK 1.8.0_192-b12
>            Reporter: Christina
>            Priority: Major
>         Attachments: TimezoneStreamProcessor.scala
>
>
> In Flink 1.6.2 ZoneOffset objects are not being serialized correctly: they 
> are turned into `null` when serialized in a Flink job. I've attached a basic 
> sample job that illustrates the problem along with a few sbt console commands 
> (below) that also exhibit the problem.
>  
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import java.time._
> import java.time.temporal.ChronoUnit
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> val UTCZoneId = ZoneId.of("UTC")
> val UTCZoneOffset = ZoneOffset.UTC
> case class MyTime(
> timestamp: ZonedDateTime,
> zoneOffset: ZoneOffset
> ) extends Serializable
> val now = MyTime(ZonedDateTime.now(UTCZoneId).truncatedTo(ChronoUnit.MILLIS), 
> UTCZoneOffset)
> val mytimeTypeInfo = 
> org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[MyTime])
> val ser = mytimeTypeInfo.createSerializer(env.getConfig)
> val out = new org.apache.flink.core.memory.ByteArrayDataOutputView()
> val serialized = ser.serialize(now, out)
> val bytes = out.toByteArray
> val in = new org.apache.flink.core.memory.ByteArrayDataInputView(bytes)
> val deserialized = ser.deserialize(in)
> println(s"""
> before serialization: $now
> after serialization: $deserialized
> """){code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
