Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,40 @@ public QueryLogger(PinotConfiguration config) {
_logBeforeProcessing = logBeforeProcessing;
}

public void log(long requestId, String query) {
if (!_logBeforeProcessing || !checkRateLimiter(null)) {
return;
/**
* Logs the query received message before processing begins.
* This method checks the rate limiter and returns whether logging was allowed.
* The return value should be passed to logQueryCompleted.
*
* @param requestId the request ID
* @param query the SQL query
* @return true if the rate limiter allowed this query (not rate-limited), false if rate-limited
*/
public boolean logQueryReceived(long requestId, String query) {
if (!checkRateLimiter()) {
return false;
}

_logger.info("SQL query for request {}: {}", requestId, query);
if (_logBeforeProcessing) {
_logger.info("SQL query for request {}: {}", requestId, query);
}

tryLogDropped();
return true;
}

public void log(QueryLogParams params) {
/**
* Logs the query completion stats after processing completes.
*
* @param params the query log parameters
* @param wasLogged true if logQueryReceived returned true, false otherwise.
* When false, the completion log will only be emitted if force-log
* conditions are met (exceptions, slow queries).
*/
public void logQueryCompleted(QueryLogParams params, boolean wasLogged) {
_logger.debug("Broker Response: {}", params._response);

if (!checkRateLimiter(params)) {
if (!wasLogged && !shouldForceLog(params)) {
return;
}

Expand All @@ -110,8 +130,8 @@ public void log(QueryLogParams params) {
tryLogDropped();
}

private boolean checkRateLimiter(@Nullable QueryLogParams params) {
boolean allowed = _logRateLimiter.tryAcquire() || shouldForceLog(params);
private boolean checkRateLimiter() {
boolean allowed = _logRateLimiter.tryAcquire();
if (!allowed) {
_numDroppedLogs.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception {
_queryLogger.log(requestId, query);
boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query);

String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH;
if (_enableQueryFingerprinting) {
Expand Down Expand Up @@ -365,7 +365,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
queryHash);
try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) {
return doHandleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext,
httpHeaders, accessControl);
httpHeaders, accessControl, queryWasLogged);
}
}

Expand Down Expand Up @@ -406,7 +406,7 @@ public CompileResult(BrokerResponse errorOrLiteralOnlyBrokerResponse) {

protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
@Nullable HttpHeaders httpHeaders, AccessControl accessControl, boolean queryWasLogged)
throws Exception {
// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
Expand Down Expand Up @@ -640,7 +640,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn

if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
schema, query, database);
schema, query, database, queryWasLogged);
}

if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
Expand Down Expand Up @@ -718,7 +718,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
} else {
// If no route is found, send an empty response
return getEmptyBrokerOnlyResponse(pinotQuery, serverPinotQuery, requestContext, tableName, requesterIdentity,
schema, query, database);
schema, query, database, queryWasLogged);
}
}
long routingEndTimeNs = System.nanoTime();
Expand Down Expand Up @@ -899,9 +899,10 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
brokerResponse.setRLSFiltersApplied(rlsFiltersApplied.get());

// Log query and stats
_queryLogger.log(
_queryLogger.logQueryCompleted(
new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, serverStats));
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, serverStats),
queryWasLogged);

return brokerResponse;
}
Expand Down Expand Up @@ -1106,7 +1107,7 @@ private static String getRoutingPolicy(TableConfig tableConfig) {

private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, PinotQuery serverPinotQuery,
RequestContext requestContext, String tableName, @Nullable RequesterIdentity requesterIdentity, Schema schema,
String query, String database) {
String query, String database, boolean queryWasLogged) {
if (pinotQuery.isExplain()) {
// EXPLAIN PLAN results to show that query is evaluated exclusively by Broker.
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
Expand Down Expand Up @@ -1134,8 +1135,8 @@ private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, P
}
brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName)));
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
_queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
_queryLogger.logQueryCompleted(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null), queryWasLogged);
return brokerResponse;
}

Expand Down Expand Up @@ -1264,9 +1265,10 @@ private void handleSubquery(Expression expression, long requestId, JsonNode json
sqlNodeAndOptions.getOptions().putIfAbsent(QueryOptionKey.REGEX_DICT_SIZE_THRESHOLD, _regexDictSizeThreshold);
}

// Pass false for queryWasLogged to avoid logging subqueries separately
BrokerResponse response =
doHandleRequest(requestId, subquery, sqlNodeAndOptions, jsonRequest, requesterIdentity, requestContext,
httpHeaders, accessControl);
httpHeaders, accessControl, false);
if (response.getExceptionsSize() != 0) {
throw new RuntimeException("Caught exception while executing subquery: " + subquery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void onFailedRequest(List<QueryProcessingException> exs) {
protected BrokerResponse handleRequestThrowing(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
throws QueryException, WebApplicationException {
_queryLogger.log(requestId, query);
boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query);

String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH;
if (_enableQueryFingerprinting) {
Expand Down Expand Up @@ -349,7 +349,7 @@ protected BrokerResponse handleRequestThrowing(long requestId, String query, Sql
return explain(compiledQuery, requestId, requestContext, queryTimer);
} else {
return query(compiledQuery, requestId, requesterIdentity, requestContext, httpHeaders, queryTimer,
rlsFiltersApplied.get());
rlsFiltersApplied.get(), queryWasLogged);
}
}
}
Expand Down Expand Up @@ -522,7 +522,7 @@ private BrokerResponse explain(QueryEnvironment.CompiledQuery query, long reques

private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestId,
RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, Timer timer,
boolean rlsFiltersApplied)
boolean rlsFiltersApplied, boolean queryWasLogged)
throws QueryException, WebApplicationException {
QueryEnvironment.QueryPlannerResult queryPlanResult = callAsync(requestId, query.getTextQuery(),
() -> query.planQuery(requestId), timer);
Expand Down Expand Up @@ -674,9 +674,10 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
brokerResponse.setRLSFiltersApplied(rlsFiltersApplied);

// Log query and stats
_queryLogger.log(
_queryLogger.logQueryCompleted(
new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), brokerResponse,
QueryLogger.QueryLogParams.QueryEngine.MULTI_STAGE, requesterIdentity, null));
QueryLogger.QueryLogParams.QueryEngine.MULTI_STAGE, requesterIdentity, null),
queryWasLogged);

return brokerResponse;
} finally {
Expand Down
Loading
Loading