This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e0f7e545f0 CAMEL-18124: camel-stream - Add readLine option to control 
read mode
6e0f7e545f0 is described below

commit 6e0f7e545f03c57c40a21dd8a3549a0678b23e5c
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed May 18 19:13:38 2022 +0200

    CAMEL-18124: camel-stream - Add readLine option to control read mode
---
 .../component/stream/StreamEndpointConfigurer.java |   6 ++
 .../component/stream/StreamEndpointUriFactory.java |   3 +-
 .../org/apache/camel/component/stream/stream.json  |   1 +
 .../camel/component/stream/StreamConsumer.java     | 105 +++++++++++++++++++--
 .../camel/component/stream/StreamEndpoint.java     |  14 +++
 5 files changed, 122 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
index 1ba726b3524..5a937b7daa0 100644
--- 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
+++ 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
@@ -51,6 +51,8 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "promptDelay": target.setPromptDelay(property(camelContext, 
long.class, value)); return true;
         case "promptmessage":
         case "promptMessage": target.setPromptMessage(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "readline":
+        case "readLine": target.setReadLine(property(camelContext, 
boolean.class, value)); return true;
         case "readtimeout":
         case "readTimeout": target.setReadTimeout(property(camelContext, 
int.class, value)); return true;
         case "retry": target.setRetry(property(camelContext, boolean.class, 
value)); return true;
@@ -95,6 +97,8 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "promptDelay": return long.class;
         case "promptmessage":
         case "promptMessage": return java.lang.String.class;
+        case "readline":
+        case "readLine": return boolean.class;
         case "readtimeout":
         case "readTimeout": return int.class;
         case "retry": return boolean.class;
@@ -140,6 +144,8 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "promptDelay": return target.getPromptDelay();
         case "promptmessage":
         case "promptMessage": return target.getPromptMessage();
+        case "readline":
+        case "readLine": return target.isReadLine();
         case "readtimeout":
         case "readTimeout": return target.getReadTimeout();
         case "retry": return target.isRetry();
diff --git 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
index 220146fe129..d16e85ed5a1 100644
--- 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
+++ 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class StreamEndpointUriFactory extends 
org.apache.camel.support.component
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(21);
+        Set<String> props = new HashSet<>(22);
         props.add("appendNewLine");
         props.add("autoCloseCount");
         props.add("bridgeErrorHandler");
@@ -39,6 +39,7 @@ public class StreamEndpointUriFactory extends 
org.apache.camel.support.component
         props.add("lazyStartProducer");
         props.add("promptDelay");
         props.add("promptMessage");
+        props.add("readLine");
         props.add("readTimeout");
         props.add("retry");
         props.add("scanStream");
diff --git 
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
 
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
index 66aedc4935c..7fa557d5060 100644
--- 
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
+++ 
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
@@ -41,6 +41,7 @@
     "initialPromptDelay": { "kind": "parameter", "displayName": "Initial 
Prompt Delay", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 2000, "description": "Initial delay in 
milliseconds before showing the message prompt. This delay occurs only once. 
Can be used during system startup to avoid message prompts being written while 
other logging is done to the system [...]
     "promptDelay": { "kind": "parameter", "displayName": "Prompt Delay", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Optional delay in milliseconds before showing the message 
prompt." },
     "promptMessage": { "kind": "parameter", "displayName": "Prompt Message", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Message prompt to use when reading from 
stream:in; for example, you could set this to Enter a command:" },
+    "readLine": { "kind": "parameter", "displayName": "Read Line", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "description": "Whether to read the input stream 
in line mode (terminate by line breaks). Setting this to false, will instead 
read the entire stream until EOL." },
     "retry": { "kind": "parameter", "displayName": "Retry", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "Will retry opening the stream if 
it's overwritten, somewhat like tail --retry If reading from files then you 
should also enable the fileWatcher option, to make it work reliable." },
     "scanStream": { "kind": "parameter", "displayName": "Scan Stream", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "To be used for continuously 
reading a stream such as the unix tail command." },
     "scanStreamDelay": { "kind": "parameter", "displayName": "Scan Stream 
Delay", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "description": "Delay in milliseconds between read attempts 
when using scanStream." },
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index a88886a9ce1..68e3c87a5bd 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -89,7 +89,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
 
         // if we scan the stream we are lenient and can wait for the stream to 
be available later
         if (!endpoint.isScanStream()) {
-            initializeStream();
+            initializeStreamLineMode();
         }
 
         executor = 
endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
@@ -119,7 +119,11 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
     @Override
     public void run() {
         try {
-            readFromStream();
+            if (endpoint.isReadLine()) {
+                readFromStreamLineMode();
+            } else {
+                readFromStreamRawMode();
+            }
         } catch (InterruptedException e) {
             // we are closing down so ignore
         } catch (Exception e) {
@@ -127,7 +131,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         }
     }
 
-    private BufferedReader initializeStream() throws Exception {
+    private BufferedReader initializeStreamLineMode() throws Exception {
         // close old stream, before obtaining a new stream
         IOHelper.close(inputStreamToClose);
 
@@ -147,10 +151,90 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         }
     }
 
-    private void readFromStream() throws Exception {
+    private InputStream initializeStreamRawMode() throws Exception {
+        // close old stream, before obtaining a new stream
+        IOHelper.close(inputStreamToClose);
+
+        if ("in".equals(uri)) {
+            inputStream = System.in;
+            inputStreamToClose = null;
+        } else if ("file".equals(uri)) {
+            inputStream = resolveStreamFromFile();
+            inputStreamToClose = inputStream;
+        }
+
+        return inputStream;
+    }
+
+    private void readFromStreamRawMode() throws Exception {
+        long index = 0;
+        InputStream is = initializeStreamRawMode();
+
+        if (endpoint.isScanStream()) {
+            // repeat scanning from stream
+            while (isRunAllowed()) {
+
+                byte[] data = null;
+                try {
+                    data = is.readAllBytes();
+                } catch (IOException e) {
+                    // ignore
+                }
+                boolean eos = data == null || data.length == 0;
+
+                if (isRunAllowed() && endpoint.isRetry()) {
+                    boolean reOpen = true;
+                    if (endpoint.isFileWatcher()) {
+                        reOpen = watchFileChanged;
+                    }
+                    if (reOpen) {
+                        LOG.debug("File: {} changed/rollover, re-reading file 
from beginning", file);
+                        is = initializeStreamRawMode();
+                        // we have re-initialized the stream so lower changed 
flag
+                        if (endpoint.isFileWatcher()) {
+                            watchFileChanged = false;
+                        }
+                    } else {
+                        LOG.trace("File: {} not changed since last read", 
file);
+                    }
+                }
+
+                // sleep only if there is no input
+                if (eos) {
+                    try {
+                        Thread.sleep(endpoint.getScanStreamDelay());
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            }
+        } else {
+            // regular read stream once until end of stream
+            boolean eos = false;
+            byte[] data = null;
+            while (!eos && isRunAllowed()) {
+                if (endpoint.getPromptMessage() != null) {
+                    doPromptMessage();
+                }
+
+                try {
+                    data = is.readAllBytes();
+                } catch (IOException e) {
+                    // ignore
+                }
+                eos = data == null || data.length == 0;
+                if (!eos) {
+                    processRaw(data, index);
+                }
+            }
+        }
+    }
+
+    private void readFromStreamLineMode() throws Exception {
         long index = 0;
         String line;
-        BufferedReader br = initializeStream();
+        BufferedReader br = initializeStreamLineMode();
 
         if (endpoint.isScanStream()) {
             // repeat scanning from stream
@@ -171,7 +255,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
                     }
                     if (reOpen) {
                         LOG.debug("File: {} changed/rollover, re-reading file 
from beginning", file);
-                        br = initializeStream();
+                        br = initializeStreamLineMode();
                         // we have re-initialized the stream so lower changed 
flag
                         if (endpoint.isFileWatcher()) {
                             watchFileChanged = false;
@@ -254,6 +338,15 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         return index;
     }
 
+    /**
+     * Strategy method for processing the data
+     */
+    protected synchronized long processRaw(byte[] body, long index) throws 
Exception {
+        Exchange exchange = createExchange(body, index++, true);
+        getProcessor().process(exchange);
+        return index;
+    }
+
     /**
      * Strategy method for prompting the prompt message
      */
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index 56d82cd34ea..8ca1b54341d 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -69,6 +69,8 @@ public class StreamEndpoint extends DefaultEndpoint {
     private long initialPromptDelay = 2000;
     @UriParam(label = "consumer")
     private int groupLines;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean readLine = true;
     @UriParam(label = "producer", defaultValue = "true")
     private boolean appendNewLine = true;
     @UriParam(label = "producer")
@@ -262,6 +264,18 @@ public class StreamEndpoint extends DefaultEndpoint {
         this.groupLines = groupLines;
     }
 
+    public boolean isReadLine() {
+        return readLine;
+    }
+
+    /**
+     * Whether to read the input stream in line mode (terminate by line 
breaks). Setting this to false, will instead
+     * read the entire stream until EOL.
+     */
+    public void setReadLine(boolean readLine) {
+        this.readLine = readLine;
+    }
+
     public int getAutoCloseCount() {
         return autoCloseCount;
     }

Reply via email to