exceptionfactory commented on a change in pull request #5412: URL: https://github.com/apache/nifi/pull/5412#discussion_r719488902
########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/CachingDataflowRetrieval.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.stateless.retrieval; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.stateless.ExecuteStateless; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.File; +import java.io.IOException; + +public class CachingDataflowRetrieval implements DataflowRetrieval { + private final String processorId; + private final ComponentLog logger; + private final DataflowRetrieval delegate; + private final ObjectMapper objectMapper; + + + public CachingDataflowRetrieval(final String processorId, final ComponentLog logger, final DataflowRetrieval delegate) { + this.processorId = processorId; + this.logger = logger; + this.delegate = delegate; + + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); + } + + @Override + public VersionedFlowSnapshot retrieveDataflowContents(final ProcessContext context) throws IOException { + try { + final VersionedFlowSnapshot retrieved = delegate.retrieveDataflowContents(context); + cacheFlowSnapshot(context, retrieved); + return retrieved; + } catch (final Exception e) { + final File cacheFile = getFlowCacheFile(context, processorId); + if (cacheFile.exists()) { + logger.warn("Failed to retrieve Flow Snapshot from Registry. Will restore Flow Snapshot from cached version at {}", cacheFile.getAbsolutePath(), e); Review comment: This message implies retrieval from Registry, but this caching class does not appear specific to Registry. 
########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/CachingDataflowRetrieval.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.stateless.retrieval; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.stateless.ExecuteStateless; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.File; +import java.io.IOException; + +public class CachingDataflowRetrieval implements DataflowRetrieval { + private final String processorId; + private final ComponentLog logger; + private final DataflowRetrieval delegate; + private final ObjectMapper objectMapper; + + + public CachingDataflowRetrieval(final String processorId, final ComponentLog logger, final DataflowRetrieval delegate) { + this.processorId = processorId; + this.logger = logger; + this.delegate = delegate; + + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); + } + + @Override + public VersionedFlowSnapshot retrieveDataflowContents(final ProcessContext context) throws IOException { Review comment: Are there potential concurrency concerns with this method? Could multiple threads running `ExecuteStateless` call this method causing multiple attempts to write the cache file? ########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/CachingDataflowRetrieval.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.stateless.retrieval; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.stateless.ExecuteStateless; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.File; +import java.io.IOException; + +public class CachingDataflowRetrieval implements DataflowRetrieval { + private final String processorId; + private final ComponentLog logger; + private final DataflowRetrieval delegate; + private final ObjectMapper objectMapper; + + + public CachingDataflowRetrieval(final String processorId, final ComponentLog logger, final DataflowRetrieval delegate) { + this.processorId = processorId; + this.logger = logger; + this.delegate = delegate; + + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); + } + + @Override + public VersionedFlowSnapshot retrieveDataflowContents(final ProcessContext context) throws IOException { + try { + final VersionedFlowSnapshot retrieved = 
delegate.retrieveDataflowContents(context); + cacheFlowSnapshot(context, retrieved); + return retrieved; + } catch (final Exception e) { + final File cacheFile = getFlowCacheFile(context, processorId); + if (cacheFile.exists()) { + logger.warn("Failed to retrieve Flow Snapshot from Registry. Will restore Flow Snapshot from cached version at {}", cacheFile.getAbsolutePath(), e); + return readCachedFlow(cacheFile); + } + + throw new IOException("Failed to retrieve Flow Snapshot from configured endpoint and no cached version is available", e); + } + } + + private void cacheFlowSnapshot(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) { + final File cacheFile = getFlowCacheFile(context, processorId); + if (!cacheFile.getParentFile().exists() && !cacheFile.getParentFile().mkdirs()) { + logger.warn("Fetched dataflow from Registry but cannot create directory {} in order to cache the dataflow. " + Review comment: Similar question regarding mention of Registry retrieval. ########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/DataflowRetrieval.java ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.stateless.retrieval; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.IOException; + +public interface DataflowRetrieval { Review comment: What do you think about naming this something like `DataflowProvider` or `DataflowRetriever`? ########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/CachingDataflowRetrieval.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.stateless.retrieval; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.stateless.ExecuteStateless; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.File; +import java.io.IOException; + Review comment: A brief summary of the general caching approach would be helpful. It appears that a `VersionedFlowSnapshot` will be retrieved from the delegate and cached on every invocation, but the cache is only used on failures? ########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/retrieval/FileSystemDataflowRetrieval.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.stateless.retrieval; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.stateless.ExecuteStateless; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +import java.io.IOException; +import java.io.InputStream; + +public class FileSystemDataflowRetrieval implements DataflowRetrieval { + @Override + public VersionedFlowSnapshot retrieveDataflowContents(final ProcessContext context) throws IOException { + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); Review comment: The instantiation of `ObjectMapper` in the caching implementation includes the `JaxbAnnotationIntrospector`. Should that be included here as well? Perhaps instantiation should be moved to a shared location. ########## File path: nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor-tests/pom.xml ########## @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-stateless-processor-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.15.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-stateless-processor-tests</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>1.15.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-api</artifactId> + <version>1.15.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-server-api</artifactId> + <version>1.15.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-runtime</artifactId> + <version>1.15.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-utils</artifactId> + <version>1.15.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + + + + Review comment: Looks like some unnecessary newlines. Given the direct use of `ObjectMapper` in multiple places, recommend adding `jackson-databind` as an explicit dependency as opposed to depending on it transitively through `nifi-registry-client`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
