diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/LoggingUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/LoggingUtil.java new file mode 100644 index 000000000..f7243a944 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/LoggingUtil.java @@ -0,0 +1,159 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; +import java.util.logging.Level; +import java.util.logging.Logger; + +public final class LoggingUtil { + // Instantiate all loggers as static final fields to maintain strong references + + private static final Logger slowAckLogger = Logger.getLogger("slow-ack"); + private static final Logger callbackDeliveryLogger = Logger.getLogger("callback-delivery"); + private static final Logger expiryLogger = Logger.getLogger("expiry"); + private static final Logger callbackExceptionsLogger = Logger.getLogger("callback-exceptions"); + private static final Logger ackBatchLogger = Logger.getLogger("ack-batch"); + private static final Logger subscriberFlowControlLogger = + Logger.getLogger("subscriber-flow-control"); + private static final Logger ackNackLogger = Logger.getLogger("ack-nack"); + private static final Logger publishBatchLogger = Logger.getLogger("publish-batch"); + private static final Logger subscriberStreamsLogger = Logger.getLogger("subscriber-streams"); + + public enum SubSytem { + SLOW_ACK(slowAckLogger), + CALLBACK_DELIVERY(callbackDeliveryLogger), + EXPIRY(expiryLogger), + CALLBACK_EXCEPTIONS(callbackExceptionsLogger), + ACK_BATCH(ackBatchLogger), + SUBSCRIBER_FLOW_CONTROL(subscriberFlowControlLogger), + ACK_NACK(ackNackLogger), + PUBLISH_BATCH(publishBatchLogger), + SUBSCRIBER_STREAMS(subscriberStreamsLogger); + + private final Logger logger; + + SubSytem(Logger logger) { + this.logger = logger; + } + + public Logger getLogger() { + return logger; + } + } + + public LoggingUtil() {} + + private String getSubscriptionLogPrefix( + PubsubMessageWrapper messageWrapper, String ackId, boolean exactlyOnceDeliveryEnabled) { + if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) { + return " Ack ID: " + + ackId + + ", Exactly Once Delivery: " + + exactlyOnceDeliveryEnabled + + " (Message details not available)"; + } + + PubsubMessage message = messageWrapper.getPubsubMessage(); + String messageId = message.getMessageId(); + String orderingKey = message.getOrderingKey(); + + StringBuilder sb = new StringBuilder(); + sb.append("Message ID: ").append(messageId); + sb.append(", Ack ID: ").append(ackId); + if (orderingKey != null && !orderingKey.isEmpty()) { + sb.append(", Ordering Key: ").append(orderingKey); + } + sb.append(", Exactly Once Delivery: ").append(exactlyOnceDeliveryEnabled); + return sb.toString(); + } + + private String getPublisherLogPrefix(PubsubMessageWrapper messageWrapper) { + if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) { + return " (Message details not available)"; + } + + PubsubMessage message = messageWrapper.getPubsubMessage(); + String messageId = message.getMessageId(); + String orderingKey = message.getOrderingKey(); + + StringBuilder sb = new StringBuilder(); + sb.append("Message ID: ").append(messageId); + if (orderingKey != null && !orderingKey.isEmpty()) { + sb.append(", Ordering Key: ").append(orderingKey); + } + return sb.toString(); + } + + public void logSubscriber( + SubSytem subSystem, + Level level, + String msg, + PubsubMessageWrapper messageWrapper, + String ackId, + boolean exactlyOnceDeliveryEnabled) { + Logger logger = subSystem.getLogger(); + if (logger.isLoggable(level)) { + String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled); + logger.log(level, prefix + " - " + msg); + } + } + + public void logSubscriber( + SubSytem subSystem, + Level level, + String msg, + PubsubMessageWrapper messageWrapper, + String ackId, + boolean exactlyOnceDeliveryEnabled, + Throwable throwable) { + Logger logger = subSystem.getLogger(); + if (logger.isLoggable(level)) { + String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled); + logger.log(level, prefix + " - " + msg, throwable); + } + } + + public void logPublisher( + SubSytem subSystem, Level level, String msg, PubsubMessageWrapper messageWrapper) { + Logger logger = subSystem.getLogger(); + if (logger.isLoggable(level)) { + String prefix = getPublisherLogPrefix(messageWrapper); + logger.log(level, prefix + " - " + msg); + } + } + + public void logPublisher( + SubSytem subSystem, + Level level, + String msg, + PubsubMessageWrapper messageWrapper, + Throwable throwable) { + Logger logger = subSystem.getLogger(); + if (logger.isLoggable(level)) { + String prefix = getPublisherLogPrefix(messageWrapper); + logger.log(level, prefix + " - " + msg, throwable); + } + } + + public void logEvent(SubSytem subSytem, Level level, String msg, Object... params) { + Logger logger = subSytem.getLogger(); + if (logger.isLoggable(level)) { + logger.log(level, msg, params); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5ed36b9cd..67821018b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -59,6 +59,7 @@ */ class MessageDispatcher { private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName()); + private LoggingUtil loggingUtil = new LoggingUtil(); @InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); @@ -112,6 +113,8 @@ class MessageDispatcher { private final SubscriberShutdownSettings subscriberShutdownSettings; private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false); + private final double slowAckPercentile = 99.0; + /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { ACK, @@ -157,12 +160,13 @@ private void forget() { @Override public void onFailure(Throwable t) { - logger.log( + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.CALLBACK_EXCEPTIONS, Level.WARNING, - "MessageReceiver failed to process ack ID: " - + this.ackRequestData.getAckId() - + ", the message will be nacked.", - t); + "MessageReceiver exception.", + this.ackRequestData.getMessageWrapper(), + this.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); this.ackRequestData.setResponse(AckResponse.OTHER, false); pendingNacks.add(this.ackRequestData); tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); @@ -171,6 +175,19 @@ public void onFailure(Throwable t) { @Override public void onSuccess(AckReply reply) { + int ackLatency = + Ints.saturatedCast((long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)); + if (ackLatency >= ackLatencyDistribution.getPercentile(slowAckPercentile)) { + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.SLOW_ACK, + Level.FINE, + String.format( + "Message ack duration of %d is higher than the p99 ack duration", ackLatency), + this.ackRequestData.getMessageWrapper(), + this.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); + } + switch (reply) { case ACK: if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) { @@ -180,15 +197,27 @@ public void onSuccess(AckReply reply) { } else { pendingAcks.add(this.ackRequestData); // Record the latency rounded to the next closest integer. - ackLatencyDistribution.record( - Ints.saturatedCast( - (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); + ackLatencyDistribution.record(ackLatency); tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack"); } + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.ACK_NACK, + Level.FINE, + "Ack called on message.", + this.ackRequestData.getMessageWrapper(), + this.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); break; case NACK: pendingNacks.add(this.ackRequestData); tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack"); + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.ACK_NACK, + Level.FINE, + "Nack called on message.", + this.ackRequestData.getMessageWrapper(), + this.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); @@ -568,10 +597,32 @@ private void processBatch(List batch) { // shutdown will block on processing of all these messages anyway. tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper()); try { + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL, + Level.FINE, + "Flow controller is blocking.", + message.messageWrapper(), + message.messageWrapper().getAckId(), + exactlyOnceDeliveryEnabled.get()); flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize()); + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL, + Level.FINE, + "Flow controller is done blocking.", + message.messageWrapper(), + message.messageWrapper().getAckId(), + exactlyOnceDeliveryEnabled.get()); tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper()); } catch (FlowControlException unexpectedException) { // This should be a blocking flow controller and never throw an exception. + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL, + Level.FINE, + "Flow controller unexpected exception.", + message.messageWrapper(), + message.messageWrapper().getAckId(), + exactlyOnceDeliveryEnabled.get(), + unexpectedException); tracer.setSubscribeConcurrencyControlSpanException( message.messageWrapper(), unexpectedException); throw new IllegalStateException("Flow control unexpected exception", unexpectedException); @@ -619,9 +670,23 @@ public void run() { // Don't nack it either, because we'd be nacking someone else's message. ackHandler.forget(); tracer.setSubscriberSpanExpirationResult(messageWrapper); + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.EXPIRY, + Level.FINE, + "Message expired.", + messageWrapper, + ackHandler.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); return; } tracer.startSubscribeProcessSpan(messageWrapper); + loggingUtil.logSubscriber( + LoggingUtil.SubSytem.CALLBACK_DELIVERY, + Level.FINE, + "Message delivered.", + messageWrapper, + ackHandler.ackRequestData.getAckId(), + exactlyOnceDeliveryEnabled.get()); if (shouldSetMessageFuture()) { // This is the message future that is propagated to the user SettableApiFuture messageFuture = @@ -725,7 +790,6 @@ void processOutstandingOperations() { if (!nackRequestDataList.isEmpty()) { modackRequestData.add(new ModackRequestData(0, nackRequestDataList)); } - logger.log(Level.FINER, "Sending {0} nacks", nackRequestDataList.size()); List ackRequestDataReceipts = new ArrayList(); pendingReceipts.drainTo(ackRequestDataReceipts); @@ -735,13 +799,21 @@ void processOutstandingOperations() { receiptModack.setIsReceiptModack(true); modackRequestData.add(receiptModack); } - logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size()); ackProcessor.sendModackOperations(modackRequestData); List ackRequestDataList = new ArrayList(); pendingAcks.drainTo(ackRequestDataList); - logger.log(Level.FINER, "Sending {0} acks", ackRequestDataList.size()); + loggingUtil.logEvent( + LoggingUtil.SubSytem.ACK_BATCH, + Level.FINE, + "Sending {0} ACKs, {1} NACKs, {2} receipts. Exactly Once Delivery: {3}", + new Object[] { + ackRequestDataList.size(), + nackRequestDataList.size(), + ackRequestDataReceipts.size(), + exactlyOnceDeliveryEnabled.get() + }); ackProcessor.sendAckOperations(ackRequestDataList); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 113cbf932..815e399f8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -94,6 +94,7 @@ */ public class Publisher implements PublisherInterface { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); + private LoggingUtil loggingUtil = new LoggingUtil(); private static final String GZIP_COMPRESSION = "gzip"; @@ -509,6 +510,13 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { logger.log(Level.WARNING, "Attempted to publish batch with zero messages."); return; } + + loggingUtil.logPublisher( + LoggingUtil.SubSytem.PUBLISH_BATCH, + Level.FINE, + String.format("Attempting to batch publish %d messages", outstandingBatch.size()), + outstandingBatch.getMessageWrappers().get(0)); + final ApiFutureCallback futureCallback = new ApiFutureCallback() { @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 2ee077597..788a6f1ee 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -77,6 +77,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor { private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName()); + private LoggingUtil loggingUtil = new LoggingUtil(); private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100); private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); @@ -222,6 +223,8 @@ public boolean getExactlyOnceDeliveryEnabled() { @Override protected void doStart() { logger.config("Starting subscriber."); + loggingUtil.logEvent( + LoggingUtil.SubSytem.SUBSCRIBER_STREAMS, Level.FINE, "Opening stream.", ""); messageDispatcher.start(); initialize(); notifyStarted(); @@ -229,6 +232,8 @@ protected void doStart() { @Override protected void doStop() { + loggingUtil.logEvent( + LoggingUtil.SubSytem.SUBSCRIBER_STREAMS, Level.FINE, "Closing stream.", ""); lock.lock(); try { clientStream.closeSendWithError(Status.CANCELLED.asException());