Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.pubsub.v1.PubsubMessage;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class LoggingUtil {
// Instantiate all loggers as static final fields to maintain strong references

private static final Logger slowAckLogger = Logger.getLogger("slow-ack");
private static final Logger callbackDeliveryLogger = Logger.getLogger("callback-delivery");
private static final Logger expiryLogger = Logger.getLogger("expiry");
private static final Logger callbackExceptionsLogger = Logger.getLogger("callback-exceptions");
private static final Logger ackBatchLogger = Logger.getLogger("ack-batch");
private static final Logger subscriberFlowControlLogger =
Logger.getLogger("subscriber-flow-control");
private static final Logger ackNackLogger = Logger.getLogger("ack-nack");
private static final Logger publishBatchLogger = Logger.getLogger("publish-batch");
private static final Logger subscriberStreamsLogger = Logger.getLogger("subscriber-streams");

public enum SubSytem {
SLOW_ACK(slowAckLogger),
CALLBACK_DELIVERY(callbackDeliveryLogger),
EXPIRY(expiryLogger),
CALLBACK_EXCEPTIONS(callbackExceptionsLogger),
ACK_BATCH(ackBatchLogger),
SUBSCRIBER_FLOW_CONTROL(subscriberFlowControlLogger),
ACK_NACK(ackNackLogger),
PUBLISH_BATCH(publishBatchLogger),
SUBSCRIBER_STREAMS(subscriberStreamsLogger);

private final Logger logger;

SubSytem(Logger logger) {
this.logger = logger;
}

public Logger getLogger() {
return logger;
}
}

public LoggingUtil() {}

private String getSubscriptionLogPrefix(
PubsubMessageWrapper messageWrapper, String ackId, boolean exactlyOnceDeliveryEnabled) {
if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) {
return " Ack ID: "
+ ackId
+ ", Exactly Once Delivery: "
+ exactlyOnceDeliveryEnabled
+ " (Message details not available)";
}

PubsubMessage message = messageWrapper.getPubsubMessage();
String messageId = message.getMessageId();
String orderingKey = message.getOrderingKey();

StringBuilder sb = new StringBuilder();
sb.append("Message ID: ").append(messageId);
sb.append(", Ack ID: ").append(ackId);
if (orderingKey != null && !orderingKey.isEmpty()) {
sb.append(", Ordering Key: ").append(orderingKey);
}
sb.append(", Exactly Once Delivery: ").append(exactlyOnceDeliveryEnabled);
return sb.toString();
}

private String getPublisherLogPrefix(PubsubMessageWrapper messageWrapper) {
if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) {
return " (Message details not available)";
}

PubsubMessage message = messageWrapper.getPubsubMessage();
String messageId = message.getMessageId();
String orderingKey = message.getOrderingKey();

StringBuilder sb = new StringBuilder();
sb.append("Message ID: ").append(messageId);
if (orderingKey != null && !orderingKey.isEmpty()) {
sb.append(", Ordering Key: ").append(orderingKey);
}
return sb.toString();
}

public void logSubscriber(
SubSytem subSystem,
Level level,
String msg,
PubsubMessageWrapper messageWrapper,
String ackId,
boolean exactlyOnceDeliveryEnabled) {
Logger logger = subSystem.getLogger();
if (logger.isLoggable(level)) {
String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled);
logger.log(level, prefix + " - " + msg);
}
}

public void logSubscriber(
SubSytem subSystem,
Level level,
String msg,
PubsubMessageWrapper messageWrapper,
String ackId,
boolean exactlyOnceDeliveryEnabled,
Throwable throwable) {
Logger logger = subSystem.getLogger();
if (logger.isLoggable(level)) {
String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled);
logger.log(level, prefix + " - " + msg, throwable);
}
}

public void logPublisher(
SubSytem subSystem, Level level, String msg, PubsubMessageWrapper messageWrapper) {
Logger logger = subSystem.getLogger();
if (logger.isLoggable(level)) {
String prefix = getPublisherLogPrefix(messageWrapper);
logger.log(level, prefix + " - " + msg);
}
}

public void logPublisher(
SubSytem subSystem,
Level level,
String msg,
PubsubMessageWrapper messageWrapper,
Throwable throwable) {
Logger logger = subSystem.getLogger();
if (logger.isLoggable(level)) {
String prefix = getPublisherLogPrefix(messageWrapper);
logger.log(level, prefix + " - " + msg, throwable);
}
}

