Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139711376 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** + * Assigns ingestion time timestamps and watermarks. + */ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks<Row> getAssigner() { + return this.timestampAssigner; + } + + /** + * Checks that the provided row time attribute is valid, determines its position in the schema, + * and adjusts the return type. + * + * @param rowtime The attribute to check. + */ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(rowtime)) { + if (fieldTypes[i] != Types.LONG) { + throw new IllegalArgumentException("Specified rowtime field must be of type BIGINT. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowtimeFieldPos = i; + break; + } + } + if (this.rowtimeFieldPos < 0) { + throw new IllegalArgumentException("Specified rowtime field must be present in data. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowTimeAttribute = rowtime; + + // adjust result type by removing rowtime field (will be added later) + String[] newNames = new String[fieldNames.length - 1]; + TypeInformation[] newTypes = new TypeInformation[fieldTypes.length - 1]; + for (int i = 0; i < rowtimeFieldPos; i++) { + newNames[i] = fieldNames[i]; + newTypes[i] = fieldTypes[i]; + } + for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) { + newNames[i - 1] = fieldNames[i]; + newTypes[i - 1] = fieldTypes[i]; + } + this.typeInfo = new RowTypeInfo(newTypes, newNames); + } + + /** + * Util method to create a schema description. + * + * @param fieldNames The names of the fields. + * @param fieldTypes The types of the fields. + * @return A string describing the schema of the given field information. + */ + private String toSchemaString(String[] fieldNames, TypeInformation[] fieldTypes) { + Preconditions.checkArgument(fieldNames.length == fieldTypes.length); + + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < fieldNames.length - 1; i++) { + sb.append(fieldNames[i]); + sb.append(": "); + if (fieldTypes[i] == Types.BOOLEAN) { + sb.append("BOOLEAN"); + } else if (fieldTypes[i] == Types.BYTE) { + sb.append("TINYINT"); + } else if (fieldTypes[i] == Types.SHORT) { + sb.append("SMALLINT"); + } else if (fieldTypes[i] == Types.INT) { + sb.append("INTEGER"); + } else if (fieldTypes[i] == Types.LONG) { + sb.append("BIGINT"); + } else if (fieldTypes[i] == Types.FLOAT) { + sb.append("FLOAT"); + } else if (fieldTypes[i] == Types.DOUBLE) { + sb.append("DOUBLE"); + } else if (fieldTypes[i] == Types.STRING) { + sb.append("VARCHAR"); + } else if (fieldTypes[i] == Types.DECIMAL) { + sb.append("DECIMAL"); + } else if (fieldTypes[i] == Types.SQL_DATE) { + sb.append("DATE"); + } else if (fieldTypes[i] == Types.SQL_TIME) { + sb.append("TIME"); + } else if (fieldTypes[i] == Types.SQL_TIMESTAMP) { + sb.append("TIMESTAMP"); + } else { + sb.append(fieldTypes[i].getTypeClass().getSimpleName()); + } + sb.append(", "); + } + sb.delete(sb.length() - 2, sb.length()); + sb.append("]"); + return sb.toString(); + } + + /** + * Assigns an existing field as timestamp and generates bounded out-of-order watermarks. + */ + public static class RowFieldWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private final int timeField; + private final long delayMs; + private long maxTime = Long.MIN_VALUE; + + private RowFieldWatermarkAssigner(int timeFieldPos, long delayMs) { + Preconditions.checkArgument(delayMs >= 0, "Watermark delay must be positive."); + this.timeField = timeFieldPos; + this.delayMs = delayMs; + } + + @Override + public long extractTimestamp(Row row, long previousElementTimestamp) { + long t = (long) row.getField(timeField); + if (t > maxTime) { + maxTime = t; + } + return t; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(maxTime - delayMs); + } } + + /** + * Removes a field from a Row. + */ + public static class FieldRemover implements MapFunction<Row, Row> { + + private int fieldToRemove; + + private FieldRemover(int fieldToRemove) { + this.fieldToRemove = fieldToRemove; + } + + @Override + public Row map(Row value) throws Exception { + + Row out = new Row(value.getArity()); --- End diff -- Yes, you're right. Thanks!
---