Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ MATERIALIZED: 'MATERIALIZED';
VIEW: 'VIEW';
PRIMARY: 'PRIMARY';
REPLACE: 'REPLACE';
KEEP: 'KEEP';
ASSERT: 'ASSERT';
ADD: 'ADD';
ALTER: 'ALTER';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static class Masking {
String topicValuesPattern;

public enum Type {
REMOVE, MASK, REPLACE
REMOVE, MASK, REPLACE, KEEP
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.config.ClustersProperties;
Expand All @@ -12,13 +12,17 @@
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataMasking {
private static final Logger log = LoggerFactory.getLogger(DataMasking.class);

private static final JsonMapper JSON_MAPPER = new JsonMapper();

Expand Down Expand Up @@ -72,8 +76,14 @@ public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {

@VisibleForTesting
UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
if (targetMasks.isEmpty()) {
var targetMasks = masks.stream()
.filter(m -> m.shouldBeApplied(topic, target))
.toList();

// If there's an unmapped topic, anonymise it except for the key - changable behaviour!
if (targetMasks.isEmpty() && target == Serde.Target.VALUE) {
return s -> "\"ANONYMIZED\"";
} else if (targetMasks.isEmpty() && target == Serde.Target.KEY) {
return UnaryOperator.identity();
}
return inputStr -> {
Expand All @@ -82,20 +92,36 @@ UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
}
try {
JsonNode json = JSON_MAPPER.readTree(inputStr);
if (json.isContainerNode()) {
if (json.isContainerNode() && json.isObject()) {
ObjectNode original = (ObjectNode) json;
ObjectNode temp = original.deepCopy();

Set<String> maskedFields = new java.util.HashSet<>();

// Apply masking rules and track affected fields
for (Mask targetMask : targetMasks) {
json = targetMask.policy.applyToJsonContainer((ContainerNode<?>) json);
temp = (ObjectNode) targetMask.policy.applyToJsonContainer(temp, maskedFields);
}
return json.toString();
final ObjectNode masked = temp;

ObjectNode result = JSON_MAPPER.createObjectNode();
original.fieldNames().forEachRemaining(field -> {
log.info("Target looks like {}", target);
if (maskedFields.contains(field) && masked.has(field)) {
result.set(field, masked.get(field));
} else if (target == Serde.Target.VALUE) {
result.put(field, "ANONYMIZED");
} else {
result.set(field, original.get(field));
}
});

return result.toString();
}
} catch (JsonProcessingException jsonException) {
//just ignore
} catch (JsonProcessingException ignored) {
// fallback to string-based masking
}
// if we can't parse input as json or parsed json is not object/array
// we just apply first found policy
// (there is no need to apply all of them, because they will just override each other)
return targetMasks.get(0).policy.applyToString(inputStr);
return "\"ANONYMIZED\"";
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.provectus.kafka.ui.service.masking.policies;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.Set;

class Keep extends MaskingPolicy {

Keep(FieldsSelector fieldsSelector) {
super(fieldsSelector);
}

@Override
public String applyToString(String str) {
return str;
}

@Override
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node, Set<String> maskedFields) {
return (ContainerNode<?>) keepWithFieldsCheck(node, maskedFields);
}

private JsonNode keepWithFieldsCheck(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> {
String fieldName = f.getKey();
JsonNode fieldVal = f.getValue();

if (fieldShouldBeMasked(fieldName)) {
maskedFields.add(fieldName);
obj.set(fieldName, keepNodeRecursively(fieldVal, maskedFields));
} else {
obj.set(fieldName, keepWithFieldsCheck(fieldVal, maskedFields));
}
});
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(keepWithFieldsCheck(e, maskedFields)));
return arr;
}
return node;
}

private JsonNode keepNodeRecursively(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> obj.set(f.getKey(), keepNodeRecursively(f.getValue(), maskedFields)));
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(keepNodeRecursively(e, maskedFields)));
return arr;
}
return node.isTextual() ? TextNode.valueOf(node.textValue()) : node.deepCopy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

class Mask extends MaskingPolicy {
Expand All @@ -21,8 +22,8 @@ class Mask extends MaskingPolicy {
}

@Override
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
return (ContainerNode<?>) maskWithFieldsCheck(node);
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node, Set<String> maskedFields) {
return (ContainerNode<?>) maskWithFieldsCheck(node, maskedFields);
}

@Override
Expand All @@ -40,7 +41,7 @@ private static UnaryOperator<String> createMasker(List<String> maskingChars) {
switch (Character.getType(cp)) {
case Character.SPACE_SEPARATOR,
Character.LINE_SEPARATOR,
Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keeping separators as-is
Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keep separators as-is
case Character.UPPERCASE_LETTER -> sb.append(maskingChars.get(0));
case Character.LOWERCASE_LETTER -> sb.append(maskingChars.get(1));
case Character.DECIMAL_DIGIT_NUMBER -> sb.append(maskingChars.get(2));
Expand All @@ -51,35 +52,38 @@ private static UnaryOperator<String> createMasker(List<String> maskingChars) {
};
}

private JsonNode maskWithFieldsCheck(JsonNode node) {
private JsonNode maskWithFieldsCheck(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> {
String fieldName = f.getKey();
JsonNode fieldVal = f.getValue();
if (fieldShouldBeMasked(fieldName)) {
obj.set(fieldName, maskNodeRecursively(fieldVal));
maskedFields.add(fieldName);
obj.set(fieldName, maskNodeRecursively(fieldVal, maskedFields));
} else {
obj.set(fieldName, maskWithFieldsCheck(fieldVal));
obj.set(fieldName, maskWithFieldsCheck(fieldVal, maskedFields));
}
});
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(maskWithFieldsCheck(e)));
node.elements().forEachRemaining(e -> arr.add(maskWithFieldsCheck(e, maskedFields)));
return arr;
}
return node;
}

