diff --git a/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 index 2fcd623e3e9..064e471b29a 100644 --- a/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 +++ b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 @@ -518,6 +518,7 @@ MATERIALIZED: 'MATERIALIZED'; VIEW: 'VIEW'; PRIMARY: 'PRIMARY'; REPLACE: 'REPLACE'; +KEEP: 'KEEP'; ASSERT: 'ASSERT'; ADD: 'ADD'; ALTER: 'ALTER'; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index e0b20d6c93f..7f4fe1dcf77 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -138,7 +138,7 @@ public static class Masking { String topicValuesPattern; public enum Type { - REMOVE, MASK, REPLACE + REMOVE, MASK, REPLACE, KEEP } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java index a2e9c88f86b..d90a26a5603 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java @@ -3,7 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.provectus.kafka.ui.config.ClustersProperties; @@ -12,13 +12,17 @@ import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.UnaryOperator; import java.util.regex.Pattern; import javax.annotation.Nullable; import lombok.Value; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DataMasking { + private static final Logger log = LoggerFactory.getLogger(DataMasking.class); private static final JsonMapper JSON_MAPPER = new JsonMapper(); @@ -72,8 +76,14 @@ public UnaryOperator getMaskerForTopic(String topic) { @VisibleForTesting UnaryOperator getMaskingFunction(String topic, Serde.Target target) { - var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList(); - if (targetMasks.isEmpty()) { + var targetMasks = masks.stream() + .filter(m -> m.shouldBeApplied(topic, target)) + .toList(); + + // If there's an unmapped topic, anonymise it except for the key - changable behaviour! + if (targetMasks.isEmpty() && target == Serde.Target.VALUE) { + return s -> "\"ANONYMIZED\""; + } else if (targetMasks.isEmpty() && target == Serde.Target.KEY) { return UnaryOperator.identity(); } return inputStr -> { @@ -82,20 +92,36 @@ UnaryOperator getMaskingFunction(String topic, Serde.Target target) { } try { JsonNode json = JSON_MAPPER.readTree(inputStr); - if (json.isContainerNode()) { + if (json.isContainerNode() && json.isObject()) { + ObjectNode original = (ObjectNode) json; + ObjectNode temp = original.deepCopy(); + + Set maskedFields = new java.util.HashSet<>(); + + // Apply masking rules and track affected fields for (Mask targetMask : targetMasks) { - json = targetMask.policy.applyToJsonContainer((ContainerNode) json); + temp = (ObjectNode) targetMask.policy.applyToJsonContainer(temp, maskedFields); } - return json.toString(); + final ObjectNode masked = temp; + + ObjectNode result = JSON_MAPPER.createObjectNode(); + original.fieldNames().forEachRemaining(field -> { + log.info("Target looks like {}", target); + if (maskedFields.contains(field) && masked.has(field)) { + result.set(field, masked.get(field)); + } else if (target == Serde.Target.VALUE) { + result.put(field, "ANONYMIZED"); + } else { + result.set(field, original.get(field)); + } + }); + + return result.toString(); } - } catch (JsonProcessingException jsonException) { - //just ignore + } catch (JsonProcessingException ignored) { + // fallback to string-based masking } - // if we can't parse input as json or parsed json is not object/array - // we just apply first found policy - // (there is no need to apply all of them, because they will just override each other) - return targetMasks.get(0).policy.applyToString(inputStr); + return "\"ANONYMIZED\""; }; } - } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Keep.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Keep.java new file mode 100644 index 00000000000..314d6ae846b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Keep.java @@ -0,0 +1,61 @@ +package com.provectus.kafka.ui.service.masking.policies; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import java.util.Set; + +class Keep extends MaskingPolicy { + + Keep(FieldsSelector fieldsSelector) { + super(fieldsSelector); + } + + @Override + public String applyToString(String str) { + return str; + } + + @Override + public ContainerNode applyToJsonContainer(ContainerNode node, Set maskedFields) { + return (ContainerNode) keepWithFieldsCheck(node, maskedFields); + } + + private JsonNode keepWithFieldsCheck(JsonNode node, Set maskedFields) { + if (node.isObject()) { + ObjectNode obj = ((ObjectNode) node).objectNode(); + node.fields().forEachRemaining(f -> { + String fieldName = f.getKey(); + JsonNode fieldVal = f.getValue(); + + if (fieldShouldBeMasked(fieldName)) { + maskedFields.add(fieldName); + obj.set(fieldName, keepNodeRecursively(fieldVal, maskedFields)); + } else { + obj.set(fieldName, keepWithFieldsCheck(fieldVal, maskedFields)); + } + }); + return obj; + } else if (node.isArray()) { + ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); + node.elements().forEachRemaining(e -> arr.add(keepWithFieldsCheck(e, maskedFields))); + return arr; + } + return node; + } + + private JsonNode keepNodeRecursively(JsonNode node, Set maskedFields) { + if (node.isObject()) { + ObjectNode obj = ((ObjectNode) node).objectNode(); + node.fields().forEachRemaining(f -> obj.set(f.getKey(), keepNodeRecursively(f.getValue(), maskedFields))); + return obj; + } else if (node.isArray()) { + ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); + node.elements().forEachRemaining(e -> arr.add(keepNodeRecursively(e, maskedFields))); + return arr; + } + return node.isTextual() ? TextNode.valueOf(node.textValue()) : node.deepCopy(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java index e6a469f2c03..87e60d2b45f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.base.Preconditions; import java.util.List; +import java.util.Set; import java.util.function.UnaryOperator; class Mask extends MaskingPolicy { @@ -21,8 +22,8 @@ class Mask extends MaskingPolicy { } @Override - public ContainerNode applyToJsonContainer(ContainerNode node) { - return (ContainerNode) maskWithFieldsCheck(node); + public ContainerNode applyToJsonContainer(ContainerNode node, Set maskedFields) { + return (ContainerNode) maskWithFieldsCheck(node, maskedFields); } @Override @@ -40,7 +41,7 @@ private static UnaryOperator createMasker(List maskingChars) { switch (Character.getType(cp)) { case Character.SPACE_SEPARATOR, Character.LINE_SEPARATOR, - Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keeping separators as-is + Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keep separators as-is case Character.UPPERCASE_LETTER -> sb.append(maskingChars.get(0)); case Character.LOWERCASE_LETTER -> sb.append(maskingChars.get(1)); case Character.DECIMAL_DIGIT_NUMBER -> sb.append(maskingChars.get(2)); @@ -51,35 +52,38 @@ private static UnaryOperator createMasker(List maskingChars) { }; } - private JsonNode maskWithFieldsCheck(JsonNode node) { + private JsonNode maskWithFieldsCheck(JsonNode node, Set maskedFields) { if (node.isObject()) { ObjectNode obj = ((ObjectNode) node).objectNode(); node.fields().forEachRemaining(f -> { String fieldName = f.getKey(); JsonNode fieldVal = f.getValue(); if (fieldShouldBeMasked(fieldName)) { - obj.set(fieldName, maskNodeRecursively(fieldVal)); + maskedFields.add(fieldName); + obj.set(fieldName, maskNodeRecursively(fieldVal, maskedFields)); } else { - obj.set(fieldName, maskWithFieldsCheck(fieldVal)); + obj.set(fieldName, maskWithFieldsCheck(fieldVal, maskedFields)); } }); return obj; } else if (node.isArray()) { ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); - node.elements().forEachRemaining(e -> arr.add(maskWithFieldsCheck(e))); + node.elements().forEachRemaining(e -> arr.add(maskWithFieldsCheck(e, maskedFields))); return arr; } return node; } - private JsonNode maskNodeRecursively(JsonNode node) { + private JsonNode maskNodeRecursively(JsonNode node, Set maskedFields) { if (node.isObject()) { ObjectNode obj = ((ObjectNode) node).objectNode(); - node.fields().forEachRemaining(f -> obj.set(f.getKey(), maskNodeRecursively(f.getValue()))); + node.fields().forEachRemaining(f -> + obj.set(f.getKey(), maskNodeRecursively(f.getValue(), maskedFields)) + ); return obj; } else if (node.isArray()) { ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); - node.elements().forEachRemaining(e -> arr.add(maskNodeRecursively(e))); + node.elements().forEachRemaining(e -> arr.add(maskNodeRecursively(e, maskedFields))); return arr; } return new TextNode(masker.apply(node.asText())); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java index 9b80da0cb18..61036f348f9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java @@ -2,11 +2,13 @@ import com.fasterxml.jackson.databind.node.ContainerNode; import com.provectus.kafka.ui.config.ClustersProperties; +import java.util.Set; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public abstract class MaskingPolicy { + public static MaskingPolicy create(ClustersProperties.Masking property) { FieldsSelector fieldsSelector = FieldsSelector.create(property); return switch (property.getType()) { @@ -23,6 +25,7 @@ public static MaskingPolicy create(ClustersProperties.Masking property) { ? Mask.DEFAULT_PATTERN : property.getMaskingCharsReplacement() ); + case KEEP -> new Keep(fieldsSelector); }; } @@ -34,7 +37,7 @@ protected boolean fieldShouldBeMasked(String fieldName) { return fieldsSelector.shouldBeMasked(fieldName); } - public abstract ContainerNode applyToJsonContainer(ContainerNode node); + public abstract ContainerNode applyToJsonContainer(ContainerNode node, Set maskedFields); public abstract String applyToString(String str); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java index cc5cdd14159..ae49c6a295e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ObjectNode; - +import java.util.Set; class Remove extends MaskingPolicy { @@ -18,24 +18,26 @@ public String applyToString(String str) { } @Override - public ContainerNode applyToJsonContainer(ContainerNode node) { - return (ContainerNode) removeFields(node); + public ContainerNode applyToJsonContainer(ContainerNode node, Set maskedFields) { + return (ContainerNode) removeFields(node, maskedFields); } - private JsonNode removeFields(JsonNode node) { + private JsonNode removeFields(JsonNode node, Set maskedFields) { if (node.isObject()) { ObjectNode obj = ((ObjectNode) node).objectNode(); node.fields().forEachRemaining(f -> { String fieldName = f.getKey(); JsonNode fieldVal = f.getValue(); if (!fieldShouldBeMasked(fieldName)) { - obj.set(fieldName, removeFields(fieldVal)); + obj.set(fieldName, removeFields(fieldVal, maskedFields)); + } else { + maskedFields.add(fieldName); // ✅ track removed field } }); return obj; } else if (node.isArray()) { - var arr = ((ArrayNode) node).arrayNode(node.size()); - node.elements().forEachRemaining(e -> arr.add(removeFields(e))); + ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); + node.elements().forEachRemaining(e -> arr.add(removeFields(e, maskedFields))); return arr; } return node; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java index 1cf91793d22..b3c951594b6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java @@ -5,17 +5,17 @@ import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.base.Preconditions; +import java.util.Set; class Replace extends MaskingPolicy { - static final String DEFAULT_REPLACEMENT = "***DATA_MASKED***"; - private final String replacement; - Replace(FieldsSelector fieldsSelector, String replacementString) { + static final String DEFAULT_REPLACEMENT = "ANONYMIZED"; + + Replace(FieldsSelector fieldsSelector, String replacement) { super(fieldsSelector); - this.replacement = Preconditions.checkNotNull(replacementString); + this.replacement = replacement; } @Override @@ -24,42 +24,29 @@ public String applyToString(String str) { } @Override - public ContainerNode applyToJsonContainer(ContainerNode node) { - return (ContainerNode) replaceWithFieldsCheck(node); + public ContainerNode applyToJsonContainer(ContainerNode node, Set maskedFields) { + return (ContainerNode) replaceFields(node, maskedFields); } - private JsonNode replaceWithFieldsCheck(JsonNode node) { + private JsonNode replaceFields(JsonNode node, Set maskedFields) { if (node.isObject()) { ObjectNode obj = ((ObjectNode) node).objectNode(); node.fields().forEachRemaining(f -> { String fieldName = f.getKey(); JsonNode fieldVal = f.getValue(); if (fieldShouldBeMasked(fieldName)) { - obj.set(fieldName, replaceRecursive(fieldVal)); + maskedFields.add(fieldName); // ✅ track replaced field + obj.set(fieldName, new TextNode(replacement)); } else { - obj.set(fieldName, replaceWithFieldsCheck(fieldVal)); + obj.set(fieldName, replaceFields(fieldVal, maskedFields)); } }); return obj; } else if (node.isArray()) { ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); - node.elements().forEachRemaining(e -> arr.add(replaceWithFieldsCheck(e))); + node.elements().forEachRemaining(e -> arr.add(replaceFields(e, maskedFields))); return arr; } - // if it is not an object or array - we have nothing to replace here return node; } - - private JsonNode replaceRecursive(JsonNode node) { - if (node.isObject()) { - ObjectNode obj = ((ObjectNode) node).objectNode(); - node.fields().forEachRemaining(f -> obj.set(f.getKey(), replaceRecursive(f.getValue()))); - return obj; - } else if (node.isArray()) { - ArrayNode arr = ((ArrayNode) node).arrayNode(node.size()); - node.elements().forEachRemaining(e -> arr.add(replaceRecursive(e))); - return arr; - } - return new TextNode(replacement); - } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java index 9393ea1c626..65350fa6cd7 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java @@ -5,7 +5,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.ContainerNode; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Stream; import lombok.SneakyThrows; import org.junit.jupiter.params.ParameterizedTest; @@ -21,7 +23,8 @@ class RemoveTest { @MethodSource void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode original, ContainerNode expected) { var policy = new Remove(fieldsSelector); - assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected); + Set maskedFields = new HashSet<>(); + assertThat(policy.applyToJsonContainer(original, maskedFields)).isEqualTo(expected); } private static Stream testApplyToJsonContainer() { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java index 9f2fcd90c4c..fc235baab28 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java @@ -22,7 +22,7 @@ class ReplaceTest { @MethodSource void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode original, ContainerNode expected) { var policy = new Replace(fieldsSelector, REPLACEMENT_STRING); - assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected); + assertThat(policy.applyToJsonContainer(original, fieldName)).isEqualTo(expected); } private static Stream testApplyToJsonContainer() { diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index ae51d31568f..72be825af61 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -4052,6 +4052,7 @@ components: - REMOVE - MASK - REPLACE + - KEEP fields: type: array items: