GanfengTan commented on code in PR #6132: URL: https://github.com/apache/inlong/pull/6132#discussion_r991791862
########## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java: ########## @@ -65,4 +81,56 @@ public static boolean isJSON(String json) { return isJson; } + /** + * Filter file by conditions + */ + public static Collection<File> filterFile(Collection<File> allFiles, JobProfile jobConf) { + // filter file by labels + Collection<File> files = null; + try { + files = filterByLabels(allFiles, jobConf); + } catch (IOException e) { + LOGGER.error("filter file error: ", e); + } + return files; + } + + /** + * Filter file by labels if standard log for k8s + */ + private static Collection<File> filterByLabels(Collection<File> allFiles, JobProfile jobConf) throws IOException { + Map<String, String> labelsMap = MetaDataUtils.getPodLabels(jobConf); + if (labelsMap.isEmpty()) { + return allFiles; + } + Collection<File> standardK8sLogFiles = new ArrayList<>(); + Iterator<File> iterator = allFiles.iterator(); + KubernetesClient client = PluginUtils.getKubernetesClient(); + while (iterator.hasNext()) { + File file = iterator.next(); + Map<String, String> logInfo = MetaDataUtils.getLogInfo(file.getName()); + if (logInfo.isEmpty()) { + continue; + } + PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE)) + .withName(logInfo.get(POD_NAME)); + if (Objects.isNull(podResource)) { + continue; + } + Pod pod = podResource.get(); + Map<String, String> podLabels = pod.getMetadata().getLabels(); + AtomicBoolean filterLabelStatus = new AtomicBoolean(true); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org