This is an automated email from the ASF dual-hosted git repository.
jeremyross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 7fbc59c CAMEL-16206: camel-sql: Preserve message body (#5086)
7fbc59c is described below
commit 7fbc59c4abde655cda26c54be6605e4c469a7c6a
Author: Jeremy Ross <[email protected]>
AuthorDate: Mon Feb 15 15:52:01 2021 -0600
CAMEL-16206: camel-sql: Preserve message body (#5086)
CAMEL-16206: camel-sql: Preserve message body
when CamelSqlRetrieveGeneratedKeys == true, or when there's nothing else
useful to put in the body.
---
.../apache/camel/component/sql/SqlProducer.java | 226 ++++++++-------------
.../camel/component/sql/SqlGeneratedKeysTest.java | 5 +-
2 files changed, 90 insertions(+), 141 deletions(-)
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 2a5b36f..0890cce 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -24,7 +24,6 @@ import java.sql.Statement;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
@@ -105,75 +104,54 @@ public class SqlProducer extends DefaultProducer {
final Boolean shouldRetrieveGeneratedKeys
=
exchange.getIn().getHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, false,
Boolean.class);
- PreparedStatementCreator statementCreator = new
PreparedStatementCreator() {
- @Override
- public PreparedStatement createPreparedStatement(Connection con)
throws SQLException {
- if (!shouldRetrieveGeneratedKeys) {
- return con.prepareStatement(preparedQuery);
+ PreparedStatementCreator statementCreator = con -> {
+ if (!shouldRetrieveGeneratedKeys) {
+ return con.prepareStatement(preparedQuery);
+ } else {
+ Object expectedGeneratedColumns =
exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS);
+ if (expectedGeneratedColumns == null) {
+ return con.prepareStatement(preparedQuery,
Statement.RETURN_GENERATED_KEYS);
+ } else if (expectedGeneratedColumns instanceof String[]) {
+ return con.prepareStatement(preparedQuery, (String[])
expectedGeneratedColumns);
+ } else if (expectedGeneratedColumns instanceof int[]) {
+ return con.prepareStatement(preparedQuery, (int[])
expectedGeneratedColumns);
} else {
- Object expectedGeneratedColumns =
exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS);
- if (expectedGeneratedColumns == null) {
- return con.prepareStatement(preparedQuery,
Statement.RETURN_GENERATED_KEYS);
- } else if (expectedGeneratedColumns instanceof String[]) {
- return con.prepareStatement(preparedQuery, (String[])
expectedGeneratedColumns);
- } else if (expectedGeneratedColumns instanceof int[]) {
- return con.prepareStatement(preparedQuery, (int[])
expectedGeneratedColumns);
- } else {
- throw new IllegalArgumentException(
- "Header specifying expected returning columns
isn't an instance of String[] or int[] but "
- +
expectedGeneratedColumns.getClass());
- }
+ throw new IllegalArgumentException(
+ "Header specifying expected returning columns
isn't an instance of String[] or int[] but "
+ +
expectedGeneratedColumns.getClass());
}
}
};
- // special for processing stream list (batch not supported)
- SqlOutputType outputType = getEndpoint().getOutputType();
- if (outputType == SqlOutputType.StreamList) {
- processStreamList(exchange, statementCreator, sql, preparedQuery);
- return;
+ Object data;
+ if (getEndpoint().getOutputType() == SqlOutputType.StreamList) {
+ data = processStreamList(exchange, statementCreator, sql,
preparedQuery);
+ } else {
+ data = processInternal(exchange, statementCreator, sql,
preparedQuery, shouldRetrieveGeneratedKeys);
}
+ exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ if (getEndpoint().isNoop() || getEndpoint().getOutputHeader() != null
|| data == null) {
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ }
+ if (getEndpoint().getOutputHeader() != null) {
+ exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
+ } else if (data != null && !getEndpoint().isNoop()) {
+ exchange.getOut().setBody(data);
+ }
+ }
+ private Object processInternal(
+ Exchange exchange, PreparedStatementCreator statementCreator,
+ String sql, String preparedQuery, Boolean
shouldRetrieveGeneratedKeys) {
LOG.trace("jdbcTemplate.execute: {}", preparedQuery);
- jdbcTemplate.execute(statementCreator, new
PreparedStatementCallback<Map<?, ?>>() {
- public Map<?, ?> doInPreparedStatement(PreparedStatement ps)
throws SQLException {
+ return jdbcTemplate.execute(statementCreator, new
PreparedStatementCallback<Object>() {
+ public Object doInPreparedStatement(PreparedStatement ps) throws
SQLException {
+ Object data = null;
ResultSet rs = null;
try {
- int expected = parametersCount > 0 ? parametersCount :
ps.getParameterMetaData().getParameterCount();
-
- // only populate if really needed
- if (alwaysPopulateStatement || expected > 0) {
- // transfer incoming message body data to prepared
statement parameters, if necessary
- if (batch) {
- Iterator<?> iterator;
- if (useMessageBodyForSql) {
- iterator =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
- } else {
- iterator =
exchange.getIn().getBody(Iterator.class);
- }
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- Iterator<?> i =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
- exchange, value);
-
sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
- ps.addBatch();
- }
- } else {
- Object value;
- if (useMessageBodyForSql) {
- value =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
- } else {
- value = exchange.getIn().getBody();
- }
- Iterator<?> i =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
- exchange, value);
- sqlPrepareStatementStrategy.populateStatement(ps,
i, expected);
- }
- }
-
+ populateStatement(ps, exchange, sql, preparedQuery);
boolean isResultSet = false;
- // execute the prepared statement and populate the
outgoing message
if (batch) {
int[] updateCounts = ps.executeBatch();
int total = 0;
@@ -184,59 +162,31 @@ public class SqlProducer extends DefaultProducer {
} else {
isResultSet = ps.execute();
if (isResultSet) {
- // preserve headers first, so we can override the
SQL_ROW_COUNT header
-
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
rs = ps.getResultSet();
SqlOutputType outputType =
getEndpoint().getOutputType();
LOG.trace("Got result list from query: {},
outputType={}", rs, outputType);
+
+ int rowCount = 0;
if (outputType == SqlOutputType.SelectList) {
- List<?> data = getEndpoint().queryForList(rs,
true);
- // for noop=true we still want to enrich with
the row count header
- if (getEndpoint().isNoop()) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
- } else if (getEndpoint().getOutputHeader() !=
null) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
-
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
- } else {
- exchange.getOut().setBody(data);
- }
-
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
+ data = getEndpoint().queryForList(rs, true);
+ rowCount = ((List<?>) data).size();
} else if (outputType == SqlOutputType.SelectOne) {
- Object data = getEndpoint().queryForObject(rs);
+ data = getEndpoint().queryForObject(rs);
if (data != null) {
- // for noop=true we still want to enrich
with the row count header
- if (getEndpoint().isNoop()) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
- } else if (getEndpoint().getOutputHeader()
!= null) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
-
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
- } else {
- exchange.getOut().setBody(data);
- }
-
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
- } else {
- if (getEndpoint().isNoop()) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
- } else if (getEndpoint().getOutputHeader()
!= null) {
-
exchange.getOut().setBody(exchange.getIn().getBody());
- }
-
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0);
+ rowCount = 1;
}
} else {
throw new IllegalArgumentException("Invalid
outputType=" + outputType);
}
+
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, rowCount);
} else {
-
exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+
exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+
exchange.getOut().setBody(exchange.getIn().getBody());
}
}
if (shouldRetrieveGeneratedKeys) {
- // if no OUT message yet then create one and propagate
headers
- if (!exchange.hasOut()) {
-
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
- }
-
if (isResultSet) {
// we won't return generated keys for SELECT
statements
exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA,
Collections.EMPTY_LIST);
@@ -248,8 +198,7 @@ public class SqlProducer extends DefaultProducer {
}
}
- // data is set on exchange so return null
- return null;
+ return data;
} finally {
closeResultSet(rs);
}
@@ -257,7 +206,7 @@ public class SqlProducer extends DefaultProducer {
});
}
- protected void processStreamList(
+ protected Object processStreamList(
Exchange exchange, PreparedStatementCreator statementCreator,
String sql, String preparedQuery)
throws Exception {
LOG.trace("processStreamList: {}", preparedQuery);
@@ -271,58 +220,20 @@ public class SqlProducer extends DefaultProducer {
try {
con = jdbcTemplate.getDataSource().getConnection();
ps = statementCreator.createPreparedStatement(con);
+ ResultSetIterator iterator = null;
- int expected = parametersCount > 0 ? parametersCount :
ps.getParameterMetaData().getParameterCount();
-
- // only populate if really needed
- if (alwaysPopulateStatement || expected > 0) {
- // transfer incoming message body data to prepared statement
parameters, if necessary
- if (batch) {
- Iterator<?> iterator;
- if (useMessageBodyForSql) {
- iterator =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
- } else {
- iterator = exchange.getIn().getBody(Iterator.class);
- }
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- Iterator<?> i =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
- exchange, value);
- sqlPrepareStatementStrategy.populateStatement(ps, i,
expected);
- ps.addBatch();
- }
- } else {
- Object value;
- if (useMessageBodyForSql) {
- value =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
- } else {
- value = exchange.getIn().getBody();
- }
- Iterator<?> i
- =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery,
expected, exchange, value);
- sqlPrepareStatementStrategy.populateStatement(ps, i,
expected);
- }
- }
+ populateStatement(ps, exchange, sql, preparedQuery);
boolean isResultSet = ps.execute();
if (isResultSet) {
rs = ps.getResultSet();
- ResultSetIterator iterator =
getEndpoint().queryForStreamList(con, ps, rs);
- //pass through all headers
-
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ iterator = getEndpoint().queryForStreamList(con, ps, rs);
- if (getEndpoint().isNoop()) {
- exchange.getOut().setBody(exchange.getIn().getBody());
- } else if (getEndpoint().getOutputHeader() != null) {
- exchange.getOut().setBody(exchange.getIn().getBody());
-
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
- } else {
- exchange.getOut().setBody(iterator);
- }
// we do not know the row count so we cannot set a ROW_COUNT
header
// defer closing the iterator when the exchange is complete
exchange.adapt(ExtendedExchange.class).addOnCompletion(new
ResultSetIteratorCompletion(iterator));
}
+ return iterator;
} catch (Exception e) {
// in case of exception then close all this before rethrow
closeConnection(con);
@@ -332,6 +243,41 @@ public class SqlProducer extends DefaultProducer {
}
}
+ private void populateStatement(PreparedStatement ps, Exchange exchange,
String sql, String preparedQuery)
+ throws SQLException {
+ int expected = parametersCount > 0 ? parametersCount :
ps.getParameterMetaData().getParameterCount();
+
+ // only populate if really needed
+ if (alwaysPopulateStatement || expected > 0) {
+ // transfer incoming message body data to prepared statement
parameters, if necessary
+ if (batch) {
+ Iterator<?> iterator;
+ if (useMessageBodyForSql) {
+ iterator =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
+ } else {
+ iterator = exchange.getIn().getBody(Iterator.class);
+ }
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ Iterator<?> i =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
+ exchange, value);
+ sqlPrepareStatementStrategy.populateStatement(ps, i,
expected);
+ ps.addBatch();
+ }
+ } else {
+ Object value;
+ if (useMessageBodyForSql) {
+ value =
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
+ } else {
+ value = exchange.getIn().getBody();
+ }
+ Iterator<?> i =
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
+ exchange, value);
+ sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+ }
+ }
+ }
+
public void setParametersCount(int parametersCount) {
this.parametersCount = parametersCount;
}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
index 5e96fd5..1c645a4 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
@@ -33,6 +33,7 @@ import
org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SqlGeneratedKeysTest extends CamelTestSupport {
@@ -79,8 +80,9 @@ public class SqlGeneratedKeysTest extends CamelTestSupport {
// first we create our exchange using the endpoint
Endpoint endpoint = context.getEndpoint("direct:insert");
+ Object body = new Object[] { "project x", "ASF", "new project" };
Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(new Object[] { "project x", "ASF", "new
project" });
+ exchange.getIn().setBody(body);
exchange.getIn().setHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS,
true);
// now we send the exchange to the endpoint, and receives the response
from Camel
@@ -90,6 +92,7 @@ public class SqlGeneratedKeysTest extends CamelTestSupport {
assertNotNull(out);
assertNotNull(out.getMessage());
assertNotNull(out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA));
+ assertSame(body, out.getMessage().getBody());
List<Map<String, Object>> generatedKeys =
out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, List.class);
assertNotNull(generatedKeys, "out body could not be converted to a
List - was: "