carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r257740847
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ########## @@ -510,831 +505,92 @@ public WriteOptions getWriteOptions() { return snapshotRunner; } - @Override - public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { - - LOG.info("Initializing RocksDB keyed state backend."); - - // clear all meta data - kvStateInformation.clear(); - - try { - RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null; - if (restoreState == null || restoreState.isEmpty()) { - createDB(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, numberOfTransferingThreads); - } - - KeyedStateHandle firstStateHandle = restoreState.iterator().next(); - if (firstStateHandle instanceof IncrementalKeyedStateHandle - || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) { - incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this); - incrementalRestoreOperation.restore(restoreState); - } else { - RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this); - fullRestoreOperation.doRestore(restoreState); - } - } - - // it is important that we only create the key builder after the restore, and not before; - // restore operations may reconfigure the key serializer, so accessing the key serializer - // only now we can be certain that the key serializer used in the builder is final. - this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>( - getKeySerializer(), + private RocksDBRestoreOperation<K> getRocksDBRestoreOperation(int keyGroupPrefixBytes, + CloseableRegistry cancelStreamRegistry, + LinkedHashMap<String, StateColumnFamilyHandle> kvStateInformation, + RocksDBNativeMetricMonitor nativeMetricMonitor, + Collection<KeyedStateHandle> restoreState) { + File instanceRocksDBPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING); + if (restoreState == null || restoreState.isEmpty()) { + return new RocksDBNoneRestoreOperation<>( + keyGroupRange, keyGroupPrefixBytes, - 32); - - initializeSnapshotStrategy(incrementalRestoreOperation); - } catch (Exception ex) { - dispose(); - throw ex; - } - } - - @VisibleForTesting - void initializeSnapshotStrategy(@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) { - - this.savepointSnapshotStrategy = - new RocksFullSnapshotStrategy<>( - db, - rocksDBResourceGuard, - getKeySerializer(), + numberOfTransferingThreads, + cancelStreamRegistry, + userCodeClassLoader, kvStateInformation, + keySerializerProvider, + instanceBasePath, + instanceRocksDBPath, + dbOptions, + columnOptions, + nativeMetricMonitor); + } + KeyedStateHandle firstStateHandle = restoreState.iterator().next(); + boolean isIncrementalStateHandle = (firstStateHandle instanceof IncrementalKeyedStateHandle) + || (firstStateHandle instanceof IncrementalLocalKeyedStateHandle); + if (isIncrementalStateHandle) { + return new RocksDBIncrementalRestoreOperation<>( + operatorIdentifier, keyGroupRange, keyGroupPrefixBytes, - localRecoveryConfig, + numberOfTransferingThreads, cancelStreamRegistry, - keyGroupCompressionDecorator); - - if (enableIncrementalCheckpointing) { - final UUID backendUID; - final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; - final long lastCompletedCheckpointId; - - if (incrementalRestoreOperation == null) { - backendUID = UUID.randomUUID(); - materializedSstFiles = new TreeMap<>(); - lastCompletedCheckpointId = -1L; - } else { - backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID()); - materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles()); - lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId(); - Preconditions.checkState(lastCompletedCheckpointId >= 0L); - } - // TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies. - this.checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>( - db, - rocksDBResourceGuard, - getKeySerializer(), + userCodeClassLoader, kvStateInformation, + keySerializerProvider, + instanceBasePath, + instanceRocksDBPath, + dbOptions, + columnOptions, + nativeMetricMonitor); + } else { + return new RocksDBFullRestoreOperation<>( keyGroupRange, keyGroupPrefixBytes, - localRecoveryConfig, + numberOfTransferingThreads, cancelStreamRegistry, + userCodeClassLoader, + kvStateInformation, + keySerializerProvider, instanceBasePath, - backendUID, - materializedSstFiles, - lastCompletedCheckpointId, - numberOfTransferingThreads); - } else { - this.checkpointSnapshotStrategy = savepointSnapshotStrategy; + instanceRocksDBPath, + dbOptions, + columnOptions, + nativeMetricMonitor); } } @Override - public void notifyCheckpointComplete(long completedCheckpointId) throws Exception { - - if (checkpointSnapshotStrategy != null) { - checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId); - } + public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { - if (savepointSnapshotStrategy != null) { - savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId); + if (restoreState == null) { + return; } - } - - private void createDB() throws IOException { - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); - this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); - this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions); - this.defaultColumnFamily = columnFamilyHandles.get(0); - } - - private RocksDB openDB( - String path, - List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, - List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - - List<ColumnFamilyDescriptor> columnFamilyDescriptors = - new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - - // we add the required descriptor for the default CF in FIRST position, see - // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families - columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions)); - columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - - RocksDB dbRef; + LOG.info("Restoring RocksDB keyed state backend."); try { - dbRef = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - Preconditions.checkNotNull(path), - columnFamilyDescriptors, - stateColumnFamilyHandles); - } catch (RocksDBException e) { - throw new IOException("Error while opening RocksDB instance.", e); - } - - // requested + default CF - Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), - "Not all requested column family handles have been created"); - - if (this.metricOptions.isEnabled()) { - this.nativeMetricMonitor = new RocksDBNativeMetricMonitor( - dbRef, - metricOptions, - metricGroup - ); - } - - return dbRef; - } - - /** - * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot. - */ - private static final class RocksDBFullRestoreOperation<K> { - - private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - - /** Current key-groups state handle from which we restore key-groups. */ - private KeyGroupsStateHandle currentKeyGroupsStateHandle; - /** Current input stream we obtained from currentKeyGroupsStateHandle. */ - private FSDataInputStream currentStateHandleInStream; - /** Current data input view that wraps currentStateHandleInStream. */ - private DataInputView currentStateHandleInView; - /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ - private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; - /** The compression decorator that was used for writing the state, as determined by the meta data. */ - private StreamCompressionDecorator keygroupStreamCompressionDecorator; - - private boolean isKeySerializerCompatibilityChecked; - - /** - * Creates a restore operation object for the given state backend instance. - * - * @param rocksDBKeyedStateBackend the state backend into which we restore - */ - RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { - this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); - } - - /** - * Restores all key-groups data that is referenced by the passed state handles. - * - * @param keyedStateHandles List of all key groups state handles that shall be restored. - */ - void doRestore(Collection<KeyedStateHandle> keyedStateHandles) - throws IOException, StateMigrationException, RocksDBException { - - rocksDBKeyedStateBackend.createDB(); - - for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { - if (keyedStateHandle != null) { - - if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + KeyGroupsStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); - } - this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - restoreKeyGroupsInStateHandle(); - } - } - } - - /** - * Restore one key groups state handle. - */ - private void restoreKeyGroupsInStateHandle() - throws IOException, StateMigrationException, RocksDBException { - try { - currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); - rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); - currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); - restoreKVStateMetaData(); - restoreKVStateData(); - } finally { - if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { - IOUtils.closeQuietly(currentStateHandleInStream); - } - } - } - - /** - * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. - */ - private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - - // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend, - // deserialization of state happens lazily during runtime; we depend on the fact - // that the new serializer for states could be compatible, and therefore the restore can continue - // without old serializers required to be present. - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - - serializationProxy.read(currentStateHandleInView); - - if (!isKeySerializerCompatibilityChecked) { - // check for key serializer compatibility; this also reconfigures the - // key serializer to be compatible, if it is required and is possible - TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat = - rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot()); - if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) { - throw new StateMigrationException("The new key serializer must be compatible."); - } - - isKeySerializerCompatibilityChecked = true; - } - - this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? - SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - - List<StateMetaInfoSnapshot> restoredMetaInfos = - serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - - for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) { - - Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn = - rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - - if (registeredColumn == null) { - byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - - ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - nameBytes, - rocksDBKeyedStateBackend.columnOptions); - - ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - - // create a meta info for the state on restore; - // this allows us to retain the state in future snapshots even if it wasn't accessed - RegisteredStateMetaInfoBase stateMetaInfo = - RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo); - registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); - rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), registeredColumn); - - } else { - // TODO with eager state registration in place, check here for serializer migration strategies - } - currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); - } - } - - /** - * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. - */ - private void restoreKVStateData() throws IOException, RocksDBException { - //for all key-groups in the current state handle... - try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) { - for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { - int keyGroup = keyGroupOffset.f0; - - // Check that restored key groups all belong to the backend - Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), - "The key group must belong to the backend"); - - long offset = keyGroupOffset.f1; - //not empty key-group? - if (0L != offset) { - currentStateHandleInStream.seek(offset); - try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { - DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - int kvStateId = compressedKgInputView.readShort(); - ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - //insert all k/v pairs into DB - boolean keyGroupHasMoreKeys = true; - while (keyGroupHasMoreKeys) { - byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - if (hasMetaDataFollowsFlag(key)) { - //clear the signal bit in the key to make it ready for insertion again - clearMetaDataFollowsFlag(key); - writeBatchWrapper.put(handle, key, value); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = END_OF_KEY_GROUP_MARK - & compressedKgInputView.readShort(); - if (END_OF_KEY_GROUP_MARK == kvStateId) { - keyGroupHasMoreKeys = false; - } else { - handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - } - } else { - writeBatchWrapper.put(handle, key, value); - } - } - } - } - } - } + RocksDBRestoreOperation<K> restoreOperation = getRocksDBRestoreOperation( + keyGroupPrefixBytes, cancelStreamRegistry, kvStateInformation, nativeMetricMonitor, restoreState); + restoreOperation.restore(restoreState, db); + } catch (Exception ex) { + dispose(); + throw ex; } } - /** - * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot. - */ - private static class RocksDBIncrementalRestoreOperation<T> { - - private final RocksDBKeyedStateBackend<T> stateBackend; - private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles; - private UUID restoredBackendUID; - private long lastCompletedCheckpointId; - private boolean isKeySerializerCompatibilityChecked; - - private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { - - this.stateBackend = stateBackend; - this.restoredSstFiles = new TreeMap<>(); - } - - SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() { - return restoredSstFiles; - } - - UUID getRestoredBackendUID() { - return restoredBackendUID; - } - - long getLastCompletedCheckpointId() { - return lastCompletedCheckpointId; - } - - /** - * Root method that branches for different implementations of {@link KeyedStateHandle}. - */ - void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { - - if (restoreStateHandles.isEmpty()) { - return; - } - - final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next(); - - boolean isRescaling = (restoreStateHandles.size() > 1 || - !Objects.equals(theFirstStateHandle.getKeyGroupRange(), stateBackend.keyGroupRange)); - - if (!isRescaling) { - restoreWithoutRescaling(theFirstStateHandle); - } else { - restoreWithRescaling(restoreStateHandles); - } - } - - /** - * Recovery from a single remote incremental state without rescaling. - */ - void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception { - - IncrementalLocalKeyedStateHandle localKeyedStateHandle; - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - List<ColumnFamilyDescriptor> columnFamilyDescriptors; - - // Recovery from remote incremental state. - Path temporaryRestoreInstancePath = new Path( - stateBackend.instanceBasePath.getAbsolutePath(), - UUID.randomUUID().toString()); - - try { - if (rawStateHandle instanceof IncrementalKeyedStateHandle) { - - IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; - - // read state data. - try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader(stateBackend.numberOfTransferingThreads)) { - rocksDBStateDownloader.transferAllStateDataToDirectory( - restoreStateHandle, - temporaryRestoreInstancePath, - stateBackend.cancelStreamRegistry); - } - - stateMetaInfoSnapshots = readMetaData(restoreStateHandle.getMetaStateHandle()); - columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - - // since we transferred all remote state to a local directory, we can use the same code as for - // local recovery. - localKeyedStateHandle = new IncrementalLocalKeyedStateHandle( - restoreStateHandle.getBackendIdentifier(), - restoreStateHandle.getCheckpointId(), - new DirectoryStateHandle(temporaryRestoreInstancePath), - restoreStateHandle.getKeyGroupRange(), - restoreStateHandle.getMetaStateHandle(), - restoreStateHandle.getSharedState().keySet()); - } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { - - // Recovery from local incremental state. - localKeyedStateHandle = (IncrementalLocalKeyedStateHandle) rawStateHandle; - stateMetaInfoSnapshots = readMetaData(localKeyedStateHandle.getMetaDataState()); - columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - } else { - throw new IllegalStateException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + - ", but found " + rawStateHandle.getClass()); - } - - restoreLocalStateIntoFullInstance( - localKeyedStateHandle, - columnFamilyDescriptors, - stateMetaInfoSnapshots); - } finally { - FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); - if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { - restoreFileSystem.delete(temporaryRestoreInstancePath, true); - } - } - } - - /** - * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary - * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the - * real restore instance and then the temporary instance is discarded. - */ - void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { - - this.restoredBackendUID = UUID.randomUUID(); - - initTargetDB(restoreStateHandles, stateBackend.keyGroupRange); - - byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; - RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getStartKeyGroup(), startKeyGroupPrefixBytes); - - byte[] stopKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; - RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); - - for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - - if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + - ", but found " + rawStateHandle.getClass()); - } - - Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); - try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle( - (IncrementalKeyedStateHandle) rawStateHandle, - temporaryRestoreInstancePath); - RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) { - - List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors; - List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; - - // iterating only the requested descriptors automatically skips the default column family handle - for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) { - ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i); - ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i); - - ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle( - tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)); - - try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { - - iterator.seek(startKeyGroupPrefixBytes); - - while (iterator.isValid()) { - - if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) { - writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); - } else { - // Since the iterator will visit the record according to the sorted order, - // we can just break here. - break; - } - - iterator.next(); - } - } // releases native iterator resources - } - } finally { - FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); - if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { - restoreFileSystem.delete(temporaryRestoreInstancePath, true); - } - } - } - } - - private class RestoredDBInstance implements AutoCloseable { Review comment: I see, but here we have deleted the class here and moved into `RocksDBIncrementalRestoreOperation`, so I guess you meant adding some javadoc in `RocksDBIncrementalRestoreOperation`? Thanks. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services