[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628907#comment-15628907 ]
ASF GitHub Bot commented on NIFI-2850:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1115#discussion_r86136287
--- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
@@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
     }
     final ProcessSession session = sessionFactory.createSession();
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
+    final List<FlowFile> flowFiles = session.get(1000);
+    if (flowFiles.isEmpty()) {
         break;
     }
-    flowFile = this.preprocessFlowFile(context, session, flowFile);
-
-    String groupId = this.getGroupId(context, flowFile);
-
-    final boolean binned = binManager.offer(groupId, flowFile, session);
-
-    // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-    if (!binned) {
-        Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-        bin.offer(flowFile, session);
-        this.readyBins.add(bin);
+    final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
+    for (FlowFile flowFile : flowFiles) {
+        flowFile = this.preprocessFlowFile(context, session, flowFile);
+        final String groupingIdentifier = getGroupId(context, flowFile);
+        flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
--- End diff --
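I don't believe so. With putIfAbsent, we would be creating a new ArrayList on
every call, even when the key is already present, because the value argument is
evaluated before the method runs. computeIfAbsent takes a mapping function
instead, so the ArrayList is created only if the key is not already present.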
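
To make the difference concrete, here is a minimal standalone sketch. It is not
code from the pull request: the class name, the helper newList(), and the use
of plain Strings in place of FlowFiles are assumptions made purely for
illustration. It counts how many lists each approach allocates while grouping
the same ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; Strings stand in for FlowFiles.
public class ComputeIfAbsentSketch {
    // Number of lists allocated by the most recent grouping run.
    static int allocations = 0;

    // Hypothetical helper that records every list allocation.
    static List<String> newList() {
        allocations++;
        return new ArrayList<>();
    }

    // putIfAbsent: the value argument is built before the call,
    // so a list is allocated for every element, present key or not.
    static int countWithPutIfAbsent(List<String> groupIds) {
        allocations = 0;
        Map<String, List<String>> groups = new HashMap<>();
        for (String id : groupIds) {
            groups.putIfAbsent(id, newList());
            groups.get(id).add("item");
        }
        return allocations;
    }

    // computeIfAbsent: the lambda runs only when the key is missing,
    // so a list is allocated once per distinct key.
    static int countWithComputeIfAbsent(List<String> groupIds) {
        allocations = 0;
        Map<String, List<String>> groups = new HashMap<>();
        for (String id : groupIds) {
            groups.computeIfAbsent(id, key -> newList()).add("item");
        }
        return allocations;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "a", "b", "a");
        System.out.println("putIfAbsent allocations: " + countWithPutIfAbsent(ids));
        System.out.println("computeIfAbsent allocations: " + countWithComputeIfAbsent(ids));
    }
}
```

With the ids a, a, b, a, the putIfAbsent variant allocates four lists (one per
element) and the computeIfAbsent variant allocates two (one per distinct key).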
> Provide ability for a FlowFile to be migrated from one Process Session to
> another
> ---------------------------------------------------------------------------------
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
> Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done to ensure that we can commit all
> Process Sessions when a bin is full. Unfortunately, this means that
> MergeContent is required to call ProcessSession.get() many times, which
> adds a lot of contention on the FlowFile Queue. If we allow FlowFiles to be
> migrated from one session to another, we can have a session per bin, and then
> use ProcessSession.get(100) to greatly reduce lock contention. This will
> likely have benefits in other processors as well.
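
The contention argument in the description can be illustrated with a toy model.
This is only a sketch under stated assumptions: CountingQueue is a hypothetical
stand-in for the FlowFile Queue, not NiFi's implementation, and it simply
counts how often its lock is taken when draining one element at a time versus
in batches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of NiFi.
public class BatchPollSketch {
    // Toy queue guarded by a single lock, counting every acquisition.
    static class CountingQueue {
        private final Queue<String> items = new ArrayDeque<>();
        private final ReentrantLock lock = new ReentrantLock();
        int lockAcquisitions = 0;

        CountingQueue(int size) {
            for (int i = 0; i < size; i++) {
                items.add("flowfile-" + i);
            }
        }

        // Analogous to a single-element get(): one lock acquisition per element.
        String poll() {
            lock.lock();
            try {
                lockAcquisitions++;
                return items.poll();
            } finally {
                lock.unlock();
            }
        }

        // Analogous to a batched get(max): one lock acquisition per batch.
        List<String> poll(int max) {
            lock.lock();
            try {
                lockAcquisitions++;
                List<String> batch = new ArrayList<>();
                while (batch.size() < max && !items.isEmpty()) {
                    batch.add(items.poll());
                }
                return batch;
            } finally {
                lock.unlock();
            }
        }
    }

    // Drains the queue element by element; returns the lock acquisition count.
    static int drainOneAtATime(CountingQueue queue) {
        while (queue.poll() != null) {
            // keep pulling until the queue reports empty
        }
        return queue.lockAcquisitions;
    }

    // Drains the queue in batches; returns the lock acquisition count.
    static int drainInBatches(CountingQueue queue, int batchSize) {
        while (!queue.poll(batchSize).isEmpty()) {
            // keep pulling until an empty batch comes back
        }
        return queue.lockAcquisitions;
    }

    public static void main(String[] args) {
        System.out.println("one at a time: " + drainOneAtATime(new CountingQueue(1000)));
        System.out.println("batches of 100: " + drainInBatches(new CountingQueue(1000), 100));
    }
}
```

For 1000 queued elements, the single-element loop takes the lock 1001 times
(including the final empty poll), while batches of 100 take it 11 times.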