wydhcws commented on a change in pull request #15742: URL: https://github.com/apache/flink/pull/15742#discussion_r620818295
########## File path: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala ########## @@ -0,0 +1,67 @@ +package org.apache.flink.cep.scala + +import java.lang.reflect.Field + +import org.apache.flink.cep +import org.apache.flink.cep.pattern.Pattern +import org.apache.flink.cep.pattern.conditions.SimpleCondition +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.junit.Assert.assertEquals +import org.junit.Test + +class CEPScalaApiPatternStreamTest { + /** + * These tests simply check that use the Scala API to update the TimeCharacteristic of the PatternStream . + */ + + @Test + def updateCepTimeCharacteristicByScalaApi(): Unit = { + + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0)) + val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() { + override def filter(value: Event): Boolean = value.name == "start" + }) + + val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern) + + //get org.apache.flink.cep.scala.PatternStream + val sePstream = new PatternStream[Event](jestream) + + //get TimeBehaviour + val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream) + + assertEquals(time1.toString, "EventTime") + + //change TimeCharacteristic use scala api + val sPstream: PatternStream[Event] = sePstream.inProcessingTime() + + //get TimeBehaviour + val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream) + + assertEquals(time2.toString, "ProcessingTime") + + + } + + def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = { + val field: Field = seStream.getClass.getDeclaredField("jPatternStream") + field.setAccessible(true) + val JPattern: AnyRef = field.get(seStream) + val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]] + getTimeBehaviourFromJavaPatternStream(stream) + } + + def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event])={ + val builder: Field = jeStream.getClass.getDeclaredField("builder") + builder.setAccessible(true) + val o: AnyRef = builder.get(jeStream) + val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour") + timeBehaviour.setAccessible(true) Review comment: sir,thank you for your suggestion. Actually, I also think this is not good, but I am a little confused. How to prove that a private variable is a change without reflection. Should I write a case to prove that the matching result changes after switching the timeCharacteristic of the PatternStream instead of proving that the private timeBehaviour changes ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org