Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62847383
  
    --- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
    @@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
         * @throws IOException
         */
        @Override
    -   public void open(InputSplit ignored) throws IOException {
    +   public void open(InputSplit inputSplit) throws IOException {
    +           hasNext = true;
                try {
    -                   establishConnection();
    -                   statement = dbConn.createStatement(resultSetType, 
resultSetConcurrency);
    -                   resultSet = statement.executeQuery(query);
    +                   if (inputSplit != null && parameterValues != null) {
    +                           for (int i = 0; i < 
parameterValues[inputSplit.getSplitNumber()].length; i++) {
    +                                   Object param = 
parameterValues[inputSplit.getSplitNumber()][i];
    +                                   if (param instanceof String) {
    +                                           statement.setString(i + 1, 
(String) param);
    +                                   } else if (param instanceof Long) {
    +                                           statement.setLong(i + 1, (Long) 
param);
    +                                   } else if (param instanceof Integer) {
    +                                           statement.setInt(i + 1, 
(Integer) param);
    +                                   } else if (param instanceof Double) {
    +                                           statement.setDouble(i + 1, 
(Double) param);
    +                                   } else if (param instanceof Boolean) {
    +                                           statement.setBoolean(i + 1, 
(Boolean) param);
    +                                   } else if (param instanceof Float) {
    +                                           statement.setFloat(i + 1, 
(Float) param);
    +                                   } else if (param instanceof BigDecimal) 
{
    +                                           statement.setBigDecimal(i + 1, 
(BigDecimal) param);
    +                                   } else if (param instanceof Byte) {
    +                                           statement.setByte(i + 1, (Byte) 
param);
    +                                   } else if (param instanceof Short) {
    +                                           statement.setShort(i + 1, 
(Short) param);
    +                                   } else if (param instanceof Date) {
    +                                           statement.setDate(i + 1, (Date) 
param);
    +                                   } else if (param instanceof Time) {
    +                                           statement.setTime(i + 1, (Time) 
param);
    +                                   } else if (param instanceof Timestamp) {
    +                                           statement.setTimestamp(i + 1, 
(Timestamp) param);
    +                                   } else if (param instanceof Array) {
    +                                           statement.setArray(i + 1, 
(Array) param);
    +                                   } else {
    +                                           //extends with other types if 
needed
    +                                           throw new 
IllegalArgumentException("open() failed. Parameter " + i + " of type " + 
param.getClass() + " is not handled (yet)." );
    +                                   }
    +                           }
    +                           if (LOG.isDebugEnabled()) {
    +                                   LOG.debug(String.format("Executing '%s' 
with parameters %s", queryTemplate, 
Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
    +                           }
    +                   }
    +                   resultSet = statement.executeQuery();
                } catch (SQLException se) {
                        close();
    --- End diff --
    
    I did that in the previous version of this PR 
(https://github.com/apache/flink/pull/1885) but @zentol told me to leave it 
("this is not guaranteed, so please add them back"). From my check @zentol was 
right..isn't it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to