Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,29 @@ public class QueryLogger {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogger.class);
private static final QueryLogEntry[] QUERY_LOG_ENTRY_VALUES = QueryLogEntry.values();

private static final String FINGERPRINT_FAILED_QUERY_REDACTED = "FINGERPRINT_FAILED_QUERY_REDACTED";
private static final String FULLY_REDACTED = "REDACTED";

public enum SqlRedactionMode {
NONE,
LITERAL_VALUES,
FULL;

public static SqlRedactionMode fromString(String value) {
try {
return valueOf(value.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.warn("Invalid SQL redaction mode '{}', defaulting to NONE", value);
return NONE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails open on misconfiguration. If an operator sets an invalid pinot.broker.query.log.sqlRedaction value, we silently fall back to NONE and start emitting raw SQL, which is the exact unsafe behavior this knob is supposed to prevent. For a privacy feature, the safer behavior is to reject startup or fail closed to a redacted mode instead of disabling redaction.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya, really good point. I've updated this for now while I think about your other comment.

}
}
}

private final int _maxQueryLengthToLog;
private final RateLimiter _logRateLimiter;
private final boolean _enableIpLogging;
private final boolean _logBeforeProcessing;
private final SqlRedactionMode _sqlRedactionMode;
private final Logger _logger;
private final RateLimiter _droppedLogRateLimiter;
private final AtomicLong _numDroppedLogs = new AtomicLong(0L);
Expand All @@ -63,20 +82,24 @@ public QueryLogger(PinotConfiguration config) {
config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING),
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_BEFORE_PROCESSING,
Broker.DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING), LOGGER, RateLimiter.create(1)
Broker.DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING),
SqlRedactionMode.fromString(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_SQL_REDACTION,
Broker.DEFAULT_BROKER_QUERY_LOG_SQL_REDACTION)),
LOGGER, RateLimiter.create(1)
// log once a second for dropped log count
);
}

@VisibleForTesting
QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean enableIpLogging, boolean logBeforeProcessing,
Logger logger, RateLimiter droppedLogRateLimiter) {
SqlRedactionMode sqlRedactionMode, Logger logger, RateLimiter droppedLogRateLimiter) {
_logRateLimiter = logRateLimiter;
_maxQueryLengthToLog = maxQueryLengthToLog;
_enableIpLogging = enableIpLogging;
_logger = logger;
_droppedLogRateLimiter = droppedLogRateLimiter;
_logBeforeProcessing = logBeforeProcessing;
_sqlRedactionMode = sqlRedactionMode;
}

