theBestAndrew commented on a change in pull request #5418:
URL: https://github.com/apache/nifi/pull/5418#discussion_r718448390



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
##########
@@ -2603,6 +2603,205 @@ public void 
testFullRollbackAFterCheckpointDoesNotStoreState() throws IOExceptio
         stateManager.assertStateNotSet();
     }
 
+    @Test
+    public void testCloneThenRollbackCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+    }
+
+    @Test
+    public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() 
throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() 
throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+
+        // Expect claimant count of 2 because the clone() means that the new 
FlowFile points to the same content claim.
+        assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+        // Should be able to write to the FlowFile any number of times, and 
each time it should leave us with a Content Claim Claimant Count of 1 for the 
original (because the new FlowFile will no
+        // longer point at the original claim) and 1 for the new Content Claim.
+        for (int i=0; i < 10; i++) {
+            final ContentClaim previousCloneClaim = getContentClaim(clone);
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            // After modifying the content of the FlowFile, the claimant count 
of the 'old' content claim should be 1, as should the claimant count of the 
updated content claim.
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+            assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+        }
+    }
+
+    private ContentClaim getContentClaim(final FlowFile flowFile) {
+        return ((FlowFileRecord) flowFile).getContentClaim();
+    }
+
+    @Test
+    public void testCreateChildThenWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+        final ContentClaim updatedCloneClaim = getContentClaim(clone);
+        assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+    }
+
+    @Test
+    public void 
testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        for (int i=0; i < 100; i++) {
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(claim));
+        }
+    }
+
+    @Test
+    public void 
testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly()
 {
+        FlowFile clone = session.create();

Review comment:
       My understanding is that `session.create()` produces a new FlowFile 
rather than a clone of an existing one, so should the variable `clone` be 
renamed here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to