diff --git a/core/src/main/java/org/apache/iceberg/functions/Action.java b/core/src/main/java/org/apache/iceberg/functions/Action.java new file mode 100644 index 000000000000..5f5ed2db195b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/Action.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import java.io.Serializable; +import java.util.Objects; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** + * A column projection action from the ReadRestrictions spec. + * + *

{@link #bind(Type)} returns the masking {@link SerializableFunction} for a given column type; + * all bound functions return null for null input. Per spec all predefined actions preserve the + * input column type, so the bound function maps {@code T -> T}. + * + * @param column value type + */ +public interface Action extends Serializable { + + String MASK_ALPHANUM = "mask-alphanum"; + String MASK_TO_FIXED_VALUE = "mask-to-fixed-value"; + String REPLACE_WITH_NULL = "replace-with-null"; + String SHOW_FIRST_4 = "show-first-4"; + String SHOW_LAST_4 = "show-last-4"; + String TRUNCATE_TO_YEAR = "truncate-to-year"; + String TRUNCATE_TO_MONTH = "truncate-to-month"; + String SHA_256_GLOBAL = "sha-256-global"; + String SHA_256_QUERY_LOCAL = "sha-256-query-local"; + String APPLY_EXPRESSION = "apply-expression"; + + /** The action discriminator string as sent on the wire. */ + String actionType(); + + /** The field id of the column this action applies to. */ + int fieldId(); + + /** + * Returns a function that applies this action to values of the given {@link Type}. + * + * @throws IllegalArgumentException if the type is not supported by this action. + */ + default SerializableFunction bind(Type type) { + throw new UnsupportedOperationException("bind is not implemented for " + getClass().getName()); + } + + /** + * Variant that accepts a per-query salt. Only {@link Sha256QueryLocal} uses the salt; other + * actions ignore it and delegate to {@link #bind(Type)}. + */ + default SerializableFunction bind(Type type, byte[] salt) { + return bind(type); + } + + /** Returns true if this action can be bound to the given {@link Type}. */ + boolean canBind(Type type); + + /** Base for all concrete actions; holds the field id. */ + abstract class BaseAction implements Action { + private final int fieldId; + + BaseAction(int fieldId) { + this.fieldId = fieldId; + } + + @Override + public final int fieldId() { + return fieldId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Action)) { + return false; + } + Action other = (Action) o; + return fieldId == other.fieldId() && actionType().equals(other.actionType()); + } + + @Override + public int hashCode() { + return Objects.hash(actionType(), fieldId); + } + + @Override + public String toString() { + return actionType() + "(" + fieldId + ")"; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/Actions.java b/core/src/main/java/org/apache/iceberg/functions/Actions.java new file mode 100644 index 000000000000..e259fc968a6a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/Actions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.util.SerializableFunction; + +/** Package-private helpers shared by {@link Action} implementations. */ +final class Actions { + + private Actions() {} + + /** + * Base for masking functions where null input must pass through as null unchanged (spec: "For all + * actions, if the input column value is NULL, the output MUST be NULL."). Subclasses implement + * {@link #applyNonNull(Object)} and don't have to repeat the guard. + */ + abstract static class NullSafeFunction implements SerializableFunction { + @Override + public final T apply(S value) { + return value == null ? null : applyNonNull(value); + } + + protected abstract T applyNonNull(S value); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/ApplyExpression.java b/core/src/main/java/org/apache/iceberg/functions/ApplyExpression.java new file mode 100644 index 000000000000..524f02fb61dc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/ApplyExpression.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import java.util.Objects; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** + * Replaces the column value with the result of a server-provided Expression. Not supported by this + * client yet — Iceberg Expressions are currently boolean-only, so binding returns a function that + * throws on apply. + */ +public final class ApplyExpression extends Action.BaseAction { + private final Expression expression; + + public ApplyExpression(int fieldId, Expression expression) { + super(fieldId); + Preconditions.checkArgument(expression != null, "Invalid expression: null"); + this.expression = expression; + } + + public Expression expression() { + return expression; + } + + @Override + public String actionType() { + return APPLY_EXPRESSION; + } + + @Override + public boolean canBind(Type type) { + return true; + } + + @Override + public SerializableFunction bind(Type type) { + return ApplyExpressionFn.INSTANCE; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ApplyExpression)) { + return false; + } + ApplyExpression other = (ApplyExpression) o; + return fieldId() == other.fieldId() && Objects.equals(expression, other.expression); + } + + @Override + public int hashCode() { + return Objects.hash(actionType(), fieldId(), expression); + } + + @Override + public String toString() { + return actionType() + "(" + fieldId() + ", " + expression + ")"; + } + + private static final class ApplyExpressionFn implements SerializableFunction { + static final ApplyExpressionFn INSTANCE = new ApplyExpressionFn(); + + @Override + public Object apply(Object value) { + throw new UnsupportedOperationException( + "apply-expression column projection is not supported by this client " + + "(Iceberg Expression is currently boolean-only)"); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/MaskAlphanum.java b/core/src/main/java/org/apache/iceberg/functions/MaskAlphanum.java new file mode 100644 index 000000000000..79c6cab94045 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/MaskAlphanum.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Redacts every Unicode code point in a string per the mask-alphanum rules. */ +public final class MaskAlphanum extends Action.BaseAction { + public MaskAlphanum(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return MASK_ALPHANUM; + } + + @Override + public boolean canBind(Type type) { + return type.typeId() == Type.TypeID.STRING; + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument(canBind(type), "mask-alphanum requires STRING type, got %s", type); + return MaskAlphanumFn.INSTANCE; + } + + /** + * Maps a code point through the mask-alphanum rules; also used by {@link ShowFirst4} and {@link + * ShowLast4} on the code points outside their preserved windows: + * + *
    + *
  • ASCII digits (0-9) -> {@code 'n'} + *
  • Structural punctuation kept as-is: {@code ( ) , . - @} + *
  • Everything else -> {@code 'x'} + *
+ */ + static int maskCodePoint(int cp) { + if (cp >= 0x30 && cp <= 0x39) { + return 'n'; + } else if (cp == '(' || cp == ')' || cp == ',' || cp == '.' || cp == '-' || cp == '@') { + return cp; + } else { + return 'x'; + } + } + + private static final class MaskAlphanumFn extends Actions.NullSafeFunction { + static final MaskAlphanumFn INSTANCE = new MaskAlphanumFn(); + + @Override + protected String applyNonNull(String input) { + StringBuilder sb = new StringBuilder(input.length()); + int offset = 0; + while (offset < input.length()) { + int cp = input.codePointAt(offset); + sb.appendCodePoint(maskCodePoint(cp)); + offset += Character.charCount(cp); + } + return sb.toString(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/MaskToFixedValue.java b/core/src/main/java/org/apache/iceberg/functions/MaskToFixedValue.java new file mode 100644 index 000000000000..77ff3eaf5c20 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/MaskToFixedValue.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.UUID; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SerializableFunction; + +/** Returns a spec-defined fixed value for the column's type. */ +public final class MaskToFixedValue extends Action.BaseAction { + + private static final Integer INT_DEFAULT = 0; + private static final Long LONG_DEFAULT = 0L; + private static final Float FLOAT_DEFAULT = 0.0f; + private static final Double DOUBLE_DEFAULT = 0.0d; + // Per spec: string uses "XXXXXXXX" (not "") so masked strings stay visually distinct from + // legitimately empty strings. All other types use the zero value of their representation. + private static final String STRING_DEFAULT = "XXXXXXXX"; + private static final Integer DATE_DEFAULT = DateTimeUtil.daysFromDate(LocalDate.of(1970, 1, 1)); + private static final Long TIME_DEFAULT_MICROS = 0L; + private static final Long TIMESTAMP_DEFAULT_MICROS = + DateTimeUtil.microsFromTimestamp(LocalDateTime.of(1970, 1, 1, 0, 0)); + private static final Long TIMESTAMP_DEFAULT_NANOS = + DateTimeUtil.nanosFromTimestamp(LocalDateTime.of(1970, 1, 1, 0, 0)); + private static final UUID UUID_DEFAULT = UUID.fromString("00000000-0000-0000-0000-000000000000"); + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer(); + + public MaskToFixedValue(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return MASK_TO_FIXED_VALUE; + } + + @Override + public boolean canBind(Type type) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_NANO: + case UUID: + case FIXED: + case BINARY: + case DECIMAL: + return true; + default: + return false; + } + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument( + canBind(type), "mask-to-fixed-value is not supported for type: %s", type); + Object defaultValue = defaultValueFor(type); + return defaultValue instanceof ByteBuffer + ? new ConstantByteBufferFn((ByteBuffer) defaultValue) + : new ConstantFn(defaultValue); + } + + private static Object defaultValueFor(Type type) { + switch (type.typeId()) { + case BOOLEAN: + return Boolean.FALSE; + case INTEGER: + return INT_DEFAULT; + case LONG: + return LONG_DEFAULT; + case FLOAT: + return FLOAT_DEFAULT; + case DOUBLE: + return DOUBLE_DEFAULT; + case STRING: + return STRING_DEFAULT; + case DATE: + return DATE_DEFAULT; + case TIME: + return TIME_DEFAULT_MICROS; + case TIMESTAMP: + return TIMESTAMP_DEFAULT_MICROS; + case TIMESTAMP_NANO: + return TIMESTAMP_DEFAULT_NANOS; + case UUID: + return UUID_DEFAULT; + case FIXED: + return ByteBuffer.allocate(((Types.FixedType) type).length()).asReadOnlyBuffer(); + case BINARY: + return EMPTY_BUFFER; + case DECIMAL: + return new BigDecimal(BigInteger.ZERO, ((Types.DecimalType) type).scale()); + default: + throw new IllegalStateException("unreachable: canBind should have rejected " + type); + } + } + + private static final class ConstantFn extends Actions.NullSafeFunction { + private final Object constant; + + ConstantFn(Object constant) { + this.constant = constant; + } + + @Override + protected Object applyNonNull(Object value) { + return constant; + } + } + + // ByteBuffer has a mutable position; returning a fresh duplicate per call keeps callers + // isolated from each other. + private static final class ConstantByteBufferFn extends Actions.NullSafeFunction { + private final ByteBuffer constant; + + ConstantByteBufferFn(ByteBuffer constant) { + this.constant = constant; + } + + @Override + protected Object applyNonNull(Object value) { + return constant.duplicate(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/ReplaceWithNull.java b/core/src/main/java/org/apache/iceberg/functions/ReplaceWithNull.java new file mode 100644 index 000000000000..54f25d95e587 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/ReplaceWithNull.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Returns null for every non-null input. Works for any type. */ +public final class ReplaceWithNull extends Action.BaseAction { + public ReplaceWithNull(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return REPLACE_WITH_NULL; + } + + @Override + public boolean canBind(Type type) { + return true; + } + + @Override + public SerializableFunction bind(Type type) { + return ReplaceWithNullFn.INSTANCE; + } + + private static final class ReplaceWithNullFn implements SerializableFunction { + static final ReplaceWithNullFn INSTANCE = new ReplaceWithNullFn(); + + @Override + public Object apply(Object value) { + return null; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/Sha256.java b/core/src/main/java/org/apache/iceberg/functions/Sha256.java new file mode 100644 index 000000000000..6b28b1d8c11b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/Sha256.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding; +import org.apache.iceberg.types.Type; + +/** Shared SHA-256 masking function. One instance per (type, salt) pair. */ +final class Sha256 extends Actions.NullSafeFunction { + + private static final ThreadLocal DIGEST = + ThreadLocal.withInitial( + () -> { + try { + return MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 not available", e); + } + }); + + enum Codec { + STRING { + @Override + void update(MessageDigest md, Object value) { + md.update(((String) value).getBytes(StandardCharsets.UTF_8)); + } + + @Override + Object encode(byte[] digest) { + return BaseEncoding.base16().lowerCase().encode(digest); + } + }, + INTEGER { + @Override + void update(MessageDigest md, Object value) { + int intVal = (Integer) value; + md.update((byte) intVal); + md.update((byte) (intVal >>> 8)); + md.update((byte) (intVal >>> 16)); + md.update((byte) (intVal >>> 24)); + } + + @Override + Object encode(byte[] digest) { + return ByteBuffer.wrap(digest, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(); + } + }, + LONG { + @Override + void update(MessageDigest md, Object value) { + long longVal = (Long) value; + for (int i = 0; i < 8; i++) { + md.update((byte) longVal); + longVal >>>= 8; + } + } + + @Override + Object encode(byte[] digest) { + return ByteBuffer.wrap(digest, 0, 8).order(ByteOrder.LITTLE_ENDIAN).getLong(); + } + }, + BINARY { + @Override + void update(MessageDigest md, Object value) { + md.update(((ByteBuffer) value).duplicate()); + } + + @Override + Object encode(byte[] digest) { + return ByteBuffer.wrap(digest); + } + }; + + abstract void update(MessageDigest md, Object value); + + abstract Object encode(byte[] digest); + } + + static boolean isSupported(Type type) { + switch (type.typeId()) { + case STRING: + case INTEGER: + case LONG: + case BINARY: + return true; + default: + return false; + } + } + + static Sha256 forType(Type type, byte[] salt) { + switch (type.typeId()) { + case STRING: + return new Sha256(Codec.STRING, salt); + case INTEGER: + return new Sha256(Codec.INTEGER, salt); + case LONG: + return new Sha256(Codec.LONG, salt); + case BINARY: + return new Sha256(Codec.BINARY, salt); + default: + throw new IllegalArgumentException("sha-256 is not supported for type: " + type); + } + } + + private final Codec codec; + private final byte[] salt; + + private Sha256(Codec codec, byte[] salt) { + this.codec = codec; + this.salt = salt != null ? salt.clone() : null; + } + + @Override + protected Object applyNonNull(Object value) { + MessageDigest md = DIGEST.get(); + md.reset(); + if (salt != null) { + md.update(salt); + } + codec.update(md, value); + return codec.encode(md.digest()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/Sha256Global.java b/core/src/main/java/org/apache/iceberg/functions/Sha256Global.java new file mode 100644 index 000000000000..9aed161928d6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/Sha256Global.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Hashes values with SHA-256 using a fixed (unsalted) digest. Output is deterministic. */ +public final class Sha256Global extends Action.BaseAction { + public Sha256Global(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return SHA_256_GLOBAL; + } + + @Override + public boolean canBind(Type type) { + return Sha256.isSupported(type); + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument(canBind(type), "sha-256 is not supported for type: %s", type); + return Sha256.forType(type, null); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/Sha256QueryLocal.java b/core/src/main/java/org/apache/iceberg/functions/Sha256QueryLocal.java new file mode 100644 index 000000000000..24a1c80eb36e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/Sha256QueryLocal.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** + * Hashes values with SHA-256 using a per-query salt. Binding requires the salt via {@link + * #bind(Type, byte[])}; the no-salt variant fails fast so callers can't accidentally strip the + * query-local randomness. + */ +public final class Sha256QueryLocal extends Action.BaseAction { + public Sha256QueryLocal(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return SHA_256_QUERY_LOCAL; + } + + @Override + public boolean canBind(Type type) { + return Sha256.isSupported(type); + } + + @Override + public SerializableFunction bind(Type type) { + throw new IllegalArgumentException( + "sha-256-query-local requires a salt; call bind(Type, byte[]) instead"); + } + + @Override + public SerializableFunction bind(Type type, byte[] salt) { + Preconditions.checkArgument(canBind(type), "sha-256 is not supported for type: %s", type); + Preconditions.checkArgument( + salt != null && salt.length >= 16, "sha-256-query-local salt must be >= 16 bytes"); + return Sha256.forType(type, salt); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/ShowFirst4.java b/core/src/main/java/org/apache/iceberg/functions/ShowFirst4.java new file mode 100644 index 000000000000..084a1d6a4a06 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/ShowFirst4.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Preserves the first 4 code points of a string, redacts the rest via mask-alphanum rules. */ +public final class ShowFirst4 extends Action.BaseAction { + public ShowFirst4(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return SHOW_FIRST_4; + } + + @Override + public boolean canBind(Type type) { + return type.typeId() == Type.TypeID.STRING; + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument(canBind(type), "show-first-4 requires STRING type, got %s", type); + return ShowFirst4Fn.INSTANCE; + } + + private static final class ShowFirst4Fn extends Actions.NullSafeFunction { + static final ShowFirst4Fn INSTANCE = new ShowFirst4Fn(); + + @Override + protected String applyNonNull(String input) { + if (input.codePointCount(0, input.length()) <= 4) { + return input; + } + StringBuilder sb = new StringBuilder(input.length()); + int cpIndex = 0; + int offset = 0; + while (offset < input.length()) { + int cp = input.codePointAt(offset); + sb.appendCodePoint(cpIndex < 4 ? cp : MaskAlphanum.maskCodePoint(cp)); + offset += Character.charCount(cp); + cpIndex++; + } + return sb.toString(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/ShowLast4.java b/core/src/main/java/org/apache/iceberg/functions/ShowLast4.java new file mode 100644 index 000000000000..698960a54ae0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/ShowLast4.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Redacts all but the last 4 code points of a string via mask-alphanum rules. */ +public final class ShowLast4 extends Action.BaseAction { + public ShowLast4(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return SHOW_LAST_4; + } + + @Override + public boolean canBind(Type type) { + return type.typeId() == Type.TypeID.STRING; + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument(canBind(type), "show-last-4 requires STRING type, got %s", type); + return ShowLast4Fn.INSTANCE; + } + + private static final class ShowLast4Fn extends Actions.NullSafeFunction { + static final ShowLast4Fn INSTANCE = new ShowLast4Fn(); + + @Override + protected String applyNonNull(String input) { + // Single pass: walk the string while remembering the last 4 code-point start offsets. + // When done, everything before the oldest remembered offset is masked; everything from + // that offset onward is kept verbatim. + int[] lastFourStarts = new int[4]; + int count = 0; + int offset = 0; + while (offset < input.length()) { + lastFourStarts[count % 4] = offset; + int cp = input.codePointAt(offset); + offset += Character.charCount(cp); + count++; + } + if (count <= 4) { + return input; + } + int keepFromOffset = lastFourStarts[count % 4]; + StringBuilder sb = new StringBuilder(input.length()); + int maskOffset = 0; + while (maskOffset < keepFromOffset) { + int cp = input.codePointAt(maskOffset); + sb.appendCodePoint(MaskAlphanum.maskCodePoint(cp)); + maskOffset += Character.charCount(cp); + } + sb.append(input, keepFromOffset, input.length()); + return sb.toString(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/TruncateTemporal.java b/core/src/main/java/org/apache/iceberg/functions/TruncateTemporal.java new file mode 100644 index 000000000000..9cc0240d485b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/TruncateTemporal.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.DateTimeUtil; + +/** Truncates date/timestamp values to the first instant of their year or month. */ +final class TruncateTemporal extends Actions.NullSafeFunction { + + enum Unit { + YEAR, + MONTH + } + + private enum Storage { + DATE_DAYS, + TIMESTAMP_MICROS, + TIMESTAMP_NANOS + } + + static final TruncateTemporal DATE_YEAR = new TruncateTemporal(Unit.YEAR, Storage.DATE_DAYS); + static final TruncateTemporal DATE_MONTH = new TruncateTemporal(Unit.MONTH, Storage.DATE_DAYS); + static final TruncateTemporal TIMESTAMP_YEAR = + new TruncateTemporal(Unit.YEAR, Storage.TIMESTAMP_MICROS); + static final TruncateTemporal TIMESTAMP_MONTH = + new TruncateTemporal(Unit.MONTH, Storage.TIMESTAMP_MICROS); + static final TruncateTemporal TIMESTAMP_NANO_YEAR = + new TruncateTemporal(Unit.YEAR, Storage.TIMESTAMP_NANOS); + static final TruncateTemporal TIMESTAMP_NANO_MONTH = + new TruncateTemporal(Unit.MONTH, Storage.TIMESTAMP_NANOS); + + static TruncateTemporal forType(Unit unit, Type type) { + switch (type.typeId()) { + case DATE: + return unit == Unit.YEAR ? DATE_YEAR : DATE_MONTH; + case TIMESTAMP: + return unit == Unit.YEAR ? TIMESTAMP_YEAR : TIMESTAMP_MONTH; + case TIMESTAMP_NANO: + return unit == Unit.YEAR ? TIMESTAMP_NANO_YEAR : TIMESTAMP_NANO_MONTH; + default: + throw new IllegalArgumentException("Unsupported type for truncate: " + type); + } + } + + private final Unit unit; + private final Storage storage; + + private TruncateTemporal(Unit unit, Storage storage) { + this.unit = unit; + this.storage = storage; + } + + @Override + protected Object applyNonNull(Object value) { + switch (storage) { + case DATE_DAYS: + LocalDate date = DateTimeUtil.dateFromDays((Integer) value); + return DateTimeUtil.daysFromDate(truncateDate(date)); + case TIMESTAMP_MICROS: + LocalDateTime tsMicros = DateTimeUtil.timestampFromMicros((Long) value); + return DateTimeUtil.microsFromTimestamp(truncateTimestamp(tsMicros)); + case TIMESTAMP_NANOS: + LocalDateTime tsNanos = DateTimeUtil.timestampFromNanos((Long) value); + return DateTimeUtil.nanosFromTimestamp(truncateTimestamp(tsNanos)); + default: + throw new IllegalStateException("unreachable: " + storage); + } + } + + private LocalDate truncateDate(LocalDate date) { + return unit == Unit.YEAR ? date.withMonth(1).withDayOfMonth(1) : date.withDayOfMonth(1); + } + + private LocalDateTime truncateTimestamp(LocalDateTime ts) { + LocalDateTime truncated = + unit == Unit.YEAR ? ts.withMonth(1).withDayOfMonth(1) : ts.withDayOfMonth(1); + return truncated.with(LocalTime.MIDNIGHT); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/TruncateToMonth.java b/core/src/main/java/org/apache/iceberg/functions/TruncateToMonth.java new file mode 100644 index 000000000000..77a01e711790 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/TruncateToMonth.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Truncates date or timestamp values to the first instant of their month. */ +public final class TruncateToMonth extends Action.BaseAction { + public TruncateToMonth(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return TRUNCATE_TO_MONTH; + } + + @Override + public boolean canBind(Type type) { + switch (type.typeId()) { + case DATE: + case TIMESTAMP: + case TIMESTAMP_NANO: + return true; + default: + return false; + } + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument( + canBind(type), "truncate-to-month is not supported for type: %s", type); + return TruncateTemporal.forType(TruncateTemporal.Unit.MONTH, type); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/TruncateToYear.java b/core/src/main/java/org/apache/iceberg/functions/TruncateToYear.java new file mode 100644 index 000000000000..a912f0cb8a80 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/TruncateToYear.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** Truncates date or timestamp values to the first instant of their year. */ +public final class TruncateToYear extends Action.BaseAction { + public TruncateToYear(int fieldId) { + super(fieldId); + } + + @Override + public String actionType() { + return TRUNCATE_TO_YEAR; + } + + @Override + public boolean canBind(Type type) { + switch (type.typeId()) { + case DATE: + case TIMESTAMP: + case TIMESTAMP_NANO: + return true; + default: + return false; + } + } + + @Override + public SerializableFunction bind(Type type) { + Preconditions.checkArgument( + canBind(type), "truncate-to-year is not supported for type: %s", type); + return TruncateTemporal.forType(TruncateTemporal.Unit.YEAR, type); + } +} diff --git a/core/src/main/java/org/apache/iceberg/functions/UnknownAction.java b/core/src/main/java/org/apache/iceberg/functions/UnknownAction.java new file mode 100644 index 000000000000..a72c1c76ddf5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/functions/UnknownAction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.SerializableFunction; + +/** + * Preserves an action with a discriminator string this client doesn't recognize so that newer + * server-side action types don't break parsing. Callers that intend to enforce the action + * (engine-side rules) must fail closed when they encounter this — silent skipping would leak + * unmasked data. + */ +public final class UnknownAction extends Action.BaseAction { + private final String actionType; + + public UnknownAction(int fieldId, String actionType) { + super(fieldId); + Preconditions.checkArgument(actionType != null, "Invalid action type: null"); + this.actionType = actionType; + } + + @Override + public String actionType() { + return actionType; + } + + @Override + public boolean canBind(Type type) { + return false; + } + + @Override + public SerializableFunction bind(Type type) { + throw new IllegalArgumentException( + "Cannot bind unknown action type '" + + actionType + + "': this client does not recognize the action. Upgrade the client or remove the " + + "action from the server-side policy."); + } +} diff --git a/core/src/test/java/org/apache/iceberg/functions/TestActions.java b/core/src/test/java/org/apache/iceberg/functions/TestActions.java new file mode 100644 index 000000000000..01e274c81401 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/functions/TestActions.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.functions; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SerializableFunction; +import org.junit.jupiter.api.Test; + +public class TestActions { + + @Test + public void maskAlphanumSpecExample() { + SerializableFunction fn = new MaskAlphanum(1).bind(Types.StringType.get()); + assertThat(fn.apply("prashant010696@gmail.com")).isEqualTo("xxxxxxxxnnnnnn@xxxxx.xxx"); + } + + @Test + public void maskAlphanumPreservedPunctuation() { + SerializableFunction fn = new MaskAlphanum(1).bind(Types.StringType.get()); + assertThat(fn.apply("(555) 123-4567")).isEqualTo("(nnn)xnnn-nnnn"); + assertThat(fn.apply("a.b,c")).isEqualTo("x.x,x"); + } + + @Test + public void maskAlphanumNullInNullOut() { + SerializableFunction fn = new MaskAlphanum(1).bind(Types.StringType.get()); + assertThat(fn.apply(null)).isNull(); + } + + @Test + public void maskAlphanumEmptyString() { + SerializableFunction fn = new MaskAlphanum(1).bind(Types.StringType.get()); + assertThat(fn.apply("")).isEqualTo(""); + } + + @Test + public void showFirst4SpecExample() { + SerializableFunction fn = new ShowFirst4(1).bind(Types.StringType.get()); + assertThat(fn.apply("prashant010696@gmail.com")).isEqualTo("prasxxxxnnnnnn@xxxxx.xxx"); + } + + @Test + public void showFirst4FourOrFewerReturnedUnchanged() { + SerializableFunction fn = new ShowFirst4(1).bind(Types.StringType.get()); + assertThat(fn.apply("abcd")).isEqualTo("abcd"); + assertThat(fn.apply("ab")).isEqualTo("ab"); + assertThat(fn.apply("")).isEqualTo(""); + } + + @Test + public void showLast4SpecExample() { + SerializableFunction fn = new ShowLast4(1).bind(Types.StringType.get()); + assertThat(fn.apply("4111-1111-1111-4444")).isEqualTo("nnnn-nnnn-nnnn-4444"); + } + + @Test + public void showLast4FourOrFewerReturnedUnchanged() { + SerializableFunction fn = new ShowLast4(1).bind(Types.StringType.get()); + assertThat(fn.apply("abcd")).isEqualTo("abcd"); + assertThat(fn.apply("ab")).isEqualTo("ab"); + } + + @Test + public void replaceWithNullAlwaysReturnsNull() { + SerializableFunction fn = new ReplaceWithNull(1).bind(Types.IntegerType.get()); + assertThat(fn.apply(42)).isNull(); + assertThat(fn.apply(null)).isNull(); + + SerializableFunction strFn = + new ReplaceWithNull(1).bind(Types.StringType.get()); + assertThat(strFn.apply("hello")).isNull(); + } + + @Test + public void maskToFixedValueString() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.StringType.get()); + assertThat(fn.apply("anything")).isEqualTo("XXXXXXXX"); + } + + @Test + public void maskToFixedValueInt() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.IntegerType.get()); + assertThat(fn.apply(42)).isEqualTo(0); + } + + @Test + public void maskToFixedValueLong() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.LongType.get()); + assertThat(fn.apply(42L)).isEqualTo(0L); + } + + @Test + public void maskToFixedValueDouble() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.DoubleType.get()); + assertThat(fn.apply(3.14)).isEqualTo(0.0d); + } + + @Test + public void maskToFixedValueBoolean() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.BooleanType.get()); + assertThat(fn.apply(true)).isEqualTo(false); + } + + @Test + public void maskToFixedValueDate() { + int input = DateTimeUtil.daysFromDate(LocalDate.of(2024, 7, 15)); + int expected = DateTimeUtil.daysFromDate(LocalDate.of(1970, 1, 1)); + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.DateType.get()); + assertThat(fn.apply(input)).isEqualTo(expected); + } + + @Test + public void maskToFixedValueTimestamp() { + long input = + LocalDateTime.of(2024, 7, 15, 13, 45, 30).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + long expected = LocalDateTime.of(1970, 1, 1, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + SerializableFunction fn = + new MaskToFixedValue(1).bind(Types.TimestampType.withZone()); + assertThat(fn.apply(input)).isEqualTo(expected); + } + + @Test + public void maskToFixedValueBinary() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.BinaryType.get()); + ByteBuffer result = (ByteBuffer) fn.apply(ByteBuffer.wrap(new byte[] {1, 2, 3})); + assertThat(result.remaining()).isEqualTo(0); + } + + @Test + public void maskToFixedValueDecimal() { + SerializableFunction fn = + new MaskToFixedValue(1).bind(Types.DecimalType.of(10, 2)); + BigDecimal result = (BigDecimal) fn.apply(new BigDecimal("12.34")); + assertThat(result.compareTo(BigDecimal.ZERO)).isEqualTo(0); + assertThat(result.scale()).isEqualTo(2); + } + + @Test + public void maskToFixedValueNullReturnsNull() { + SerializableFunction fn = new MaskToFixedValue(1).bind(Types.IntegerType.get()); + assertThat(fn.apply(null)).isNull(); + } + + @Test + public void truncateToYearDate() { + int input = (int) LocalDate.of(2024, 7, 15).toEpochDay(); + int expected = (int) LocalDate.of(2024, 1, 1).toEpochDay(); + SerializableFunction fn = new TruncateToYear(1).bind(Types.DateType.get()); + assertThat(fn.apply(input)).isEqualTo(expected); + } + + @Test + public void truncateToMonthDate() { + int input = (int) LocalDate.of(2024, 7, 15).toEpochDay(); + int expected = (int) LocalDate.of(2024, 7, 1).toEpochDay(); + SerializableFunction fn = new TruncateToMonth(1).bind(Types.DateType.get()); + assertThat(fn.apply(input)).isEqualTo(expected); + } + + @Test + public void truncateToYearTimestamp() { + long inputMicros = + LocalDateTime.of(2024, 7, 15, 13, 45, 30).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + long expectedMicros = + LocalDateTime.of(2024, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + SerializableFunction fn = + new TruncateToYear(1).bind(Types.TimestampType.withZone()); + assertThat(fn.apply(inputMicros)).isEqualTo(expectedMicros); + } + + @Test + public void truncateToMonthTimestamp() { + long inputMicros = + LocalDateTime.of(2024, 7, 15, 13, 45, 30).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + long expectedMicros = + LocalDateTime.of(2024, 7, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1_000_000L; + SerializableFunction fn = + new TruncateToMonth(1).bind(Types.TimestampType.withZone()); + assertThat(fn.apply(inputMicros)).isEqualTo(expectedMicros); + } + + @Test + public void sha256GlobalStringIsDeterministic() { + SerializableFunction fn = new Sha256Global(1).bind(Types.StringType.get()); + String first = (String) fn.apply("hello"); + String second = (String) fn.apply("hello"); + assertThat(first).isEqualTo(second); + assertThat(first).isEqualTo("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"); + } + + @Test + public void sha256GlobalBinaryReturns32Bytes() { + SerializableFunction fn = new Sha256Global(1).bind(Types.BinaryType.get()); + ByteBuffer result = (ByteBuffer) fn.apply(ByteBuffer.wrap(new byte[] {1, 2, 3})); + assertThat(result.remaining()).isEqualTo(32); + } + + @Test + public void sha256GlobalIntegerDeterministic() { + SerializableFunction fn = new Sha256Global(1).bind(Types.IntegerType.get()); + Object first = fn.apply(42); + Object second = fn.apply(42); + assertThat(first).isEqualTo(second); + assertThat(first).isInstanceOf(Integer.class); + } + + @Test + public void sha256GlobalLongDeterministic() { + SerializableFunction fn = new Sha256Global(1).bind(Types.LongType.get()); + Object first = fn.apply(42L); + Object second = fn.apply(42L); + assertThat(first).isEqualTo(second); + assertThat(first).isInstanceOf(Long.class); + } + + @Test + public void sha256QueryLocalDiffersWithDifferentSalt() { + byte[] saltA = new byte[16]; + byte[] saltB = new byte[16]; + Arrays.fill(saltA, (byte) 1); + Arrays.fill(saltB, (byte) 2); + SerializableFunction fnA = + new Sha256QueryLocal(1).bind(Types.StringType.get(), saltA); + SerializableFunction fnB = + new Sha256QueryLocal(1).bind(Types.StringType.get(), saltB); + assertThat(fnA.apply("hello")).isNotEqualTo(fnB.apply("hello")); + } + + @Test + public void sha256QueryLocalSaltMustBeAtLeast16Bytes() { + assertThatThrownBy(() -> new Sha256QueryLocal(1).bind(Types.StringType.get(), new byte[15])) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("16 bytes"); + } + + @Test + public void applyExpressionFailsOnApply() { + SerializableFunction fn = + new ApplyExpression(1, Expressions.alwaysTrue()).bind(Types.StringType.get()); + assertThatThrownBy(() -> fn.apply("any")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("apply-expression"); + } + + @Test + public void bindRejectsMaskAlphanumOnNonString() { + assertThatThrownBy(() -> new MaskAlphanum(1).bind(Types.IntegerType.get())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("STRING"); + } + + @Test + public void bindRejectsTruncateOnUnsupportedType() { + assertThatThrownBy(() -> new TruncateToYear(1).bind(Types.StringType.get())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("not supported for type"); + } + + @Test + public void bindFailsClosedOnUnknownAction() { + assertThatThrownBy(() -> new UnknownAction(1, "future-mask-v2").bind(Types.StringType.get())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("future-mask-v2"); + } + + @Test + public void sha256NullInNullOut() { + SerializableFunction fn = new Sha256Global(1).bind(Types.StringType.get()); + assertThat(fn.apply(null)).isNull(); + } + + @Test + public void truncateNullInNullOut() { + SerializableFunction fn = new TruncateToYear(1).bind(Types.DateType.get()); + assertThat(fn.apply(null)).isNull(); + } +}