/**
Expand All @@ -86,15 +109,16 @@ public QueryLogger(PinotConfiguration config) {
*
* @param requestId the request ID
* @param query the SQL query
* @param queryFingerprint the query fingerprint (used when redaction is enabled)
* @return true if the rate limiter allowed this query (not rate-limited), false if rate-limited
*/
public boolean logQueryReceived(long requestId, String query) {
public boolean logQueryReceived(long requestId, String query, @Nullable QueryFingerprint queryFingerprint) {
if (!checkRateLimiter()) {
return false;
}

if (_logBeforeProcessing) {
_logger.info("SQL query for request {}: {}", requestId, query);
_logger.info("SQL query for request {}: {}", requestId, redactQuery(query, queryFingerprint));
}

tryLogDropped();
Expand Down Expand Up @@ -123,8 +147,8 @@ public void logQueryCompleted(QueryLogParams params, boolean wasLogged) {
}

// always log the query last - don't add this to the QueryLogEntry enum
queryLogBuilder.append("query=")
.append(StringUtils.substring(params._requestContext.getQuery(), 0, _maxQueryLengthToLog));
String redacted = redactQuery(params._requestContext.getQuery(), params._requestContext.getQueryFingerprint());
queryLogBuilder.append("query=").append(StringUtils.substring(redacted, 0, _maxQueryLengthToLog));
_logger.info(queryLogBuilder.toString());

tryLogDropped();
Expand Down Expand Up @@ -158,6 +182,22 @@ public double getLogRateLimit() {
return _logRateLimiter.getRate();
}

public SqlRedactionMode getSqlRedactionMode() {
return _sqlRedactionMode;
}

private String redactQuery(String query, @Nullable QueryFingerprint queryFingerprint) {
switch (_sqlRedactionMode) {
case FULL:
return FULLY_REDACTED;
case LITERAL_VALUES:
return queryFingerprint != null ? queryFingerprint.getFingerprint() : FINGERPRINT_FAILED_QUERY_REDACTED;
case NONE:
default:
return query;
}
}

private boolean shouldForceLog(@Nullable QueryLogParams params) {
if (params == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
_enableQueryFingerprinting = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
boolean fingerprintingConfigured = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING);
boolean redactionNeedsFingerprinting =
_queryLogger.getSqlRedactionMode() == QueryLogger.SqlRedactionMode.LITERAL_VALUES;
if (redactionNeedsFingerprinting && !fingerprintingConfigured) {
LOGGER.warn("SQL redaction mode 'literal_values' requires query fingerprinting. "
+ "Enabling query fingerprinting automatically.");
}
_enableQueryFingerprinting = fingerprintingConfigured || redactionNeedsFingerprinting;
if (_enableMultistageMigrationMetric) {
_multistageCompileExecutor = Executors.newSingleThreadExecutor();
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
Expand Down Expand Up @@ -317,12 +324,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception {
boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query);

QueryFingerprint queryFingerprint = null;
String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH;
if (_enableQueryFingerprinting) {
try {
QueryFingerprint queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions);
queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions);
if (queryFingerprint != null) {
queryHash = queryFingerprint.getQueryHash();
requestContext.setQueryFingerprint(queryFingerprint);
Expand All @@ -332,6 +338,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If fingerprint generation failed above, this still hands the raw SQL to the query logger and the request-handler warning path already logged it once. The same pattern also exists on other broker error paths that still log query directly, so literal_values and especially full do not actually guarantee that SQL stays out of broker logs. We need a shared redaction helper for every broker-side query log before advertising this as broker SQL redaction.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another great catch. From what I initially found, the queries are all being logged from BaseSingleStageBrokerRequestHandler and MultiStageBrokerRequestHandler. My thinking is to start by exposing redactQuery as a method on QueryLogger and have both classes use that. This minimizes the amount of changes and doesn't require a global redaction config that all classes need access to right. It does leave things open to this pattern if needed in the future.

What do you think?


boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query, queryFingerprint);

String cid = extractClientRequestId(sqlNodeAndOptions);
if (cid == null) {
cid = Long.toString(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,16 @@ tlsConfig, isQueryCancellationEnabled(), cancelTimeout, dispatchKeepAliveTimeMs,
_config.containsKey(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES) ? Set.copyOf(
_config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES, List.of()))
: CommonConstants.Broker.DEFAULT_DISABLED_RULES;
_enableQueryFingerprinting = _config.getProperty(
boolean fingerprintingConfigured = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING);
boolean redactionNeedsFingerprinting =
_queryLogger.getSqlRedactionMode() == QueryLogger.SqlRedactionMode.LITERAL_VALUES;
if (redactionNeedsFingerprinting && !fingerprintingConfigured) {
LOGGER.warn("SQL redaction mode 'literal_values' requires query fingerprinting. "
+ "Enabling query fingerprinting automatically.");
}
_enableQueryFingerprinting = fingerprintingConfigured || redactionNeedsFingerprinting;
}

@Override
Expand Down Expand Up @@ -357,12 +364,11 @@ private void onFailedRequest(List<QueryProcessingException> exs) {
protected BrokerResponse handleRequestThrowing(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
throws QueryException, WebApplicationException {
boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query);

QueryFingerprint queryFingerprint = null;
String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH;
if (_enableQueryFingerprinting) {
try {
QueryFingerprint queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions);
queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions);
if (queryFingerprint != null) {
queryHash = queryFingerprint.getQueryHash();
requestContext.setQueryFingerprint(queryFingerprint);
Expand All @@ -372,6 +378,8 @@ protected BrokerResponse handleRequestThrowing(long requestId, String query, Sql
}
}

boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query, queryFingerprint);

String cid = extractClientRequestId(sqlNodeAndOptions);
if (cid == null) {
cid = Long.toString(requestId);
Expand Down
Loading
Loading