tpalfy commented on a change in pull request #5738:
URL: https://github.com/apache/nifi/pull/5738#discussion_r798870414
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
##########
@@ -18,26 +18,44 @@
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.rules.Action;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MockRulesEngineService extends AbstractConfigurableComponent
implements RulesEngineService {
Review comment:
Simplified tests deserve simplified helper classes:
```java
public class MockRulesEngineService extends AbstractConfigurableComponent
implements RulesEngineService {
@Override
public List<Action> fireRules(Map<String, Object> facts) {
return Collections.singletonList(Mockito.mock(Action.class));
}
@Override
public void initialize(ControllerServiceInitializationContext context) {
}
@Override
public String getIdentifier() {
return "MockRulesEngineService";
}
}
```
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
##########
@@ -73,4 +73,10 @@ public void
initialize(ControllerServiceInitializationContext context) throws In
public String getIdentifier() {
return "MockPropertyContextActionHandler";
}
+
+ public void reset() {
Review comment:
The more simplified classes the better:
```java
public class MockPropertyContextActionHandler extends
AbstractConfigurableComponent implements PropertyContextActionHandler{
private List<Map<String, Object>> rows = new ArrayList<>();
private List<PropertyContext> propertyContexts = new ArrayList<>();
@Override
public void execute(PropertyContext context, Action action, Map<String,
Object> facts) {
propertyContexts.add(context);
execute(action, facts);
}
@Override
public void execute(Action action, Map<String, Object> facts) {
rows.add(facts);
}
@Override
public void initialize(ControllerServiceInitializationContext context)
throws InitializationException {
}
public List<Map<String, Object>> getRows() {
return rows;
}
public List<PropertyContext> getPropertyContexts() {
return propertyContexts;
}
@Override
public String getIdentifier() {
return "MockPropertyContextActionHandler";
}
public void reset() {
rows.clear();
propertyContexts.clear();
}
}
```
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
##########
@@ -77,17 +83,37 @@ public void setup(final ConfigurationContext context)
throws IOException {
@Override
public void onTrigger(ReportingContext context) {
+ String sql =
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
Review comment:
The duplication of the sql string start time and end time evaluation
logic could be prevented.
I suggest a trait-like interface to achieve maximum potential like this:
```java
public class MetricsEventReportingTask extends AbstractReportingTask
implements QueryTimeAware {
private List<PropertyDescriptor> properties;
private MetricsQueryService metricsQueryService;
private volatile RulesEngineService rulesEngineService;
private volatile PropertyContextActionHandler actionHandler;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected void init(final ReportingInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QueryMetricsUtil.QUERY);
properties.add(QueryMetricsUtil.RULES_ENGINE);
properties.add(QueryMetricsUtil.ACTION_HANDLER);
properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION);
properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE);
this.properties = Collections.unmodifiableList(properties);
}
@OnScheduled
public void setup(final ConfigurationContext context) {
actionHandler =
context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
rulesEngineService =
context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
final Integer defaultPrecision =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
final Integer defaultScale =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
metricsQueryService = new MetricsSqlQueryService(getLogger(),
defaultPrecision, defaultScale);
}
@Override
public void onTrigger(ReportingContext context) {
String sql =
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
try {
sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME,
BULLETIN_END_TIME);
sql = processStartAndEndTimes(context, sql,
PROVENANCE_START_TIME, PROVENANCE_END_TIME);
fireRules(context, actionHandler, rulesEngineService, sql);
} catch (Exception e) {
getLogger().error("Error opening loading rules: {}", new
Object[]{e.getMessage()}, e);
}
}
private void fireRules(ReportingContext context,
PropertyContextActionHandler actionHandler, RulesEngineService engine, String
query) throws Exception {
getLogger().debug("Executing query: {}", query);
QueryResult queryResult = metricsQueryService.query(context, query);
ResultSetRecordSet recordSet =
metricsQueryService.getResultSetRecordSet(queryResult);
Record record;
try {
while ((record = recordSet.next()) != null) {
final Map<String, Object> facts = new HashMap<>();
for (String fieldName : record.getRawFieldNames()) {
facts.put(fieldName, record.getValue(fieldName));
}
List<Action> actions = engine.fireRules(facts);
if (actions == null || actions.isEmpty()) {
getLogger().debug("No actions required for provided
facts.");
} else {
actions.forEach(action -> actionHandler.execute(context,
action, facts));
}
}
} finally {
metricsQueryService.closeQuietly(recordSet);
}
}
}
public class QueryNiFiReportingTask extends AbstractReportingTask implements
QueryTimeAware {
private List<PropertyDescriptor> properties;
private volatile RecordSinkService recordSinkService;
private MetricsQueryService metricsQueryService;
@Override
protected void init(final ReportingInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QueryMetricsUtil.QUERY);
properties.add(QueryMetricsUtil.RECORD_SINK);
properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS);
properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION);
properties.add(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE);
this.properties = Collections.unmodifiableList(properties);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnScheduled
public void setup(final ConfigurationContext context) {
recordSinkService =
context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
final Integer defaultPrecision =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
final Integer defaultScale =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
metricsQueryService = new MetricsSqlQueryService(getLogger(),
defaultPrecision, defaultScale);
}
@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
try {
sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME,
BULLETIN_END_TIME);
sql = processStartAndEndTimes(context, sql,
PROVENANCE_START_TIME, PROVENANCE_END_TIME);
getLogger().debug("Executing query: {}", sql);
final QueryResult queryResult =
metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;
try {
recordSet =
metricsQueryService.getResultSetRecordSet(queryResult);
} catch (final Exception e) {
getLogger().error("Error creating record set from query
results due to {}", new Object[]{e.getMessage()}, e);
return;
}
try {
final Map<String, String> attributes = new HashMap<>();
final String transactionId = UUID.randomUUID().toString();
attributes.put("reporting.task.transaction.id",
transactionId);
attributes.put("reporting.task.name", getName());
attributes.put("reporting.task.uuid", getIdentifier());
attributes.put("reporting.task.type",
this.getClass().getSimpleName());
recordSinkService.sendData(recordSet, attributes,
context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
} catch (Exception e) {
getLogger().error("Error during transmission of query
results due to {}", new Object[]{e.getMessage()}, e);
return;
} finally {
metricsQueryService.closeQuietly(queryResult);
}
final long elapsedMillis =
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
getLogger().debug("Successfully queried and sent in {} millis",
elapsedMillis);
} catch (Exception e) {
getLogger().error("Error processing the query due to {}", new
Object[]{e.getMessage()}, e);
}
}
}
public interface QueryTimeAware {
default String processStartAndEndTimes(
ReportingContext context,
String sql,
TrackedQueryTime queryStartTime,
TrackedQueryTime queryEndTime
) throws IOException {
StateManager stateManager = context.getStateManager();
final Map<String, String> stateMap = new
HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
if (sql.contains(queryStartTime.getSqlPlaceholder()) &&
sql.contains(queryEndTime.getSqlPlaceholder())) {
final long startTime = stateMap.get(queryStartTime.name()) ==
null ? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
final long currentTime = getCurrentTime();
sql = sql.replace(queryStartTime.getSqlPlaceholder(),
String.valueOf(startTime));
sql = sql.replace(queryEndTime.getSqlPlaceholder(),
String.valueOf(currentTime));
stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
stateManager.setState(stateMap, Scope.LOCAL);
}
return sql;
}
default long getCurrentTime() {
return Instant.now().toEpochMilli();
}
}
```
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
##########
@@ -49,27 +61,40 @@
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestMetricsEventReportingTask {
Review comment:
This test is already a bit more complicated than it needs to be, we can
make it simpler like this:
```java
class TestMetricsEventReportingTask {
private static final String LOG = "LOG";
private static final String ALERT = "ALERT";
private ReportingContext context;
private MockMetricsEventReportingTask reportingTask;
private MockPropertyContextActionHandler actionHandler;
private ProcessGroupStatus status;
private MockQueryBulletinRepository mockBulletinRepository;
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
currentTime = new AtomicLong();
status = new ProcessGroupStatus();
actionHandler = new MockPropertyContextActionHandler();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesRead(20000L);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setId("proc");
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
ConnectionStatusPredictions connectionStatusPredictions = new
ConnectionStatusPredictions();
connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000);
connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000);
connectionStatusPredictions.setNextPredictedQueuedCount(1000000000);
connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L);
ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000);
root1ConnectionStatus.setPredictions(connectionStatusPredictions);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500);
root2ConnectionStatus.setPredictions(connectionStatusPredictions);
Collection<ConnectionStatus> rootConnectionStatuses = new
ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
}
@Test
void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select connectionId,
predictedQueuedCount, predictedTimeToBytesBackpressureMillis from
CONNECTION_STATUS_PREDICTIONS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<PropertyContext> propertyContexts =
actionHandler.getPropertyContexts();
assertEquals(2, actionHandler.getRows().size());
assertEquals(2, propertyContexts.size());
}
@Test
void testUniqueBulletinQueryIsInTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"WARN", "test bulletin 2", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
}
@Test
void testUniqueBulletinQueryIsOutOfTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"WARN", "test bulletin 2", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime() - 1);
reportingTask.onTrigger(context);
assertEquals(0, actionHandler.getRows().size());
}
@Test
void testUniqueProvenanceQueryIsInTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select componentId from
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
MockFlowFile mockFlowFile = new MockFlowFile(2L);
ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("2")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 2")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 2")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov2);
currentTime.set(prov2.getEventTime());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
}
@Test
void testUniqueProvenanceQueryIsOutOfTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select componentId from
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
MockFlowFile mockFlowFile = new MockFlowFile(2L);
ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("2")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 2")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 2")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov2);
currentTime.set(prov2.getEventTime() - 1);
reportingTask.onTrigger(context);
assertEquals(0, actionHandler.getRows().size());
}
@Test
void testTimeWindowFromStateMap() throws IOException,
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and
bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
long testBulletinStartTime = 1609538145L;
long testProvenanceStartTime = 1641074145L;
final Map<String, String> stateMap = new HashMap<>();
stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(),
String.valueOf(testBulletinStartTime));
stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(),
String.valueOf(testProvenanceStartTime));
mockStateManager.setState(stateMap, Scope.LOCAL);
final long bulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long provenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(testBulletinStartTime, bulletinStartTime);
assertEquals(testProvenanceStartTime, provenanceStartTime);
final long currentTime = Instant.now().toEpochMilli();
this.currentTime.set(currentTime);
reportingTask.onTrigger(context);
final long updatedBulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long updatedProvenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(currentTime, updatedBulletinStartTime);
assertEquals(currentTime, updatedProvenanceStartTime);
}
private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
reportingTask = new MockMetricsEventReportingTask();
final ReportingInitializationContext initContext =
Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor :
reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
mockStateManager = new MockStateManager(reportingTask);
Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0,
PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
final PropertyValue pValue =
Mockito.mock(StandardPropertyValue.class);
actionHandler = new MockPropertyContextActionHandler();
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
final PropertyValue resValue =
Mockito.mock(StandardPropertyValue.class);
MockRulesEngineService rulesEngineService = new
MockRulesEngineService();
Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
ConfigurationContext configContext =
Mockito.mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue);
Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue);
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new
MockPropertyValue("10"));
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
MockPropertyValue("0"));
reportingTask.setup(configContext);
setupMockProvenanceRepository(eventAccess);
setupMockBulletinRepository();
return reportingTask;
}
private final class MockMetricsEventReportingTask extends
MetricsEventReportingTask {
@Override
public long getCurrentTime() {
return currentTime.get();
}
}
private void setupMockBulletinRepository() {
mockBulletinRepository = new MockQueryBulletinRepository();
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
"WARN", "test bulletin 1", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
}
private void setupMockProvenanceRepository(final EventAccess
eventAccess) {
mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
previousAttributes.put("test.value", "A");
Map<String, String> updatedAttributes = new
HashMap<>(previousAttributes);
updatedAttributes.put("test.value", "B");
// Generate provenance events and put them in a repository
Processor processor = mock(Processor.class);
SharedSessionState sharedState = new SharedSessionState(processor,
new AtomicLong(0));
MockProcessSession processSession = new
MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test
content".getBytes());
ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("1")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(currentTimeMillis)
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.setAttributes(previousAttributes, updatedAttributes)
.build();
mockProvenanceRepository.registerEvent(prov1);
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
}
private static class MockQueryBulletinRepository extends
MockBulletinRepository {
Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
bulletins.computeIfAbsent(bulletin.getCategory(), key -> new
ArrayList<>())
.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
return new ArrayList<>(
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
.orElse(Collections.emptyList())
);
}
@Override
public List<Bulletin> findBulletinsForController() {
return Optional.ofNullable(bulletins.get("controller"))
.orElse(Collections.emptyList());
}
}
}
```
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
##########
@@ -67,21 +72,26 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {
Review comment:
This test class can also be significantly simplified:
```suggestion
class TestQueryNiFiReportingTask {
private ReportingContext context;
private MockQueryNiFiReportingTask reportingTask;
private MockRecordSinkService mockRecordSinkService;
private ProcessGroupStatus status;
private BulletinRepository mockBulletinRepository;
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
currentTime = new AtomicLong();
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesRead(20000L);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setId("proc");
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000);
root1ConnectionStatus.setBackPressureObjectThreshold(1000);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500);
root2ConnectionStatus.setBackPressureObjectThreshold(1000);
Collection<ConnectionStatus> rootConnectionStatuses = new
ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
groupStatus1.setProcessorStatus(processorStatuses);
groupStatus1.setBytesRead(1234L);
// Create a nested group status with a connection
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setProcessorStatus(processorStatuses);
groupStatus2.setBytesRead(12345L);
ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
nestedConnectionStatus.setId("nested");
nestedConnectionStatus.setQueuedCount(1001);
Collection<ConnectionStatus> nestedConnectionStatuses = new
ArrayList<>();
nestedConnectionStatuses.add(nestedConnectionStatus);
groupStatus2.setConnectionStatus(nestedConnectionStatuses);
Collection<ProcessGroupStatus> nestedGroupStatuses = new
ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
groupStatus3.setBytesRead(1L);
ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
nestedConnectionStatus2.setId("nested2");
nestedConnectionStatus2.setQueuedCount(3);
Collection<ConnectionStatus> nestedConnectionStatuses2 = new
ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
Collection<ProcessGroupStatus> nestedGroupStatuses2 = new
ArrayList<>();
nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
groupStatuses.add(groupStatus3);
status.setProcessGroupStatus(groupStatuses);
}
@Test
void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by
queuedCount desc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(4, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(3, row.size()); // Only projected 2 columns
Object id = row.get("id");
assertTrue(id instanceof String);
assertEquals("nested", id);
assertEquals(1001, row.get("queuedCount"));
// Validate the second row
row = rows.get(1);
id = row.get("id");
assertEquals("root1", id);
assertEquals(1000, row.get("queuedCount"));
assertEquals(true, row.get("isBackPressureEnabled"));
// Validate the third row
row = rows.get(2);
id = row.get("id");
assertEquals("root2", id);
assertEquals(500, row.get("queuedCount"));
assertEquals(false, row.get("isBackPressureEnabled"));
// Validate the fourth row
row = rows.get(3);
id = row.get("id");
assertEquals("nested2", id);
assertEquals(3, row.get("queuedCount"));
}
@Test
void testBulletinIsInTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testBulletinIsOutOfTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin = BulletinFactory.createBulletin("input
port", "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testProvenanceEventIsInTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testProvenanceEventIsOutOfTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testUniqueProvenanceAndBulletinQuery() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis
<= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3003, rows.size());
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testTimeWindowFromStateMap() throws IOException,
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and
bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
long testBulletinStartTime = 1609538145L;
long testProvenanceStartTime = 1641074145L;
final Map<String, String> stateMap = new HashMap<>();
stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(),
String.valueOf(testBulletinStartTime));
stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(),
String.valueOf(testProvenanceStartTime));
mockStateManager.setState(stateMap, Scope.LOCAL);
final long bulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long provenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(testBulletinStartTime, bulletinStartTime);
assertEquals(testProvenanceStartTime, provenanceStartTime);
final long currentTime = Instant.now().toEpochMilli();
this.currentTime.set(currentTime);
reportingTask.onTrigger(context);
final long updatedBulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long updatedProvenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(currentTime, updatedBulletinStartTime);
assertEquals(currentTime, updatedProvenanceStartTime);
}
//--NEW END
@Test
void testJvmMetricsTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select "
+ Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
MetricNames.JVM_THREAD_COUNT,
MetricNames.JVM_THREAD_STATES_BLOCKED,
MetricNames.JVM_THREAD_STATES_RUNNABLE,
MetricNames.JVM_THREAD_STATES_TERMINATED,
MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
MetricNames.JVM_UPTIME,
MetricNames.JVM_HEAP_USED,
MetricNames.JVM_HEAP_USAGE,
MetricNames.JVM_NON_HEAP_USAGE,
MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) ->
s.replace(".", "_")).collect(Collectors.joining(","))
+ " from JVM_METRICS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1, rows.size());
Map<String, Object> row = rows.get(0);
assertEquals(11, row.size());
assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".",
"_")) instanceof Integer);
assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_"))
instanceof Double);
}
@Test
void testProcessGroupStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
PROCESS_GROUP_STATUS order by bytesRead asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(4, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(20, row.size());
assertEquals(1L, row.get("bytesRead"));
// Validate the second row
row = rows.get(1);
assertEquals(1234L, row.get("bytesRead"));
// Validate the third row
row = rows.get(2);
assertEquals(12345L, row.get("bytesRead"));
// Validate the fourth row
row = rows.get(3);
assertEquals(20000L, row.get("bytesRead"));
}
@Test
void testNoResults() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
CONNECTION_STATUS where queuedCount > 2000");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(0, rows.size());
}
@Test
void testProvenanceTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
order by eventId asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(24, row.size());
// Verify the first row contents
assertEquals(0L, row.get("eventId"));
assertEquals("CREATE", row.get("eventType"));
assertEquals(12L, row.get("entitySize"));
assertNull(row.get("contentPath"));
assertNull(row.get("previousContentPath"));
Object o = row.get("previousAttributes");
assertTrue(o instanceof Map);
Map<String, String> previousAttributes = (Map<String, String>) o;
assertEquals("A", previousAttributes.get("test.value"));
o = row.get("updatedAttributes");
assertTrue(o instanceof Map);
Map<String, String> updatedAttributes = (Map<String, String>) o;
assertEquals("B", updatedAttributes.get("test.value"));
// Verify some fields in the second row
row = rows.get(1);
assertEquals(24, row.size());
// Verify the second row contents
assertEquals(1L, row.get("eventId"));
assertEquals("DROP", row.get("eventType"));
// Verify some fields in the last row
row = rows.get(1000);
assertEquals(24, row.size());
// Verify the last row contents
assertEquals(1000L, row.get("eventId"));
assertEquals("DROP", row.get("eventType"));
}
@Test
void testBulletinTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS
order by bulletinTimestamp asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
final List<Map<String, Object>> rows =
mockRecordSinkService.getRows();
final String flowFileUuid = "testFlowFileUuid";
assertEquals(3, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(14, row.size());
assertNotNull(row.get("bulletinId"));
assertEquals("controller", row.get("bulletinCategory"));
assertEquals("WARN", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
// Validate the second row
row = rows.get(1);
assertEquals("processor", row.get("bulletinCategory"));
assertEquals("INFO", row.get("bulletinLevel"));
// Validate the third row
row = rows.get(2);
assertEquals("controller_service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
reportingTask = new MockQueryNiFiReportingTask();
final ReportingInitializationContext initContext =
mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor :
reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = mock(ReportingContext.class);
mockStateManager = new MockStateManager(reportingTask);
Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0,
PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
final PropertyValue pValue = mock(StandardPropertyValue.class);
mockRecordSinkService = new MockRecordSinkService();
Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
ConfigurationContext configContext =
mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new
MockPropertyValue("10"));
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
MockPropertyValue("0"));
reportingTask.setup(configContext);
mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
previousAttributes.put("test.value", "A");
Map<String, String> updatedAttributes = new
HashMap<>(previousAttributes);
updatedAttributes.put("test.value", "B");
// Generate provenance events and put them in a repository
Processor processor = mock(Processor.class);
SharedSessionState sharedState = new SharedSessionState(processor,
new AtomicLong(0));
MockProcessSession processSession = new
MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test
content".getBytes());
ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(currentTimeMillis)
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.setAttributes(previousAttributes, updatedAttributes)
.build();
mockProvenanceRepository.registerEvent(prov1);
for (int i = 1; i < 1001; i++) {
String indexString = Integer.toString(i);
mockFlowFile = processSession.createFlowFile(("Test content " +
indexString).getBytes());
ProvenanceEventRecord prov =
mockProvenanceRepository.eventBuilder()
.fromFlowFile(mockFlowFile)
.setEventType(ProvenanceEventType.DROP)
.setComponentId(indexString)
.setComponentType("Processor")
.setFlowFileUUID("I am FlowFile " + indexString)
.setEventTime(currentTimeMillis - i)
.build();
mockProvenanceRepository.registerEvent(prov);
}
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
mockBulletinRepository = new MockQueryBulletinRepository();
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller",
"WARN", "test bulletin 2", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
"INFO", "test bulletin 1", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"ERROR", "test bulletin 2", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
return reportingTask;
}
private final class MockQueryNiFiReportingTask extends
QueryNiFiReportingTask {
@Override
public long getCurrentTime() {
return currentTime.get();
}
}
private static class MockQueryBulletinRepository extends
MockBulletinRepository {
Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new
ArrayList<>())
.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
return new ArrayList<>(
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
.orElse(Collections.emptyList()));
}
@Override
public List<Bulletin> findBulletinsForController() {
return Optional.ofNullable(bulletins.get("controller"))
.orElse(Collections.emptyList());
}
}
}
```
##########
File path:
nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
##########
@@ -67,21 +72,26 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {
Review comment:
This test class can also be significantly simplified:
```java
class TestQueryNiFiReportingTask {
private ReportingContext context;
private MockQueryNiFiReportingTask reportingTask;
private MockRecordSinkService mockRecordSinkService;
private ProcessGroupStatus status;
private BulletinRepository mockBulletinRepository;
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
currentTime = new AtomicLong();
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesRead(20000L);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setId("proc");
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000);
root1ConnectionStatus.setBackPressureObjectThreshold(1000);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500);
root2ConnectionStatus.setBackPressureObjectThreshold(1000);
Collection<ConnectionStatus> rootConnectionStatuses = new
ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
groupStatus1.setProcessorStatus(processorStatuses);
groupStatus1.setBytesRead(1234L);
// Create a nested group status with a connection
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setProcessorStatus(processorStatuses);
groupStatus2.setBytesRead(12345L);
ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
nestedConnectionStatus.setId("nested");
nestedConnectionStatus.setQueuedCount(1001);
Collection<ConnectionStatus> nestedConnectionStatuses = new
ArrayList<>();
nestedConnectionStatuses.add(nestedConnectionStatus);
groupStatus2.setConnectionStatus(nestedConnectionStatuses);
Collection<ProcessGroupStatus> nestedGroupStatuses = new
ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
groupStatus3.setBytesRead(1L);
ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
nestedConnectionStatus2.setId("nested2");
nestedConnectionStatus2.setQueuedCount(3);
Collection<ConnectionStatus> nestedConnectionStatuses2 = new
ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
Collection<ProcessGroupStatus> nestedGroupStatuses2 = new
ArrayList<>();
nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
groupStatuses.add(groupStatus3);
status.setProcessGroupStatus(groupStatuses);
}
@Test
void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by
queuedCount desc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(4, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(3, row.size()); // Only projected 2 columns
Object id = row.get("id");
assertTrue(id instanceof String);
assertEquals("nested", id);
assertEquals(1001, row.get("queuedCount"));
// Validate the second row
row = rows.get(1);
id = row.get("id");
assertEquals("root1", id);
assertEquals(1000, row.get("queuedCount"));
assertEquals(true, row.get("isBackPressureEnabled"));
// Validate the third row
row = rows.get(2);
id = row.get("id");
assertEquals("root2", id);
assertEquals(500, row.get("queuedCount"));
assertEquals(false, row.get("isBackPressureEnabled"));
// Validate the fourth row
row = rows.get(3);
id = row.get("id");
assertEquals("nested2", id);
assertEquals(3, row.get("queuedCount"));
}
@Test
void testBulletinIsInTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testBulletinIsOutOfTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin = BulletinFactory.createBulletin("input
port", "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testProvenanceEventIsInTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testProvenanceEventIsOutOfTimeWindow() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testUniqueProvenanceAndBulletinQuery() throws
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis
<= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3003, rows.size());
final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testTimeWindowFromStateMap() throws IOException,
InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and
bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
long testBulletinStartTime = 1609538145L;
long testProvenanceStartTime = 1641074145L;
final Map<String, String> stateMap = new HashMap<>();
stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(),
String.valueOf(testBulletinStartTime));
stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(),
String.valueOf(testProvenanceStartTime));
mockStateManager.setState(stateMap, Scope.LOCAL);
final long bulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long provenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(testBulletinStartTime, bulletinStartTime);
assertEquals(testProvenanceStartTime, provenanceStartTime);
final long currentTime = Instant.now().toEpochMilli();
this.currentTime.set(currentTime);
reportingTask.onTrigger(context);
final long updatedBulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long updatedProvenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(currentTime, updatedBulletinStartTime);
assertEquals(currentTime, updatedProvenanceStartTime);
}
//--NEW END
@Test
void testJvmMetricsTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select "
+ Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
MetricNames.JVM_THREAD_COUNT,
MetricNames.JVM_THREAD_STATES_BLOCKED,
MetricNames.JVM_THREAD_STATES_RUNNABLE,
MetricNames.JVM_THREAD_STATES_TERMINATED,
MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
MetricNames.JVM_UPTIME,
MetricNames.JVM_HEAP_USED,
MetricNames.JVM_HEAP_USAGE,
MetricNames.JVM_NON_HEAP_USAGE,
MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) ->
s.replace(".", "_")).collect(Collectors.joining(","))
+ " from JVM_METRICS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1, rows.size());
Map<String, Object> row = rows.get(0);
assertEquals(11, row.size());
assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".",
"_")) instanceof Integer);
assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_"))
instanceof Double);
}
@Test
void testProcessGroupStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
PROCESS_GROUP_STATUS order by bytesRead asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(4, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(20, row.size());
assertEquals(1L, row.get("bytesRead"));
// Validate the second row
row = rows.get(1);
assertEquals(1234L, row.get("bytesRead"));
// Validate the third row
row = rows.get(2);
assertEquals(12345L, row.get("bytesRead"));
// Validate the fourth row
row = rows.get(3);
assertEquals(20000L, row.get("bytesRead"));
}
@Test
void testNoResults() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
CONNECTION_STATUS where queuedCount > 2000");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(0, rows.size());
}
@Test
void testProvenanceTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE
order by eventId asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(24, row.size());
// Verify the first row contents
assertEquals(0L, row.get("eventId"));
assertEquals("CREATE", row.get("eventType"));
assertEquals(12L, row.get("entitySize"));
assertNull(row.get("contentPath"));
assertNull(row.get("previousContentPath"));
Object o = row.get("previousAttributes");
assertTrue(o instanceof Map);
Map<String, String> previousAttributes = (Map<String, String>) o;
assertEquals("A", previousAttributes.get("test.value"));
o = row.get("updatedAttributes");
assertTrue(o instanceof Map);
Map<String, String> updatedAttributes = (Map<String, String>) o;
assertEquals("B", updatedAttributes.get("test.value"));
// Verify some fields in the second row
row = rows.get(1);
assertEquals(24, row.size());
// Verify the second row contents
assertEquals(1L, row.get("eventId"));
assertEquals("DROP", row.get("eventType"));
// Verify some fields in the last row
row = rows.get(1000);
assertEquals(24, row.size());
// Verify the last row contents
assertEquals(1000L, row.get("eventId"));
assertEquals("DROP", row.get("eventType"));
}
@Test
void testBulletinTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS
order by bulletinTimestamp asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
final List<Map<String, Object>> rows =
mockRecordSinkService.getRows();
final String flowFileUuid = "testFlowFileUuid";
assertEquals(3, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(14, row.size());
assertNotNull(row.get("bulletinId"));
assertEquals("controller", row.get("bulletinCategory"));
assertEquals("WARN", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
// Validate the second row
row = rows.get(1);
assertEquals("processor", row.get("bulletinCategory"));
assertEquals("INFO", row.get("bulletinLevel"));
// Validate the third row
row = rows.get(2);
assertEquals("controller_service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
reportingTask = new MockQueryNiFiReportingTask();
final ReportingInitializationContext initContext =
mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor :
reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = mock(ReportingContext.class);
mockStateManager = new MockStateManager(reportingTask);
Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0,
PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
final PropertyValue pValue = mock(StandardPropertyValue.class);
mockRecordSinkService = new MockRecordSinkService();
Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
ConfigurationContext configContext =
mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION)).thenReturn(new
MockPropertyValue("10"));
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
MockPropertyValue("0"));
reportingTask.setup(configContext);
mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
previousAttributes.put("test.value", "A");
Map<String, String> updatedAttributes = new
HashMap<>(previousAttributes);
updatedAttributes.put("test.value", "B");
// Generate provenance events and put them in a repository
Processor processor = mock(Processor.class);
SharedSessionState sharedState = new SharedSessionState(processor,
new AtomicLong(0));
MockProcessSession processSession = new
MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test
content".getBytes());
ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(currentTimeMillis)
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.setAttributes(previousAttributes, updatedAttributes)
.build();
mockProvenanceRepository.registerEvent(prov1);
for (int i = 1; i < 1001; i++) {
String indexString = Integer.toString(i);
mockFlowFile = processSession.createFlowFile(("Test content " +
indexString).getBytes());
ProvenanceEventRecord prov =
mockProvenanceRepository.eventBuilder()
.fromFlowFile(mockFlowFile)
.setEventType(ProvenanceEventType.DROP)
.setComponentId(indexString)
.setComponentType("Processor")
.setFlowFileUUID("I am FlowFile " + indexString)
.setEventTime(currentTimeMillis - i)
.build();
mockProvenanceRepository.registerEvent(prov);
}
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
mockBulletinRepository = new MockQueryBulletinRepository();
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller",
"WARN", "test bulletin 2", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
"INFO", "test bulletin 1", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"ERROR", "test bulletin 2", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
return reportingTask;
}
private final class MockQueryNiFiReportingTask extends
QueryNiFiReportingTask {
@Override
public long getCurrentTime() {
return currentTime.get();
}
}
private static class MockQueryBulletinRepository extends
MockBulletinRepository {
Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new
ArrayList<>())
.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
return new ArrayList<>(
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
.orElse(Collections.emptyList()));
}
@Override
public List<Bulletin> findBulletinsForController() {
return Optional.ofNullable(bulletins.get("controller"))
.orElse(Collections.emptyList());
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]