public void logEvent(SubSytem subSytem, Level level, String msg, Object... params) {
Logger logger = subSytem.getLogger();
if (logger.isLoggable(level)) {
logger.log(level, msg, params);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
*/
class MessageDispatcher {
private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
private LoggingUtil loggingUtil = new LoggingUtil();

@InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
Expand Down Expand Up @@ -112,6 +113,8 @@ class MessageDispatcher {
private final SubscriberShutdownSettings subscriberShutdownSettings;
private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false);

private final double slowAckPercentile = 99.0;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,12 +160,13 @@ private void forget() {

@Override
public void onFailure(Throwable t) {
logger.log(
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.CALLBACK_EXCEPTIONS,
Level.WARNING,
"MessageReceiver failed to process ack ID: "
+ this.ackRequestData.getAckId()
+ ", the message will be nacked.",
t);
"MessageReceiver exception.",
this.ackRequestData.getMessageWrapper(),
this.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
Expand All @@ -171,6 +175,19 @@ public void onFailure(Throwable t) {

@Override
public void onSuccess(AckReply reply) {
int ackLatency =
Ints.saturatedCast((long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D));
if (ackLatency >= ackLatencyDistribution.getPercentile(slowAckPercentile)) {
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.SLOW_ACK,
Level.FINE,
String.format(
"Message ack duration of %d is higher than the p99 ack duration", ackLatency),
this.ackRequestData.getMessageWrapper(),
this.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
}

switch (reply) {
case ACK:
if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) {
Expand All @@ -180,15 +197,27 @@ public void onSuccess(AckReply reply) {
} else {
pendingAcks.add(this.ackRequestData);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
ackLatencyDistribution.record(ackLatency);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
}
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.ACK_NACK,
Level.FINE,
"Ack called on message.",
this.ackRequestData.getMessageWrapper(),
this.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
break;
case NACK:
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.ACK_NACK,
Level.FINE,
"Nack called on message.",
this.ackRequestData.getMessageWrapper(),
this.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -568,10 +597,32 @@ private void processBatch(List<OutstandingMessage> batch) {
// shutdown will block on processing of all these messages anyway.
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL,
Level.FINE,
"Flow controller is blocking.",
message.messageWrapper(),
message.messageWrapper().getAckId(),
exactlyOnceDeliveryEnabled.get());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL,
Level.FINE,
"Flow controller is done blocking.",
message.messageWrapper(),
message.messageWrapper().getAckId(),
exactlyOnceDeliveryEnabled.get());
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.SUBSCRIBER_FLOW_CONTROL,
Level.FINE,
"Flow controller unexpected exception.",
message.messageWrapper(),
message.messageWrapper().getAckId(),
exactlyOnceDeliveryEnabled.get(),
unexpectedException);
tracer.setSubscribeConcurrencyControlSpanException(
message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
Expand Down Expand Up @@ -619,9 +670,23 @@ public void run() {
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
tracer.setSubscriberSpanExpirationResult(messageWrapper);
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.EXPIRY,
Level.FINE,
"Message expired.",
messageWrapper,
ackHandler.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
return;
}
tracer.startSubscribeProcessSpan(messageWrapper);
loggingUtil.logSubscriber(
LoggingUtil.SubSytem.CALLBACK_DELIVERY,
Level.FINE,
"Message delivered.",
messageWrapper,
ackHandler.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand Down Expand Up @@ -725,7 +790,6 @@ void processOutstandingOperations() {
if (!nackRequestDataList.isEmpty()) {
modackRequestData.add(new ModackRequestData(0, nackRequestDataList));
}
logger.log(Level.FINER, "Sending {0} nacks", nackRequestDataList.size());

List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
Expand All @@ -735,13 +799,21 @@ void processOutstandingOperations() {
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

ackProcessor.sendModackOperations(modackRequestData);

List<AckRequestData> ackRequestDataList = new ArrayList<AckRequestData>();
pendingAcks.drainTo(ackRequestDataList);
logger.log(Level.FINER, "Sending {0} acks", ackRequestDataList.size());
loggingUtil.logEvent(
LoggingUtil.SubSytem.ACK_BATCH,
Level.FINE,
"Sending {0} ACKs, {1} NACKs, {2} receipts. Exactly Once Delivery: {3}",
new Object[] {
ackRequestDataList.size(),
nackRequestDataList.size(),
ackRequestDataReceipts.size(),
exactlyOnceDeliveryEnabled.get()
});

ackProcessor.sendAckOperations(ackRequestDataList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
*/
public class Publisher implements PublisherInterface {
private static final Logger logger = Logger.getLogger(Publisher.class.getName());
private LoggingUtil loggingUtil = new LoggingUtil();

private static final String GZIP_COMPRESSION = "gzip";

Expand Down Expand Up @@ -509,6 +510,13 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
logger.log(Level.WARNING, "Attempted to publish batch with zero messages.");
return;
}

loggingUtil.logPublisher(
LoggingUtil.SubSytem.PUBLISH_BATCH,
Level.FINE,
String.format("Attempting to batch publish %d messages", outstandingBatch.size()),
outstandingBatch.getMessageWrappers().get(0));

final ApiFutureCallback<PublishResponse> futureCallback =
new ApiFutureCallback<PublishResponse>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
private static final Logger logger =
Logger.getLogger(StreamingSubscriberConnection.class.getName());
private LoggingUtil loggingUtil = new LoggingUtil();

private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
Expand Down Expand Up @@ -222,13 +223,17 @@ public boolean getExactlyOnceDeliveryEnabled() {
@Override
protected void doStart() {
logger.config("Starting subscriber.");
loggingUtil.logEvent(
LoggingUtil.SubSytem.SUBSCRIBER_STREAMS, Level.FINE, "Opening stream.", "");
messageDispatcher.start();
initialize();
notifyStarted();
}

@Override
protected void doStop() {
loggingUtil.logEvent(
LoggingUtil.SubSytem.SUBSCRIBER_STREAMS, Level.FINE, "Closing stream.", "");
lock.lock();
try {
clientStream.closeSendWithError(Status.CANCELLED.asException());
Expand Down
Loading