xuyangzhong commented on code in PR #23752: URL: https://github.com/apache/flink/pull/23752#discussion_r1445732928
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala: ########## @@ -116,13 +123,27 @@ class StreamPhysicalJoin( } override def translateToExecNode(): ExecNode[_] = { + val stateTtlFromHint = new util.HashMap[JInt, JLong] + getHints + .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName)) + .forEach { + hint => + hint.kvOptions.forEach( + (input, ttl) => + stateTtlFromHint + .put( + if (input == FlinkHints.LEFT_INPUT) 0 else 1, Review Comment: Actually, the value of the original state ttl hint written by user is also a standardized format (duration), otherwise an exception will be thrown directly. It seems that there is no need to uniformly convert it to milliseconds. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org