[ https://issues.apache.org/jira/browse/FLINK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056044#comment-16056044 ]
Tzu-Li (Gordon) Tai commented on FLINK-6844:
--------------------------------------------

[~shashank734] I've tested CEP with Scala collections as the event type (which lets the CEP operator use the {{TraversableSerializer}} internally), using the following code:

{code}
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object FlinkCEPTest {

  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)

    // each event is a single-element Scala List, so the TraversableSerializer is used
    val source: DataStream[scala.collection.immutable.List[java.lang.String]] =
      env.socketTextStream("localhost", 9999).map(x => List(x.split(",")(0)))

    // explicit event type on begin so that both conditions type-check
    val pattern = Pattern.begin[List[String]]("start").where(new SimpleCondition[List[String]] {
      override def filter(t: List[String]) = t.head.equals("a")
    }).times(4).allowCombinations().followedBy("end").where(new SimpleCondition[List[String]] {
      override def filter(t: List[String]) = t.head.equals("b")
    })

    CEP.pattern(source, pattern).select(_.toString()).print()

    // execute program
    env.execute("Flink CEP test")
  }
}
{code}

I can confirm that this works without any errors on branch {{release-1.3}}. Checkpointing and restoring from savepoints both work correctly.

As I've mentioned, simply applying the commit for this JIRA onto {{release-1.3.0}} may not work, because the complete fix includes other commits as well.

Please let me know if you think otherwise!

> TraversableSerializer should implement compatibility methods
> ------------------------------------------------------------
>
>                 Key: FLINK-6844
>                 URL: https://issues.apache.org/jira/browse/FLINK-6844
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.3.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The {{TraversableSerializer}} may be used as a serializer for managed state
> and takes part in checkpointing, therefore should implement the compatibility
> methods.
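
For readers unfamiliar with what "compatibility methods" means here, the following is a minimal toy sketch of the idea, not Flink code. As far as I recall, the corresponding methods on Flink 1.3's {{TypeSerializer}} are {{snapshotConfiguration()}} and {{ensureCompatibility(...)}}; the sketch borrows those two names, but every type in it ({{ToySerializer}}, {{ConfigSnapshot}}, {{Compat}}, {{ToyListSerializer}}) is hypothetical and made up for illustration. It only shows the general contract: a serializer records its configuration at checkpoint time, and the serializer present at restore time decides whether it can read state written under that configuration.

```scala
// Toy model only: none of these types are Flink classes.
sealed trait Compat
case object Compatible extends Compat
case object RequiresMigration extends Compat

// Hypothetical record of what a serializer looked like when the checkpoint was taken.
final case class ConfigSnapshot(serializerName: String, elementType: String)

trait ToySerializer {
  // Called at checkpoint time; the result is stored alongside the state.
  def snapshotConfiguration(): ConfigSnapshot
  // Called at restore time with the snapshot that was stored earlier.
  def ensureCompatibility(restored: ConfigSnapshot): Compat
}

// Stand-in for a collection serializer that wraps an element type.
final class ToyListSerializer(elementType: String) extends ToySerializer {
  override def snapshotConfiguration(): ConfigSnapshot =
    ConfigSnapshot("ToyListSerializer", elementType)

  // Compatible only if the restored snapshot matches the current configuration.
  override def ensureCompatibility(restored: ConfigSnapshot): Compat =
    if (restored == snapshotConfiguration()) Compatible else RequiresMigration
}

object CompatibilityDemo {
  def main(args: Array[String]): Unit = {
    val atCheckpoint = new ToyListSerializer("java.lang.String").snapshotConfiguration()

    val sameOnRestore = new ToyListSerializer("java.lang.String")
    val changedOnRestore = new ToyListSerializer("java.lang.Integer")

    println(sameOnRestore.ensureCompatibility(atCheckpoint))    // prints "Compatible"
    println(changedOnRestore.ensureCompatibility(atCheckpoint)) // prints "RequiresMigration"
  }
}
```

A serializer that does not take part in this handshake gives the restore path nothing to check against, which is presumably why the issue treats the missing methods as a blocker for state that goes through checkpoints.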
-- This message was sent by Atlassian JIRA (v6.4.14#64029)