diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 2e40a399f988..f69256090725 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -73,6 +73,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_MISSED_QUERYABLE_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_PRELOAD_FAILURE("count", false), + UPSERT_KEYS_INSERTED("rows", false), + UPSERT_KEYS_UPDATED("rows", false), + UPSERT_KEYS_DELETED("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 1381e4e2ab01..7f663a9a204b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -1229,6 +1229,16 @@ protected void removeDocId(IndexSegment segment, int docId) { trackUpdatedSegmentsSinceLastSnapshot(segment); } + protected void emitUpsertMetrics(RecordInfo recordInfo, boolean isNewKey) { + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else if (isNewKey) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_INSERTED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_UPDATED, 1L); + } + } + protected void trackUpdatedSegmentsSinceLastSnapshot(IndexSegment segment) { if (_enableSnapshot && segment instanceof ImmutableSegment) { _updatedSegmentsSinceLastSnapshot.add(segment); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index a06bd82395d9..e58887943143 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -395,6 +395,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); } + emitUpsertMetrics(recordInfo, false); return newRecordLocation; } else { // Out-of-order record @@ -405,6 +406,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } else { // New primary key addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); + emitUpsertMetrics(recordInfo, true); return new RecordLocation(segment, newDocId, newComparisonValue); } }); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index 9a1c4f133cb8..b8827b7c9a97 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -508,6 +508,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { int currentDocId = currentRecordLocation.getDocId(); if (segment == currentSegment) { replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + emitUpsertMetrics(recordInfo, false); return new RecordLocation(segment, newDocId, newComparisonValue, currentRecordLocation.getDistinctSegmentCount()); } else { @@ -519,6 +520,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation); } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + emitUpsertMetrics(recordInfo, false); return new RecordLocation(segment, newDocId, newComparisonValue, RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount())); } @@ -538,6 +540,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } else { // New primary key addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); + emitUpsertMetrics(recordInfo, true); return new RecordLocation(segment, newDocId, newComparisonValue, 1); } });