Please refer to this version: =============== import java.util.Date
import org.apache.flink.api.common.io.FilePathFilter import org.apache.flink.core.fs.Path import org.slf4j.LoggerFactory object SdcFilePathFilter { private val TIME_FORMAT = new java.text.SimpleDateFormat("yyyyMMdd hhmm") private val LOG = LoggerFactory.getLogger(classOf[SdcFilePathFilter]) } class SdcFilePathFilter(lookBackPeriod: Long) extends FilePathFilter { override def filterPath(filePath: Path): Boolean = { filePath == null || filePath.getName.startsWith(".") || filePath.getName.startsWith("_") || filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || !(filePath.getName.endsWith(".tar.gz") || filePath.getName.matches("""^\d{8}$""") || (filePath.getName.matches("""^\d{4}$""") && { try { SdcFilePathFilter.TIME_FORMAT.parse(s"${filePath.getParent.getName} ${filePath.getName}").getTime < new Date().getTime - lookBackPeriod } catch { case e: Throwable => SdcFilePathFilter.LOG.warn("Unknown exception happens while checking folder eligibility: {}", e.getStackTrace) true } })) } } -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/