private JsonNode maskNodeRecursively(JsonNode node) {
private JsonNode maskNodeRecursively(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> obj.set(f.getKey(), maskNodeRecursively(f.getValue())));
node.fields().forEachRemaining(f ->
obj.set(f.getKey(), maskNodeRecursively(f.getValue(), maskedFields))
);
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(maskNodeRecursively(e)));
node.elements().forEachRemaining(e -> arr.add(maskNodeRecursively(e, maskedFields)));
return arr;
}
return new TextNode(masker.apply(node.asText()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import com.fasterxml.jackson.databind.node.ContainerNode;
import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.Set;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public abstract class MaskingPolicy {


public static MaskingPolicy create(ClustersProperties.Masking property) {
FieldsSelector fieldsSelector = FieldsSelector.create(property);
return switch (property.getType()) {
Expand All @@ -23,6 +25,7 @@ public static MaskingPolicy create(ClustersProperties.Masking property) {
? Mask.DEFAULT_PATTERN
: property.getMaskingCharsReplacement()
);
case KEEP -> new Keep(fieldsSelector);
};
}

Expand All @@ -34,7 +37,7 @@ protected boolean fieldShouldBeMasked(String fieldName) {
return fieldsSelector.shouldBeMasked(fieldName);
}

public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node);
public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node, Set<String> maskedFields);

public abstract String applyToString(String str);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Set;

class Remove extends MaskingPolicy {

Expand All @@ -18,24 +18,26 @@ public String applyToString(String str) {
}

@Override
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
return (ContainerNode<?>) removeFields(node);
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node, Set<String> maskedFields) {
return (ContainerNode<?>) removeFields(node, maskedFields);
}

private JsonNode removeFields(JsonNode node) {
private JsonNode removeFields(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> {
String fieldName = f.getKey();
JsonNode fieldVal = f.getValue();
if (!fieldShouldBeMasked(fieldName)) {
obj.set(fieldName, removeFields(fieldVal));
obj.set(fieldName, removeFields(fieldVal, maskedFields));
} else {
maskedFields.add(fieldName); // ✅ track removed field
}
});
return obj;
} else if (node.isArray()) {
var arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(removeFields(e)));
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(removeFields(e, maskedFields)));
return arr;
}
return node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import java.util.Set;

class Replace extends MaskingPolicy {

static final String DEFAULT_REPLACEMENT = "***DATA_MASKED***";

private final String replacement;

Replace(FieldsSelector fieldsSelector, String replacementString) {
static final String DEFAULT_REPLACEMENT = "ANONYMIZED";

Replace(FieldsSelector fieldsSelector, String replacement) {
super(fieldsSelector);
this.replacement = Preconditions.checkNotNull(replacementString);
this.replacement = replacement;
}

@Override
Expand All @@ -24,42 +24,29 @@ public String applyToString(String str) {
}

@Override
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
return (ContainerNode<?>) replaceWithFieldsCheck(node);
public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node, Set<String> maskedFields) {
return (ContainerNode<?>) replaceFields(node, maskedFields);
}

private JsonNode replaceWithFieldsCheck(JsonNode node) {
private JsonNode replaceFields(JsonNode node, Set<String> maskedFields) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> {
String fieldName = f.getKey();
JsonNode fieldVal = f.getValue();
if (fieldShouldBeMasked(fieldName)) {
obj.set(fieldName, replaceRecursive(fieldVal));
maskedFields.add(fieldName); // ✅ track replaced field
obj.set(fieldName, new TextNode(replacement));
} else {
obj.set(fieldName, replaceWithFieldsCheck(fieldVal));
obj.set(fieldName, replaceFields(fieldVal, maskedFields));
}
});
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(replaceWithFieldsCheck(e)));
node.elements().forEachRemaining(e -> arr.add(replaceFields(e, maskedFields)));
return arr;
}
// if it is not an object or array - we have nothing to replace here
return node;
}

private JsonNode replaceRecursive(JsonNode node) {
if (node.isObject()) {
ObjectNode obj = ((ObjectNode) node).objectNode();
node.fields().forEachRemaining(f -> obj.set(f.getKey(), replaceRecursive(f.getValue())));
return obj;
} else if (node.isArray()) {
ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
node.elements().forEachRemaining(e -> arr.add(replaceRecursive(e)));
return arr;
}
return new TextNode(replacement);
}
}
Loading
Loading