This is an automated email from the ASF dual-hosted git repository.
nddipiazza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new e8c36c9e29 TIKA-4722: Add parse_context_json field to
FetchAndParseRequest for per-request ParseContext configuration (#2797)
e8c36c9e29 is described below
commit e8c36c9e29bd75642cdc37f5094474193256a095
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Fri May 1 14:22:18 2026 +0000
TIKA-4722: Add parse_context_json field to FetchAndParseRequest for
per-request ParseContext configuration (#2797)
- Replace handler_type field with generic parse_context_json string field
(field 5)
- parse_context_json accepts a JSON object mapping parse-context component
names to their configs
Example: {"basic-content-handler-factory": {"type": "HTML"}}
- TikaGrpcServerImpl iterates the JSON fields and calls
parseContext.setJsonConfig() for each
- Allows overriding any registered parse-context component per request, not
just content handler type
- e2e test uses parse_context_json to request HTML and TEXT output and
verifies the difference
Co-authored-by: Copilot <[email protected]>
---
.../tika/pipes/filesystem/HandlerTypeTest.java | 356 +++++++++++++++++++++
.../resources/tika-config-ignite-handlertype.json | 34 ++
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 6 +
tika-grpc/src/main/proto/tika.proto | 5 +
4 files changed, 401 insertions(+)
diff --git
a/tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/filesystem/HandlerTypeTest.java
b/tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/filesystem/HandlerTypeTest.java
new file mode 100644
index 0000000000..ea1f11f52b
--- /dev/null
+++
b/tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/filesystem/HandlerTypeTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.filesystem;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.ExternalTestBase;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+/**
+ * Tests per-request ParseContext configuration via
FetchAndParseRequest.parse_context_json.
+ *
+ * Uses the Ignite ConfigStore so that fetchers registered via saveFetcher are
visible
+ * to both the gRPC server JVM and the forked PipesServer JVM.
+ *
+ * Verifies that clients can override any parse context component on a
per-request basis
+ * by providing a JSON object with component names as keys.
+ * Example: {"basic-content-handler-factory": {"type": "HTML"}}
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Slf4j
+@Tag("E2ETest")
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "exec:exec classpath
exceeds Windows CreateProcess command-line length limit")
+class HandlerTypeTest {
+
+ private static final File TEST_FOLDER = ExternalTestBase.TEST_FOLDER;
+ private static final int GRPC_PORT =
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+
+ private static Process localGrpcProcess;
+
+ @BeforeAll
+ void setup() throws Exception {
+ try {
+ killProcessOnPort(GRPC_PORT);
+ killProcessOnPort(3344);
+ killProcessOnPort(10800);
+ } catch (Exception e) {
+ log.debug("No orphaned processes to clean up: {}", e.getMessage());
+ }
+
+ ExternalTestBase.copyTestFixtures();
+ startLocalGrpcServer();
+ }
+
+ @AfterAll
+ void teardown() {
+ if (localGrpcProcess != null) {
+ log.info("Stopping local gRPC server and child processes");
+ localGrpcProcess.destroy();
+ try {
+ if (!localGrpcProcess.waitFor(10, TimeUnit.SECONDS)) {
+ localGrpcProcess.destroyForcibly();
+ localGrpcProcess.waitFor(5, TimeUnit.SECONDS);
+ }
+ Thread.sleep(2000);
+ killProcessOnPort(GRPC_PORT);
+ killProcessOnPort(3344);
+ killProcessOnPort(10800);
+ } catch (Exception e) {
+ log.debug("Error during teardown: {}", e.getMessage());
+ }
+ log.info("Local gRPC server stopped");
+ }
+ }
+
+ private static void startLocalGrpcServer() throws Exception {
+ log.info("Starting local tika-grpc server with Ignite config for
HandlerType test");
+
+ Path currentDir = Path.of("").toAbsolutePath();
+ Path tikaRootDir = currentDir;
+ while (tikaRootDir != null &&
+ !(Files.exists(tikaRootDir.resolve("tika-grpc")) &&
+ Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) {
+ tikaRootDir = tikaRootDir.getParent();
+ }
+ if (tikaRootDir == null) {
+ throw new IllegalStateException("Cannot find tika root directory.
Current dir: " + currentDir);
+ }
+
+ Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc");
+ Path configFile =
Path.of("src/test/resources/tika-config-ignite-handlertype.json").toAbsolutePath();
+ if (!Files.exists(configFile)) {
+ throw new IllegalStateException("Config file not found: " +
configFile);
+ }
+
+ log.info("tika-grpc dir: {}", tikaGrpcDir);
+ log.info("Config file: {}", configFile);
+
+ String javaHome = System.getProperty("java.home");
+ boolean isWindows =
System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
+ String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" :
"/bin/java");
+ String mvnCmd = tikaRootDir.resolve(isWindows ? "mvnw.cmd" :
"mvnw").toString();
+
+ ProcessBuilder pb = new ProcessBuilder(
+ mvnCmd,
+ "exec:exec",
+ "-Dexec.executable=" + javaCmd,
+ "-Dexec.args=" +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+ "--add-opens=java.base/java.io=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.math=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
" +
+ "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
" +
+ "--add-opens=java.base/java.time=ALL-UNNAMED " +
+ "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED " +
+ "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+
"--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED " +
+
"--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED " +
+ "-Dio.netty.tryReflectionSetAccessible=true " +
+ "-Dignite.work.dir=\"" +
tikaGrpcDir.resolve("target/ignite-work-handlertype") + "\" " +
+ "-classpath %classpath " +
+ "org.apache.tika.pipes.grpc.TikaGrpcServer " +
+ "-c \"" + configFile + "\" " +
+ "-p " + GRPC_PORT
+ );
+
+ pb.directory(tikaGrpcDir.toFile());
+ pb.redirectErrorStream(true);
+ pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
+
+ localGrpcProcess = pb.start();
+
+ final boolean[] igniteStarted = {false};
+ Thread logThread = new Thread(() -> {
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(localGrpcProcess.getInputStream(),
StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ log.info("tika-grpc: {}", line);
+ if (line.contains("Ignite server started") ||
+ (line.contains("Table") && line.contains("created
successfully")) ||
+ line.contains("Server started, listening on")) {
+ synchronized (igniteStarted) {
+ igniteStarted[0] = true;
+ igniteStarted.notifyAll();
+ }
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error reading server output", e);
+ }
+ });
+ logThread.setDaemon(true);
+ logThread.start();
+
+ try {
+ Awaitility.await()
+ .atMost(java.time.Duration.ofSeconds(180))
+ .pollInterval(java.time.Duration.ofSeconds(2))
+ .until(() -> {
+ synchronized (igniteStarted) {
+ if (!igniteStarted[0]) {
+ return false;
+ }
+ }
+ ManagedChannel testChannel = ManagedChannelBuilder
+ .forAddress("localhost", GRPC_PORT)
+ .usePlaintext()
+ .build();
+ try {
+ io.grpc.health.v1.HealthGrpc.HealthBlockingStub
healthStub =
+
io.grpc.health.v1.HealthGrpc.newBlockingStub(testChannel)
+ .withDeadlineAfter(2, TimeUnit.SECONDS);
+ io.grpc.health.v1.HealthCheckResponse response =
healthStub.check(
+
io.grpc.health.v1.HealthCheckRequest.getDefaultInstance());
+ return response.getStatus() ==
+
io.grpc.health.v1.HealthCheckResponse.ServingStatus.SERVING;
+ } catch (io.grpc.StatusRuntimeException e) {
+ if (e.getStatus().getCode() ==
io.grpc.Status.Code.UNIMPLEMENTED) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ } finally {
+ testChannel.shutdown();
+ testChannel.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ });
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
+ if (localGrpcProcess.isAlive()) {
+ localGrpcProcess.destroyForcibly();
+ }
+ throw new RuntimeException("tika-grpc server with Ignite failed to
start within timeout", e);
+ }
+
+ log.info("HandlerType test server ready on port {}", GRPC_PORT);
+ }
+
+ private ManagedChannel getManagedChannel() {
+ return ManagedChannelBuilder
+ .forAddress("localhost", GRPC_PORT)
+ .usePlaintext()
+ .maxInboundMessageSize(160 * 1024 * 1024)
+ .build();
+ }
+
+ private static void killProcessOnPort(int port) throws IOException,
InterruptedException {
+ ProcessBuilder findPb = new ProcessBuilder("lsof", "-ti", ":" + port);
+ findPb.redirectErrorStream(true);
+ Process findProcess = findPb.start();
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(findProcess.getInputStream(),
StandardCharsets.UTF_8))) {
+ String pidStr = reader.readLine();
+ if (pidStr != null && !pidStr.trim().isEmpty()) {
+ long pid = Long.parseLong(pidStr.trim());
+ long myPid = ProcessHandle.current().pid();
+ if (pid == myPid || isParentProcess(pid)) {
+ return;
+ }
+ String cmdLine = ProcessHandle.of(pid)
+ .flatMap(h -> h.info().commandLine())
+ .orElse("");
+ if (!cmdLine.contains("tika") && !cmdLine.contains("TikaGrpc")
&& !cmdLine.contains("ignite")) {
+ log.debug("Skipping kill of PID {} on port {} — not a
tika/ignite process", pid, port);
+ return;
+ }
+ log.info("Killing tika/ignite process {} on port {}", pid,
port);
+ new ProcessBuilder("kill",
String.valueOf(pid)).start().waitFor(2, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+ new ProcessBuilder("kill", "-9",
String.valueOf(pid)).start().waitFor(2, TimeUnit.SECONDS);
+ }
+ }
+ findProcess.waitFor(2, TimeUnit.SECONDS);
+ }
+
+ private static boolean isParentProcess(long pid) {
+ try {
+ ProcessHandle current = ProcessHandle.current();
+ while (current.parent().isPresent()) {
+ current = current.parent().get();
+ if (current.pid() == pid) {
+ return true;
+ }
+ }
+ } catch (Exception e) {
+ log.debug("Error checking parent process", e);
+ }
+ return false;
+ }
+
+ @Test
+ void testParseContextJson() throws Exception {
+ String fetcherId = "handlerTypeFetcher";
+ ManagedChannel channel = getManagedChannel();
+ try {
+ TikaGrpc.TikaBlockingStub blockingStub =
TikaGrpc.newBlockingStub(channel);
+
+ FileSystemFetcherConfig config = new FileSystemFetcherConfig();
+ config.setBasePath(TEST_FOLDER.getAbsolutePath());
+
+ SaveFetcherReply saveReply =
blockingStub.saveFetcher(SaveFetcherRequest.newBuilder()
+ .setFetcherId(fetcherId)
+
.setFetcherClass("org.apache.tika.pipes.fetcher.fs.FileSystemFetcher")
+
.setFetcherConfigJson(ExternalTestBase.OBJECT_MAPPER.writeValueAsString(config))
+ .build());
+ log.info("Fetcher created: {}", saveReply.getFetcherId());
+
+ // Parse sample.html requesting HTML output
+ FetchAndParseReply htmlReply =
blockingStub.fetchAndParse(FetchAndParseRequest.newBuilder()
+ .setFetcherId(fetcherId)
+ .setFetchKey("sample.html")
+ .setParseContextJson("{\"basic-content-handler-factory\":
{\"type\": \"HTML\"}}")
+ .build());
+
+ log.info("HTML parse status: {}", htmlReply.getStatus());
+ Assertions.assertEquals("PARSE_SUCCESS", htmlReply.getStatus(),
+ "Parse should succeed with HTML handler type");
+
+ String htmlContent =
htmlReply.getFieldsMap().get("X-TIKA:content");
+ Assertions.assertNotNull(htmlContent, "Content should be present
in HTML response");
+ log.info("HTML content (first 200 chars): {}",
htmlContent.substring(0, Math.min(200, htmlContent.length())));
+ Assertions.assertTrue(
+ htmlContent.contains("<html") ||
htmlContent.contains("<body") || htmlContent.contains("<p"),
+ "HTML handler should produce HTML markup, got: " +
htmlContent);
+
+ // Parse the same file requesting plain text — expect no HTML tags
+ FetchAndParseReply textReply =
blockingStub.fetchAndParse(FetchAndParseRequest.newBuilder()
+ .setFetcherId(fetcherId)
+ .setFetchKey("sample.html")
+ .setParseContextJson("{\"basic-content-handler-factory\":
{\"type\": \"TEXT\"}}")
+ .build());
+
+ log.info("Text parse status: {}", textReply.getStatus());
+ Assertions.assertEquals("PARSE_SUCCESS", textReply.getStatus(),
+ "Parse should succeed with TEXT handler type");
+
+ String textContent =
textReply.getFieldsMap().get("X-TIKA:content");
+ Assertions.assertNotNull(textContent, "Content should be present
in text response");
+ log.info("Text content (first 200 chars): {}",
textContent.substring(0, Math.min(200, textContent.length())));
+ Assertions.assertFalse(
+ textContent.contains("<html") ||
textContent.contains("<body"),
+ "TEXT handler should not produce HTML tags, got: " +
textContent);
+
+ Assertions.assertNotEquals(htmlContent, textContent,
+ "HTML and TEXT outputs should differ for the same
document");
+
+ } finally {
+ channel.shutdown();
+ try {
+ if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
+ channel.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ channel.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
diff --git
a/tika-e2e-tests/tika-grpc/src/test/resources/tika-config-ignite-handlertype.json
b/tika-e2e-tests/tika-grpc/src/test/resources/tika-config-ignite-handlertype.json
new file mode 100644
index 0000000000..48798bd0eb
--- /dev/null
+++
b/tika-e2e-tests/tika-grpc/src/test/resources/tika-config-ignite-handlertype.json
@@ -0,0 +1,34 @@
+{
+ "plugin-roots": ["/var/cache/tika/plugins"],
+ "pipes": {
+ "numClients": 1,
+ "configStoreType": "ignite",
+ "configStoreParams": "{\"tableName\": \"tika_handlertype_test\",
\"igniteInstanceName\": \"TikaHandlerTypeTest\", \"replicas\": 1,
\"partitions\": 10, \"autoClose\": true}",
+ "forkedJvmArgs": [
+ "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED",
+ "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+ "--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED",
+ "--add-opens=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED",
+
"--add-opens=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED",
+ "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED",
+ "--add-opens=java.base/java.io=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "--add-opens=java.base/java.net=ALL-UNNAMED",
+ "--add-opens=java.base/java.util=ALL-UNNAMED",
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
+ "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
+ "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
+ "--add-opens=java.base/java.math=ALL-UNNAMED",
+ "--add-opens=java.sql/java.sql=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
+ "--add-opens=java.base/java.time=ALL-UNNAMED",
+ "--add-opens=java.base/java.text=ALL-UNNAMED",
+ "--add-opens=java.management/sun.management=ALL-UNNAMED",
+ "--add-opens=java.desktop/java.awt.font=ALL-UNNAMED"
+ ]
+ }
+}
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 8c6b96a415..c0b2d6b4d6 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -223,6 +223,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
if (StringUtils.isNotBlank(additionalFetchConfigJson)) {
parseContext.setJsonConfig(request.getFetcherId(),
additionalFetchConfigJson);
}
+ String parseContextJson = request.getParseContextJson();
+ if (StringUtils.isNotBlank(parseContextJson)) {
+ com.fasterxml.jackson.databind.JsonNode contextNode =
OBJECT_MAPPER.readTree(parseContextJson);
+ contextNode.fields().forEachRemaining(entry ->
+ parseContext.setJsonConfig(entry.getKey(),
entry.getValue().toString()));
+ }
PipesResult pipesResult = pipesClient.process(new
FetchEmitTuple(request.getFetchKey(), new
FetchKey(fetcher.getExtensionConfig().id(), request.getFetchKey()),
new EmitKey(), tikaMetadata, parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
FetchAndParseReply.Builder fetchReplyBuilder =
diff --git a/tika-grpc/src/main/proto/tika.proto
b/tika-grpc/src/main/proto/tika.proto
index 70fdf2f0af..0a64f37115 100644
--- a/tika-grpc/src/main/proto/tika.proto
+++ b/tika-grpc/src/main/proto/tika.proto
@@ -100,6 +100,11 @@ message FetchAndParseRequest {
string additional_fetch_config_json = 3;
// The ID of the emitter to use (optional). If not provided, no emitter will
be used.
string emitter_id = 4;
+ // Optional JSON object to configure the ParseContext for this request,
overriding server defaults.
+ // Keys are parse-context component names; values are their JSON configs.
+ // Example: {"basic-content-handler-factory": {"type": "HTML"},
"timeout-limits": {"progressTimeoutMillis": 30000}}
+ // See the parse-context.idx component registry for available component
names.
+ string parse_context_json = 5;
}
message FetchAndParseReply {