[
https://issues.apache.org/jira/browse/BEAM-6597?focusedWorklogId=425940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425940
]
ASF GitHub Bot logged work on BEAM-6597:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Apr/20 23:40
Start Date: 21/Apr/20 23:40
Worklog Time Spent: 10m
Work Description: ajamato commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412564549
##########
File path:
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
(Coder<WindowedValue<?>>) remoteOutputCoder.getValue(),
outputContents::add));
}
- Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+ final String testPTransformId = "create/ParMultiDo(Metrics)";
+ BundleProgressHandler progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse response) {
+ MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+ List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
- StateRequestHandler stateRequestHandler =
- StateRequestHandlers.forSideInputHandlerFactory(
- descriptor.getSideInputSpecs(),
- new SideInputHandlerFactory() {
- @Override
- public <V, W extends BoundedWindow>
- IterableSideInputHandler<V, W> forIterableSideInput(
- String pTransformId,
- String sideInputId,
- Coder<V> elementCoder,
- Coder<W> windowCoder) {
- throw new UnsupportedOperationException();
- }
+ // We expect all user counters except for the ones in @FinishBundle
+ // Since non-user metrics are registered at bundle creation time,
they will still report
+ // values most of which will be 0.
- @Override
- public <K, V, W extends BoundedWindow>
- MultimapSideInputHandler<K, V, W> forMultimapSideInput(
- String pTransformId,
- String sideInputId,
- KvCoder<K, V> elementCoder,
- Coder<W> windowCoder) {
- return new MultimapSideInputHandler<K, V, W>() {
- @Override
- public Iterable<V> get(BoundedWindow window) {
- return null;
- }
+ SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64SumValue(1);
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- @Override
- public Coder<K> keyCoder() {
- return elementCoder.getKeyCoder();
- }
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.START_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64SumValue(10);
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- @Override
- public Coder<V> valueCoder() {
- return elementCoder.getValueCoder();
- }
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.FINISH_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+
matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
- @Override
- public Iterable<V> get(K key, W window) {
- return (Iterable) sideInputData;
- }
- };
- }
- });
+ // User Distributions.
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
+ MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64DistributionValue(DistributionData.create(1, 1, 1,
1));
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- String testPTransformId = "create/ParMultiDo(Anonymous)";
- BundleProgressHandler progressHandler =
- new BundleProgressHandler() {
- @Override
- public void onProgress(ProcessBundleProgressResponse progress) {}
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64DistributionValue(DistributionData.create(10, 1,
10, 10));
Review comment:
Are these values likely to easily change? You could consider writing new
matchers in MonitoringInfoMatchers to match that the values are non 0 or
something instead to make it easier to maintain.
Or write matchers that just verify a few fields, unless you really want to
verify everything is set on every MonitoringInfo. Might also make it simpler to
maintain. Unless you think they are all relevant. Up to your discretion here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 425940)
Time Spent: 0.5h (was: 20m)
> Put MonitoringInfos/metrics in the Java SDK ProcessBundleProgressResponse
> -------------------------------------------------------------------------
>
> Key: BEAM-6597
> URL: https://issues.apache.org/jira/browse/BEAM-6597
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> I think this is the correct approach, as I don't believe there is any hook in
> the Java SDK yet for ProcessBundleProgressResponses.
> (1) Implement ProcessBundleProgressResponse
> See FnHarness.main to add a handle for RequestCase.PROGRESS_BUNDLE.
> (2) Refactor ProgressBundleHandler so that the metrics can be extracted from
> the MetricContainerStep map and SimpleExecutionStates for the instrucitonId
> when the call comes in. (Right now all these objects only live in the local
> function, they may need to live in an object instead which can be accessed by
> both process bundle and progress bundle responses). Be careful to not
> introduce thread contention. Ideally we need a way to read the values without
> locking new ones from being written.
> (Test 1) Also be sure to simplify RemoteExecutionTest.testMetrics().
> By inspecting the metric progress, we can remove the sleeps from this code.
> Currently there are sleeps in start, process and finish to ensure execution
> time metrics are added. Instead, once progress bundle responses are
> introduced, the metrics can be examined here
--
This message was sent by Atlassian Jira
(v8.3.4#803005)