This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6e0f7e545f0 CAMEL-18124: camel-stream - Add readLine option to control
read mode
6e0f7e545f0 is described below
commit 6e0f7e545f03c57c40a21dd8a3549a0678b23e5c
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed May 18 19:13:38 2022 +0200
CAMEL-18124: camel-stream - Add readLine option to control read mode
---
.../component/stream/StreamEndpointConfigurer.java | 6 ++
.../component/stream/StreamEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/stream/stream.json | 1 +
.../camel/component/stream/StreamConsumer.java | 105 +++++++++++++++++++--
.../camel/component/stream/StreamEndpoint.java | 14 +++
5 files changed, 122 insertions(+), 7 deletions(-)
diff --git
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
index 1ba726b3524..5a937b7daa0 100644
---
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
+++
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
@@ -51,6 +51,8 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "promptDelay": target.setPromptDelay(property(camelContext,
long.class, value)); return true;
case "promptmessage":
case "promptMessage": target.setPromptMessage(property(camelContext,
java.lang.String.class, value)); return true;
+ case "readline":
+ case "readLine": target.setReadLine(property(camelContext,
boolean.class, value)); return true;
case "readtimeout":
case "readTimeout": target.setReadTimeout(property(camelContext,
int.class, value)); return true;
case "retry": target.setRetry(property(camelContext, boolean.class,
value)); return true;
@@ -95,6 +97,8 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "promptDelay": return long.class;
case "promptmessage":
case "promptMessage": return java.lang.String.class;
+ case "readline":
+ case "readLine": return boolean.class;
case "readtimeout":
case "readTimeout": return int.class;
case "retry": return boolean.class;
@@ -140,6 +144,8 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "promptDelay": return target.getPromptDelay();
case "promptmessage":
case "promptMessage": return target.getPromptMessage();
+ case "readline":
+ case "readLine": return target.isReadLine();
case "readtimeout":
case "readTimeout": return target.getReadTimeout();
case "retry": return target.isRetry();
diff --git
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
index 220146fe129..d16e85ed5a1 100644
---
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
+++
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class StreamEndpointUriFactory extends
org.apache.camel.support.component
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(21);
+ Set<String> props = new HashSet<>(22);
props.add("appendNewLine");
props.add("autoCloseCount");
props.add("bridgeErrorHandler");
@@ -39,6 +39,7 @@ public class StreamEndpointUriFactory extends
org.apache.camel.support.component
props.add("lazyStartProducer");
props.add("promptDelay");
props.add("promptMessage");
+ props.add("readLine");
props.add("readTimeout");
props.add("retry");
props.add("scanStream");
diff --git
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
index 66aedc4935c..7fa557d5060 100644
---
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
+++
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
@@ -41,6 +41,7 @@
"initialPromptDelay": { "kind": "parameter", "displayName": "Initial
Prompt Delay", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 2000, "description": "Initial delay in
milliseconds before showing the message prompt. This delay occurs only once.
Can be used during system startup to avoid message prompts being written while
other logging is done to the system [...]
"promptDelay": { "kind": "parameter", "displayName": "Prompt Delay",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"description": "Optional delay in milliseconds before showing the message
prompt." },
"promptMessage": { "kind": "parameter", "displayName": "Prompt Message",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Message prompt to use when reading from
stream:in; for example, you could set this to Enter a command:" },
+ "readLine": { "kind": "parameter", "displayName": "Read Line", "group":
"consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Whether to read the input stream
in line mode (terminate by line breaks). Setting this to false, will instead
read the entire stream until EOL." },
"retry": { "kind": "parameter", "displayName": "Retry", "group":
"consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "Will retry opening the stream if
it's overwritten, somewhat like tail --retry If reading from files then you
should also enable the fileWatcher option, to make it work reliable." },
"scanStream": { "kind": "parameter", "displayName": "Scan Stream",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "To be used for continuously
reading a stream such as the unix tail command." },
"scanStreamDelay": { "kind": "parameter", "displayName": "Scan Stream
Delay", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "description": "Delay in milliseconds between read attempts
when using scanStream." },
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index a88886a9ce1..68e3c87a5bd 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -89,7 +89,7 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
// if we scan the stream we are lenient and can wait for the stream to
be available later
if (!endpoint.isScanStream()) {
- initializeStream();
+ initializeStreamLineMode();
}
executor =
endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
@@ -119,7 +119,11 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
@Override
public void run() {
try {
- readFromStream();
+ if (endpoint.isReadLine()) {
+ readFromStreamLineMode();
+ } else {
+ readFromStreamRawMode();
+ }
} catch (InterruptedException e) {
// we are closing down so ignore
} catch (Exception e) {
@@ -127,7 +131,7 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
}
}
- private BufferedReader initializeStream() throws Exception {
+ private BufferedReader initializeStreamLineMode() throws Exception {
// close old stream, before obtaining a new stream
IOHelper.close(inputStreamToClose);
@@ -147,10 +151,90 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
}
}
- private void readFromStream() throws Exception {
+ private InputStream initializeStreamRawMode() throws Exception {
+ // close old stream, before obtaining a new stream
+ IOHelper.close(inputStreamToClose);
+
+ if ("in".equals(uri)) {
+ inputStream = System.in;
+ inputStreamToClose = null;
+ } else if ("file".equals(uri)) {
+ inputStream = resolveStreamFromFile();
+ inputStreamToClose = inputStream;
+ }
+
+ return inputStream;
+ }
+
+ private void readFromStreamRawMode() throws Exception {
+ long index = 0;
+ InputStream is = initializeStreamRawMode();
+
+ if (endpoint.isScanStream()) {
+ // repeat scanning from stream
+ while (isRunAllowed()) {
+
+ byte[] data = null;
+ try {
+ data = is.readAllBytes();
+ } catch (IOException e) {
+ // ignore
+ }
+ boolean eos = data == null || data.length == 0;
+
+ if (isRunAllowed() && endpoint.isRetry()) {
+ boolean reOpen = true;
+ if (endpoint.isFileWatcher()) {
+ reOpen = watchFileChanged;
+ }
+ if (reOpen) {
+ LOG.debug("File: {} changed/rollover, re-reading file
from beginning", file);
+ is = initializeStreamRawMode();
+ // we have re-initialized the stream so lower changed
flag
+ if (endpoint.isFileWatcher()) {
+ watchFileChanged = false;
+ }
+ } else {
+ LOG.trace("File: {} not changed since last read",
file);
+ }
+ }
+
+ // sleep only if there is no input
+ if (eos) {
+ try {
+ Thread.sleep(endpoint.getScanStreamDelay());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ } else {
+ // regular read stream once until end of stream
+ boolean eos = false;
+ byte[] data = null;
+ while (!eos && isRunAllowed()) {
+ if (endpoint.getPromptMessage() != null) {
+ doPromptMessage();
+ }
+
+ try {
+ data = is.readAllBytes();
+ } catch (IOException e) {
+ // ignore
+ }
+ eos = data == null || data.length == 0;
+ if (!eos) {
+ processRaw(data, index);
+ }
+ }
+ }
+ }
+
+ private void readFromStreamLineMode() throws Exception {
long index = 0;
String line;
- BufferedReader br = initializeStream();
+ BufferedReader br = initializeStreamLineMode();
if (endpoint.isScanStream()) {
// repeat scanning from stream
@@ -171,7 +255,7 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
}
if (reOpen) {
LOG.debug("File: {} changed/rollover, re-reading file
from beginning", file);
- br = initializeStream();
+ br = initializeStreamLineMode();
// we have re-initialized the stream so lower changed
flag
if (endpoint.isFileWatcher()) {
watchFileChanged = false;
@@ -254,6 +338,15 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
return index;
}
+ /**
+ * Strategy method for processing the data
+ */
+ protected synchronized long processRaw(byte[] body, long index) throws
Exception {
+ Exchange exchange = createExchange(body, index++, true);
+ getProcessor().process(exchange);
+ return index;
+ }
+
/**
* Strategy method for prompting the prompt message
*/
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index 56d82cd34ea..8ca1b54341d 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -69,6 +69,8 @@ public class StreamEndpoint extends DefaultEndpoint {
private long initialPromptDelay = 2000;
@UriParam(label = "consumer")
private int groupLines;
+ @UriParam(label = "consumer", defaultValue = "true")
+ private boolean readLine = true;
@UriParam(label = "producer", defaultValue = "true")
private boolean appendNewLine = true;
@UriParam(label = "producer")
@@ -262,6 +264,18 @@ public class StreamEndpoint extends DefaultEndpoint {
this.groupLines = groupLines;
}
+ public boolean isReadLine() {
+ return readLine;
+ }
+
+ /**
+ * Whether to read the input stream in line mode (terminate by line
breaks). Setting this to false, will instead
+ * read the entire stream until EOL.
+ */
+ public void setReadLine(boolean readLine) {
+ this.readLine = readLine;
+ }
+
public int getAutoCloseCount() {
return autoCloseCount;
}