[ 
https://issues.apache.org/jira/browse/TIKA-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062296#comment-18062296
 ] 

ASF GitHub Bot commented on TIKA-4606:
--------------------------------------

Copilot commented on code in PR #2655:
URL: https://github.com/apache/tika/pull/2655#discussion_r2875202876


##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java:
##########
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.ExternalTestBase;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Windows classpath length 
limit (CreateProcess error=206) exceeded by exec:exec with full Tika classpath")
+class IgniteConfigStoreTest {
+    
+    private static final int MAX_STARTUP_TIMEOUT = 
ExternalTestBase.MAX_STARTUP_TIMEOUT;
+    private static final File TEST_FOLDER = ExternalTestBase.TEST_FOLDER;
+    private static final boolean USE_LOCAL_SERVER = 
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "true"));
+    private static final int GRPC_PORT = 
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+    
+    private static DockerComposeContainer<?> igniteComposeContainer;
+    private static Process localGrpcProcess;
+    
+    @BeforeAll
+    static void setupIgnite() throws Exception {
+        if (USE_LOCAL_SERVER) {
+            try {
+                killProcessOnPort(GRPC_PORT);
+                killProcessOnPort(3344);
+                killProcessOnPort(10800);
+            } catch (Exception e) {
+                log.debug("No orphaned processes to clean up");
+            }
+        }
+        
+        if (!hasExtractedFiles(TEST_FOLDER)) {
+            if (Boolean.parseBoolean(System.getProperty("tika.e2e.useGovdocs", 
"false"))) {
+                
ExternalTestBase.downloadAndUnzipGovdocs1(ExternalTestBase.GOV_DOCS_FROM_IDX, 
ExternalTestBase.GOV_DOCS_TO_IDX);
+            } else {
+                ExternalTestBase.copyTestFixtures();
+            }
+        }
+        
+        if (USE_LOCAL_SERVER) {
+            startLocalGrpcServer();
+        } else {
+            startDockerGrpcServer();
+        }
+    }
+    
+    /** Returns true only if the folder contains at least one non-zip 
extracted file. */
+    private static boolean hasExtractedFiles(File folder) {
+        if (!folder.exists()) {
+            return false;
+        }
+        File[] files = folder.listFiles(f -> f.isFile() && 
!f.getName().endsWith(".zip"));
+        return files != null && files.length > 0;
+    }
+
+    private static void startLocalGrpcServer() throws Exception {
+        log.info("Starting local tika-grpc server using Maven");
+        
+        Path currentDir = Path.of("").toAbsolutePath();
+        Path tikaRootDir = currentDir;
+        
+        while (tikaRootDir != null && 
+               !(Files.exists(tikaRootDir.resolve("tika-grpc")) && 
+                 Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) {
+            tikaRootDir = tikaRootDir.getParent();
+        }
+        
+        if (tikaRootDir == null) {
+            throw new IllegalStateException("Cannot find tika root directory. 
" +
+                "Current dir: " + currentDir + ". " +
+                "Please run from within the tika project.");
+        }
+        
+        Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc");
+        if (!Files.exists(tikaGrpcDir)) {
+            throw new IllegalStateException("Cannot find tika-grpc directory 
at: " + tikaGrpcDir);
+        }
+        
+        String configFileName = "tika-config-ignite-local.json";
+        Path configFile = Path.of("src/test/resources/" + 
configFileName).toAbsolutePath();
+        
+        if (!Files.exists(configFile)) {
+            throw new IllegalStateException("Config file not found: " + 
configFile);
+        }
+        
+        log.info("Tika root: {}", tikaRootDir);
+        log.info("Using tika-grpc from: {}", tikaGrpcDir);
+        log.info("Using config file: {}", configFile);
+        
+        // Use mvn exec:exec to run as external process (not exec:java which 
breaks ServiceLoader)
+        String javaHome = System.getProperty("java.home");
+        boolean isWindows = 
System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
+        String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" : 
"/bin/java");
+        String mvnCmd = tikaRootDir.resolve(isWindows ? "mvnw.cmd" : 
"mvnw").toString();
+        
+        ProcessBuilder pb = new ProcessBuilder(
+            mvnCmd,
+            "exec:exec",
+            "-Dexec.executable=" + javaCmd,
+            "-Dexec.args=" +
+                "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+                "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+                "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+                "--add-opens=java.base/java.io=ALL-UNNAMED " +
+                "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+                "--add-opens=java.base/java.math=ALL-UNNAMED " +
+                "--add-opens=java.base/java.util=ALL-UNNAMED " +
+                "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+                "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
" +
+                "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED 
" +
+                "--add-opens=java.base/java.time=ALL-UNNAMED " +
+                "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED " +
+                "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED " +
+                "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+                
"--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED " +
+                
"--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED " +
+                "-Dio.netty.tryReflectionSetAccessible=true " +
+                "-Dignite.work.dir=\"" + 
tikaGrpcDir.resolve("target/ignite-work") + "\" " +
+                "-classpath %classpath " +
+                "org.apache.tika.pipes.grpc.TikaGrpcServer " +
+                "-c \"" + configFile + "\" " +
+                "-p " + GRPC_PORT
+        );
+        
+        pb.directory(tikaGrpcDir.toFile());
+        pb.redirectErrorStream(true);
+        pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
+        
+        localGrpcProcess = pb.start();
+        
+        final boolean[] igniteStarted = {false};
+        
+        Thread logThread = new Thread(() -> {
+            try (java.io.BufferedReader reader = new java.io.BufferedReader(
+                    new 
java.io.InputStreamReader(localGrpcProcess.getInputStream(), 
java.nio.charset.StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    log.info("tika-grpc: {}", line);
+                    
+                    if (line.contains("Ignite server started") ||
+                        line.contains("Table") && line.contains("created 
successfully") ||
+                        line.contains("Server started, listening on")) {
+                        synchronized (igniteStarted) {
+                            igniteStarted[0] = true;
+                            igniteStarted.notifyAll();
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                log.error("Error reading server output", e);
+            }
+        });
+        logThread.setDaemon(true);
+        logThread.start();
+        
+        try {
+            org.awaitility.Awaitility.await()
+                .atMost(java.time.Duration.ofSeconds(180))
+                .pollInterval(java.time.Duration.ofSeconds(2))
+                .until(() -> {
+                    boolean igniteReady;
+                    synchronized (igniteStarted) {
+                        igniteReady = igniteStarted[0];
+                    }
+                    
+                    if (!igniteReady) {
+                        log.debug("Waiting for Ignite to start...");
+                        return false;
+                    }
+                    
+                    try {
+                        ManagedChannel testChannel = ManagedChannelBuilder
+                            .forAddress("localhost", GRPC_PORT)
+                            .usePlaintext()
+                            .build();
+                        
+                        try {
+                            io.grpc.health.v1.HealthGrpc.HealthBlockingStub 
healthStub = 
+                                
io.grpc.health.v1.HealthGrpc.newBlockingStub(testChannel)
+                                    .withDeadlineAfter(2, TimeUnit.SECONDS);
+                            
+                            io.grpc.health.v1.HealthCheckResponse response = 
healthStub.check(
+                                
io.grpc.health.v1.HealthCheckRequest.getDefaultInstance());
+                            
+                            boolean serving = response.getStatus() == 
+                                
io.grpc.health.v1.HealthCheckResponse.ServingStatus.SERVING;
+                            
+                            if (serving) {
+                                log.info("gRPC server is healthy and 
serving!");
+                                return true;
+                            } else {
+                                log.debug("gRPC server responding but not 
serving yet: {}", response.getStatus());
+                                return false;
+                            }
+                        } finally {
+                            testChannel.shutdown();
+                            testChannel.awaitTermination(1, TimeUnit.SECONDS);
+                        }
+                    } catch (io.grpc.StatusRuntimeException e) {
+                        if (e.getStatus().getCode() == 
io.grpc.Status.Code.UNIMPLEMENTED) {
+                            // Health check not implemented, just verify 
channel works
+                            log.info("Health check not available, assuming 
server is ready");
+                            return true;
+                        }
+                        log.debug("gRPC server not ready yet: {}", 
e.getMessage());
+                        return false;
+                    } catch (Exception e) {
+                        log.debug("gRPC server not ready yet: {}", 
e.getMessage());
+                        return false;
+                    }
+                });
+            
+            log.info("Both gRPC server and Ignite are ready!");
+        } catch (org.awaitility.core.ConditionTimeoutException e) {
+            if (localGrpcProcess.isAlive()) {
+                localGrpcProcess.destroyForcibly();
+            }
+            throw new RuntimeException("Local gRPC server or Ignite failed to 
start within timeout", e);
+        }
+        
+        log.info("Local tika-grpc server started successfully on port {}", 
GRPC_PORT);
+    }
+    
+    
+    private static void startDockerGrpcServer() {
+        String composeFilePath = 
System.getProperty("tika.docker.compose.ignite.file");
+        if (composeFilePath == null || composeFilePath.isBlank()) {
+            throw new IllegalStateException(
+                    "Docker Compose mode requires system property 
'tika.docker.compose.ignite.file' " +
+                    "pointing to a valid docker-compose-ignite.yml file.");
+        }
+        File composeFile = new File(composeFilePath);
+        if (!composeFile.isFile()) {
+            throw new IllegalStateException("Docker Compose file not found: " 
+ composeFile.getAbsolutePath());
+        }
+        igniteComposeContainer = new DockerComposeContainer<>(composeFile)
+                .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath())
+                .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, 
ChronoUnit.SECONDS))
+                .withExposedService("tika-grpc", 50052,
+                    Wait.forLogMessage(".*Server started.*\\n", 1))
+                .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log));
+        
+        igniteComposeContainer.start();
+    }
+    
+    @AfterAll
+    static void teardownIgnite() {
+        if (USE_LOCAL_SERVER && localGrpcProcess != null) {
+            log.info("Stopping local gRPC server and all child processes");
+            
+            try {
+                long mvnPid = localGrpcProcess.pid();
+                log.info("Maven process PID: {}", mvnPid);
+                localGrpcProcess.destroy();
+                
+                if (!localGrpcProcess.waitFor(10, TimeUnit.SECONDS)) {
+                    log.warn("Process didn't stop gracefully, forcing 
shutdown");
+                    localGrpcProcess.destroyForcibly();
+                    localGrpcProcess.waitFor(5, TimeUnit.SECONDS);
+                }
+                
+                Thread.sleep(2000);
+                
+                try {
+                    killProcessOnPort(GRPC_PORT);
+                    killProcessOnPort(3344);
+                    killProcessOnPort(10800);
+                } catch (Exception e) {
+                    log.debug("Error killing processes on ports (may already 
be stopped): {}", e.getMessage());
+                }
+                
+                log.info("Local gRPC server stopped");
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                localGrpcProcess.destroyForcibly();
+            }
+        } else if (igniteComposeContainer != null) {
+            igniteComposeContainer.close();
+        }
+    }
+    
+    private static void killProcessOnPort(int port) throws IOException, 
InterruptedException {
+        ProcessBuilder findPb = new ProcessBuilder("lsof", "-ti", ":" + port);
+        findPb.redirectErrorStream(true);
+        Process findProcess = findPb.start();
+        
+        try (java.io.BufferedReader reader = new java.io.BufferedReader(
+                new java.io.InputStreamReader(findProcess.getInputStream(), 
java.nio.charset.StandardCharsets.UTF_8))) {
+            String pidStr = reader.readLine();
+            if (pidStr != null && !pidStr.trim().isEmpty()) {
+                long pid = Long.parseLong(pidStr.trim());
+                long myPid = ProcessHandle.current().pid();
+                
+                if (pid == myPid || isParentProcess(pid)) {
+                    log.debug("Skipping kill of PID {} on port {} (test 
process or parent)", pid, port);
+                    return;
+                }
+                
+                // Only kill processes we can identify as tika-grpc or Ignite 
instances to avoid
+                // accidentally killing unrelated processes that happen to be 
on the same port.
+                String cmdLine = ProcessHandle.of(pid)
+                        .flatMap(h -> h.info().commandLine())
+                        .orElse("");
+                if (!cmdLine.contains("tika") && !cmdLine.contains("TikaGrpc") 
&& !cmdLine.contains("ignite")) {
+                    log.debug("Skipping kill of PID {} on port {} — not a 
tika/ignite process: {}", pid, port, cmdLine);
+                    return;
+                }
+                
+                log.info("Found tika/ignite process {} on port {}, killing 
it", pid, port);
+                
+                ProcessBuilder killPb = new ProcessBuilder("kill", 
String.valueOf(pid));
+                Process killProcess = killPb.start();
+                killProcess.waitFor(2, TimeUnit.SECONDS);
+                
+                Thread.sleep(1000);
+                ProcessBuilder forceKillPb = new ProcessBuilder("kill", "-9", 
String.valueOf(pid));
+                Process forceKillProcess = forceKillPb.start();
+                forceKillProcess.waitFor(2, TimeUnit.SECONDS);
+            }
+        }
+        
+        findProcess.waitFor(2, TimeUnit.SECONDS);
+    }
+    
+    private static boolean isParentProcess(long pid) {
+        try {
+            ProcessHandle current = ProcessHandle.current();
+            while (current.parent().isPresent()) {
+                current = current.parent().get();
+                if (current.pid() == pid) {
+                    return true;
+                }
+            }
+        } catch (Exception e) {
+            log.debug("Error checking parent process", e);
+        }
+        return false;
+    }
+    
+    @Test
+    void testIgniteConfigStore() throws Exception {
+        String fetcherId = "dynamicIgniteFetcher";
+        ManagedChannel channel = getManagedChannelForIgnite();
+        
+        try {
+            TikaGrpc.TikaBlockingStub blockingStub = 
TikaGrpc.newBlockingStub(channel);
+            TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel);
+
+            FileSystemFetcherConfig config = new FileSystemFetcherConfig();
+            String basePath = USE_LOCAL_SERVER ? TEST_FOLDER.getAbsolutePath() 
: "/tika/govdocs1";
+            config.setBasePath(basePath);
+            
+            String configJson = 
ExternalTestBase.OBJECT_MAPPER.writeValueAsString(config);
+            log.info("Creating fetcher with Ignite ConfigStore (basePath={}): 
{}", basePath, configJson);
+            
+            SaveFetcherReply saveReply = 
blockingStub.saveFetcher(SaveFetcherRequest
+                    .newBuilder()
+                    .setFetcherId(fetcherId)
+                    
.setFetcherClass("org.apache.tika.pipes.fetcher.fs.FileSystemFetcher")
+                    .setFetcherConfigJson(configJson)
+                    .build());
+            
+            log.info("Fetcher saved to Ignite: {}", saveReply.getFetcherId());
+
+            List<FetchAndParseReply> successes = 
Collections.synchronizedList(new ArrayList<>());
+            List<FetchAndParseReply> errors = Collections.synchronizedList(new 
ArrayList<>());
+
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            StreamObserver<FetchAndParseRequest>
+                    requestStreamObserver = 
tikaStub.fetchAndParseBiDirectionalStreaming(new StreamObserver<>() {
+                @Override
+                public void onNext(FetchAndParseReply fetchAndParseReply) {
+                    log.debug("Reply from fetch-and-parse - key={}, 
status={}", 
+                        fetchAndParseReply.getFetchKey(), 
fetchAndParseReply.getStatus());
+                    if 
("FETCH_AND_PARSE_EXCEPTION".equals(fetchAndParseReply.getStatus())) {
+                        errors.add(fetchAndParseReply);
+                    } else {
+                        successes.add(fetchAndParseReply);
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    log.error("Received an error", throwable);
+                    Assertions.fail(throwable);
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void onCompleted() {
+                    log.info("Finished streaming fetch and parse replies");
+                    countDownLatch.countDown();
+                }
+            });
+
+            int maxDocs = 
Integer.parseInt(System.getProperty("corpus.numDocs", "-1"));
+            log.info("Document limit: {}", maxDocs == -1 ? "unlimited" : 
maxDocs);
+            
+            try (Stream<Path> paths = Files.walk(TEST_FOLDER.toPath())) {
+                Stream<Path> fileStream = paths
+                        .filter(Files::isRegularFile)
+                        .filter(p -> !p.getFileName().toString()
+                                .toLowerCase(Locale.ROOT)
+                                .endsWith(".zip"));
+                
+                if (maxDocs > 0) {
+                    fileStream = fileStream.limit(maxDocs);
+                        String relPath = 
TEST_FOLDER.toPath().relativize(file).toString();
+                        requestStreamObserver.onNext(FetchAndParseRequest
+                                .newBuilder()
+                                .setFetcherId(fetcherId)
+                                .setFetchKey(relPath)
+                                .build());
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });

Review Comment:
   The file-walking logic is currently malformed and will not compile: inside 
the `if (maxDocs > 0)` block, the code references `file` without defining it 
and appears to have a truncated/misplaced `forEach`/`try` block and closing 
`});`. This should be structured as: optionally apply `limit(maxDocs)` and then 
iterate `fileStream.forEach(file -> { ... })` outside the `if` block.





> Upgrade Ignite config store to Ignite 3.x with Calcite SQL engine
> -----------------------------------------------------------------
>
>                 Key: TIKA-4606
>                 URL: https://issues.apache.org/jira/browse/TIKA-4606
>             Project: Tika
>          Issue Type: Improvement
>            Reporter: Nicholas DiPiazza
>            Assignee: Nicholas DiPiazza
>            Priority: Major
>
> h2. Overview
> Upgrade the tika-pipes-config-store-ignite module from Apache Ignite 2.17.0 
> (which uses H2 1.4.x) to Apache Ignite 3.x (which uses Apache Calcite SQL 
> engine).
> h2. Current State
> * Module: *tika-pipes-config-store-ignite*
> * Ignite Version: 2.17.0
> * SQL Engine: H2 1.4.197 (embedded)
> * Location: {{tika-pipes/tika-pipes-config-store-ignite/}}
> h2. Goals
> # Upgrade to Apache Ignite 3.x (latest stable release)
> # Replace H2 SQL engine with Calcite-based SQL engine
> # Maintain all existing functionality for config store
> # Update API calls to match Ignite 3.x breaking changes
> # Ensure backward compatibility for stored configurations (if possible)
> h2. Benefits
> * Modern SQL engine with Apache Calcite
> * Better performance and query optimization
> * Active maintenance and future support
> * Improved SQL feature set
> * No dependency on old H2 1.4.x (2018)
> h2. Breaking Changes to Address
> * Ignite 3.x has major API changes from 2.x
> * Configuration format changes
> * Cache API differences
> * SQL query API updates
> * Client connection changes
> h2. Implementation Steps
> # Research Ignite 3.x API changes and migration guide
> # Update Maven dependencies to Ignite 3.x
> # Refactor {{IgniteConfigStore}} to use new Ignite 3.x API
> # Update {{IgniteStoreServer}} for new connection model
> # Modify SQL queries if needed for Calcite compatibility
> # Update configuration handling
> # Update tests to work with Ignite 3.x
> # Test backward compatibility with existing configs
> # Update documentation
> h2. Acceptance Criteria
> * Ignite upgraded to version 3.x (latest stable)
> * Uses Calcite SQL engine instead of H2
> * All existing tests pass
> * Config store functionality preserved
> * No H2 dependencies remain
> * Documentation updated
> h2. References
> * Apache Ignite 3.x: https://ignite.apache.org/docs/3.0.0/
> * Ignite 3.x Migration Guide
> * Apache Calcite: https://calcite.apache.org/
> * Current module: {{tika-pipes/tika-pipes-config-store-ignite/}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to