Copilot commented on code in PR #25309:
URL: https://github.com/apache/pulsar/pull/25309#discussion_r2916823853


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+
+public class ProducerMetrics {
+
+    private final LatencyHistogram sendLatencyHistogram;
+    private final LatencyHistogram rpcLatencyHistogram;
+    private final Counter publishedBytesCounter;
+    private final UpDownCounter pendingMessagesUpDownCounter;
+    private final UpDownCounter pendingBytesUpDownCounter;
+    private final Counter producersOpenedCounter;
+    private final Counter producersClosedCounter;
+
+    public ProducerMetrics(InstrumentProvider ip, String topic) {
+        sendLatencyHistogram = ip.newLatencyHistogram(
+                "pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time",
+                topic, Attributes.empty());
+
+        rpcLatencyHistogram = ip.newLatencyHistogram(
+                "pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
+                topic, Attributes.empty());
+
+        publishedBytesCounter = ip.newCounter(
+                "pulsar.client.producer.message.send.size",
+                Unit.Bytes, "The number of bytes published",
+                topic, Attributes.empty());
+
+        pendingMessagesUpDownCounter = ip.newUpDownCounter(
+                "pulsar.client.producer.message.pending.count", Unit.Messages,
+                "The number of messages in the producer internal send queue, waiting to be sent",
+                topic, Attributes.empty());
+
+        pendingBytesUpDownCounter = ip.newUpDownCounter(
+                "pulsar.client.producer.message.pending.size", Unit.Bytes,
+                "The size of the messages in the producer internal queue, waiting to sent",

Review Comment:
   Typo/grammar in the metric description: "waiting to sent" should be "waiting 
to be sent".
   ```suggestion
                   "The size of the messages in the producer internal queue, waiting to be sent",
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -457,22 +431,19 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<
             long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback)
                     ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt;
             long latencyNanos = System.nanoTime() - createdAt;
-            pendingMessagesUpDownCounter.decrement();
-            pendingBytesUpDownCounter.subtract(msgSize);
             ByteBuf payload = msg.getDataBuffer();
             if (payload == null) {
                 log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.",
                         topic, producerName);
             }
             try {
                 if (e != null) {
-                    latencyHistogram.recordFailure(latencyNanos);
+                    producerMetrics.recordSendFailed(latencyNanos, msgSize);
                     stats.incrementSendFailed();
                     onSendAcknowledgement(msg, null, e);
                     sendCallback.getFuture().completeExceptionally(e);

Review Comment:
   In `onSendComplete`, the metrics are recorded using the `msgSize` field from 
the callback instance that received `sendComplete()`, not the size associated 
with `sendCallback`/`msg` currently being processed in the loop. For batched 
sends where callbacks are chained, this can mis-account pending bytes and 
published bytes (and any size-based metrics) for all but the first message. 
Consider deriving the size per message (e.g., from the current `sendCallback` 
when it’s a `DefaultSendMessageCallback`, or from `msg`) before calling 
`producerMetrics.recordSendFailed/recordSendSuccess`.
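
A minimal sketch of the per-message idea in the comment above, not the actual Pulsar code. `ChainedCallback`, `RecordingMetrics`, and the `next` link are hypothetical stand-ins for the real `SendCallback` chain and `ProducerMetrics`; the only point illustrated is that the size passed to the metrics comes from the callback currently being processed in the loop rather than from the head of the chain.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration; these are not the Pulsar classes. */
public class PerMessageSizeSketch {

    /** Simplified chained callback: each node carries the size of its own message. */
    static final class ChainedCallback {
        final long msgSize;
        ChainedCallback next; // assumed link to the next callback in the batch

        ChainedCallback(long msgSize) {
            this.msgSize = msgSize;
        }
    }

    /** Stand-in for ProducerMetrics that only remembers the sizes it was given. */
    static final class RecordingMetrics {
        final List<Long> recordedSizes = new ArrayList<>();

        void recordSendSuccess(long latencyNanos, long size) {
            recordedSizes.add(size);
        }
    }

    /** Walks the chain and records each node's own size, not the head's size. */
    static void onSendComplete(ChainedCallback head, RecordingMetrics metrics, long latencyNanos) {
        for (ChainedCallback cb = head; cb != null; cb = cb.next) {
            metrics.recordSendSuccess(latencyNanos, cb.msgSize);
        }
    }

    public static void main(String[] args) {
        ChainedCallback first = new ChainedCallback(100);
        first.next = new ChainedCallback(250);
        first.next.next = new ChainedCallback(40);

        RecordingMetrics metrics = new RecordingMetrics();
        onSendComplete(first, metrics, 1_000L);
        System.out.println(metrics.recordedSizes); // prints [100, 250, 40]
    }
}
```

Had the loop passed `head.msgSize` on every iteration, the recorded sizes would be 100 three times, which is the mis-accounting the comment describes.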



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+
+public class ProducerMetrics {
+
+    private final LatencyHistogram sendLatencyHistogram;
+    private final LatencyHistogram rpcLatencyHistogram;
+    private final Counter publishedBytesCounter;
+    private final UpDownCounter pendingMessagesUpDownCounter;
+    private final UpDownCounter pendingBytesUpDownCounter;
+    private final Counter producersOpenedCounter;
+    private final Counter producersClosedCounter;
+
+    public ProducerMetrics(InstrumentProvider ip, String topic) {
+        sendLatencyHistogram = ip.newLatencyHistogram(
+                "pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time",
+                topic, Attributes.empty());
+
+        rpcLatencyHistogram = ip.newLatencyHistogram(
+                "pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack",

Review Comment:
   The RPC latency histogram description reads awkwardly ("when sending data to 
receiving an ack"). Consider rephrasing to something like "when sending data 
and receiving an ack" for clarity.
   ```suggestion
                   "Publish RPC latency experienced internally by the client when sending data and receiving an ack",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.