Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ storm.nimbus.zookeeper.acls.fixup: true

storm.auth.simple-white-list.users: []
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdThriftSerializationDelegate"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching the default to the pure Zstd delegate breaks rolling upgrades: every existing cluster has Gzip-compressed Thrift sitting in ZooKeeper, and ZstdThriftSerializationDelegate cannot read it. The new bridge class is presumably designed for exactly this case — once its fallback is fixed (see comment on that file), please point the default here at ZstdBridgeThriftSerializationDelegate instead. Operators can opt into the pure Zstd delegate once they are sure no legacy Gzip state remains in ZK.

storm.compression.zstd.level: 3
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.workers.artifacts.dir: "workers-artifacts"
storm.health.check.dir: "healthchecks"
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@

<!-- dependency versions -->
<commons-compress.version>1.28.0</commons-compress.version>
<zstd-jni.version>1.5.7-8</zstd-jni.version>
<commons-io.version>2.22.0</commons-io.version>
<commons-lang3.version>3.20.0</commons-lang3.version>
<commons-exec.version>1.6.0</commons-exec.version>
Expand Down Expand Up @@ -514,6 +515,12 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
<scope>compile</scope>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compile is the default Maven scope — this line is redundant and can be dropped.

Also worth checking whether the explicit zstd-jni dependency is needed at all: commons-compress already pulls it transitively, and the new code only uses org.apache.commons.compress.compressors.zstandard.*. If nothing in the codebase imports com.github.luben.zstd.* directly, this whole <dependency> entry plus the zstd-jni.version property at the top of the file can go.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the redundant compile tags as suggested.

Regarding the zstd-jni dependency: I have confirmed that it is marked as <optional>true</optional> in the commons-compress 1.28.0 POM. I must keep this explicit dependency to ensure the artifact is included in the classpath.

</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
Expand Down
9 changes: 9 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<scope>compile</scope>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as in the root pom — compile is the default scope, drop the <scope> line. And if no module-level code imports com.github.luben.zstd.* directly, the whole <dependency> entry is redundant with the commons-compress one right above it.

</dependency>
</dependencies>

<build>
Expand Down
6 changes: 5 additions & 1 deletion storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,11 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";

/**
* Zstandard compression level. Defaults to 3.
*/
@CustomValidator(validatorClass = ConfigValidation.ZstdLevelValidator.class)
public static final String STORM_COMPRESSION_ZSTD_LEVEL = "storm.compression.zstd.level";
/**
* Configure the topology metrics reporters to be used on workers.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: rest of this package uses /* for the ASF license header; this file uses /** (Javadoc style).

* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.serialization;

import java.util.Map;
import java.util.zip.GZIPInputStream;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import. Its presence suggests the original intent was to fall back to Gzip rather than raw Thrift — see comment on the defaultDelegate field below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry, I have done copy/paste from the gzip bridge delegate.
Removed.


/**
* Always writes Zstd out, but tests incoming bytes to determine the format.
* If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
* If not, it falls back to {@link ThriftSerializationDelegate} for raw Thrift.
*/
public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {

/**
* Zstandard magic number 0xFD2FB52. In a byte array (little-endian format): [0x28, 0xB5, 0x2F, 0xFD]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Zstandard magic number is 0xFD2FB528 (32 bits / 8 hex digits), not 0xFD2FB52. The bytes themselves below are correct; just the comment is missing a digit.

*/
private static final byte[] ZSTD_MAGIC = new byte[]{ 0x28, (byte) 0xB5, 0x2F, (byte) 0xFD };

private final ThriftSerializationDelegate defaultDelegate = new ThriftSerializationDelegate();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be GzipThriftSerializationDelegate, not ThriftSerializationDelegate. The shipping default for storm.meta.serialization.delegate has always been Gzip-compressed Thrift, so on a non-Zstd magic header the bytes will be Gzip — never raw Thrift. As written, this bridge cannot read any state produced by previous releases, defeating the purpose of having a bridge.

With GzipThriftSerializationDelegate as the fallback here, and the bridge promoted to the new default in defaults.yaml, rolling upgrades work transparently in both directions.

private final ZstdThriftSerializationDelegate zstdDelegate = new ZstdThriftSerializationDelegate();

@Override
public void prepare(Map<String, Object> topoConf) {
defaultDelegate.prepare(topoConf);
zstdDelegate.prepare(topoConf);
}

@Override
public byte[] serialize(Object object) {
// Always compress new data with Zstd
return zstdDelegate.serialize(object);
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (isZstd(bytes)) {
return zstdDelegate.deserialize(bytes, clazz);
} else {
// Fallback for old data or non-compressed data
return defaultDelegate.deserialize(bytes, clazz);
}
}

/**
* Checks the first 4 bytes of the array against the Zstd Magic Number.
*/
private boolean isZstd(byte[] bytes) {
if (bytes == null || bytes.length < 4) {
return false;
}

return bytes[0] == ZSTD_MAGIC[0] && bytes[1] == ZSTD_MAGIC[1] && bytes[2] == ZSTD_MAGIC[2] && bytes[3] == ZSTD_MAGIC[3];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.serialization;

import java.util.Map;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.TSerializer;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.Utils;

/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale copy from GzipThriftSerializationDelegate: this delegate handles Zstd, not "a gzip byte stream".

*/
public class ZstdThriftSerializationDelegate implements SerializationDelegate {

// ThreadLocal with explicit exception handling for checked TTransportException
private static final ThreadLocal<TSerializer> SERIALIZER = ThreadLocal.withInitial(() -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caching TSerializer/TDeserializer per thread is a behaviour change worth reconsidering — every other delegate in this package allocates them per call. Thrift's TMemoryBuffer inside TSerializer grows to the largest message ever serialized on that thread and never shrinks, so on long-lived executor threads, one occasional large metadata blob permanently pins that memory across every worker. Standard ThreadLocal classloader-pinning concerns apply on worker shutdown too.

Suggest following the existing per-call pattern; TSerializer is cheap to allocate compared to compression. If the cache is kept, at minimum add a lifecycle hook that calls SERIALIZER.remove() / DESERIALIZER.remove() on shutdown.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the detailed explanation. I didn't account for the TMemoryBuffer growth behavior over long-lived threads. Given that TSerializer allocation is cheap I will revert to the per-call allocation pattern.

try {
return new TSerializer();
} catch (TTransportException e) {
throw new RuntimeException("Failed to initialize Thrift Serializer", e);
}
});

private static final ThreadLocal<TDeserializer> DESERIALIZER = ThreadLocal.withInitial(() -> {
try {
return new TDeserializer();
} catch (TTransportException e) {
throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
}
});

@Override
public void prepare(Map<String, Object> topoConf) {
// No-op: Initialization happens lazily per thread
}

@Override
public byte[] serialize(Object object) {
if (!(object instanceof TBase)) {
throw new IllegalArgumentException("Object must be an instance of TBase");
}
try {
byte[] thriftData = SERIALIZER.get().serialize((TBase<?, ?>) object);
return Utils.ZstdUtils.compress(thriftData);
} catch (TException e) {
throw new RuntimeException("Failed to serialize Thrift object", e);
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
byte[] decompressed = Utils.ZstdUtils.decompress(bytes);
TBase<?, ?> instance = (TBase<?, ?>) clazz.getDeclaredConstructor().newInstance();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / defense-in-depth: clazz.asSubclass(TBase.class).getDeclaredConstructor().newInstance() fails fast on a wrong class type rather than after construction. Not a security issue under Storm's threat model — just a small robustness improvement.

DESERIALIZER.get().deserialize(instance, decompressed);
return (T) instance;
} catch (Exception e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch (Exception) here also swallows InterruptedException. Suggest narrowing to the checked exceptions actually thrown (ReflectiveOperationException, TException) so an interrupt isn't silently converted into a RuntimeException with the interrupt flag dropped.

throw new RuntimeException("Failed to deserialize bytes to " + clazz.getName(), e);
}
}

}
6 changes: 6 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ConfigUtils {
public static final String FILE_SEPARATOR = File.separator;
public static final String STORM_HOME = "storm.home";
public static final String RESOURCES_SUBDIR = "resources";
public static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3;

private static final Set<String> passwordConfigKeys = new HashSet<>();

Expand Down Expand Up @@ -175,6 +176,11 @@ public static int samplingRate(Map<String, Object> conf) {
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}

public static int zstdCompressionLevel(Map<String, Object> conf) {
return ObjectReader.getInt(conf.getOrDefault(Config.STORM_COMPRESSION_ZSTD_LEVEL,
DEFAULT_ZSTD_COMPRESSION_LEVEL));
}

public static BooleanSupplier mkStatsSampler(Map<String, Object> conf) {
return evenSampler(samplingRate(conf));
}
Expand Down
72 changes: 72 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import javax.security.auth.Subject;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
Expand Down Expand Up @@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
}
}

/**
* Static utility class for Zstandard (Zstd) compression and decompression.
*/
public static final class ZstdUtils {

private static final int BUFFER_SIZE = 64 * 1024;

/**
* Private constructor to prevent instantiation.
* @throws UnsupportedOperationException if an attempt is made to instantiate this class.
*/
private ZstdUtils() {
throw new UnsupportedOperationException("Utility class should not be instantiated.");
}

/**
* Compresses the provided byte array using Zstandard.
*
* <p>The output includes the standard Zstandard frame header, making it
* self-describing for the decompression phase.</p>
*
* @param data the raw byte array to compress.
* @return a compressed byte array, or the original array if null/empty.
* @throws RuntimeException wrapping an {@link IOException} if the compression fails.
*/
public static byte[] compress(byte[] data) {
if (data == null || data.length == 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning the raw input for empty data means the bridge's isZstd() check sees no magic header and routes through the fallback path. Works by accident for empty input today, but it's an asymmetry that will bite later. Either drop the early-return so output always carries a valid Zstd frame, or document the contract explicitly.

Copy link
Copy Markdown
Contributor Author

@GGraziadei GGraziadei May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I am proposing an alternative to this method (now moved to the utility class).

I found in the documentation a safe method to do the same comparison better: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/invoke/MethodHandles.html#byteArrayViewVarHandle(java.lang.Class,java.nio.ByteOrder)

private static final int ZSTD_MAGIC_INT = 0xFD2FB528;

private static final VarHandle INT_HANDLE = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);
...
public static boolean isZstd(byte[] bytes) {
    if (bytes == null) {
        return false;
    }
    return (int) INT_HANDLE.get(bytes, 0) == ZSTD_MAGIC_INT;
}

return data;
}

try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
.setOutputStream(bos)
.setBufferSize(BUFFER_SIZE) // impacts on compression ratio
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reaching into the static Utils.localConf from a public utility class makes ZstdUtils.compress impossible to test in isolation and effectively pins the compression level at JVM startup. The level is per-delegate config — SerializationDelegate#prepare(topoConf) is the natural place to read it once and cache it. Suggest plumbing the level through as a parameter, e.g. compress(byte[] data, int level), and having ZstdThriftSerializationDelegate.prepare stash the value after looking it up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I am fixing.

.setLevel(ConfigUtils.zstdCompressionLevel(localConf))
.get()) {
zstdOut.write(data);
zstdOut.finish();
}
return bos.toByteArray();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Narrow this to IOException — the underlying streams only declare that, and catch (Exception) swallows InterruptedException if the calling thread is interrupted mid-write.

} catch (Exception e) {
throw new RuntimeException("Zstd compression failed", e);
}
}

/**
* Decompresses a Zstandard-compressed byte array.
*
* @param data the compressed byte array (Zstd frame).
* @return the original decompressed byte array, or the input if null/empty.
* @throws RuntimeException wrapping an {@link IOException} if the decompression fails
* or if the data is not a valid Zstd frame.
*/
public static byte[] decompress(byte[] data) {
if (data == null || data.length == 0) {
return data;
}

try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardening suggestion — not a vulnerability under Storm's threat model since ZK sits inside the trusted boundary, but worth doing as defense-in-depth: this IOUtils.copy will happily decompress an unbounded amount of data into memory. Zstd routinely achieves >100× ratios and pathological frames can reach >1,000,000×, so a small crafted frame can OOM the JVM. Suggest a configurable cap (e.g. storm.compression.zstd.max.decompressed.bytes, default in the tens of MiB — well above any realistic metadata size) enforced via BoundedInputStream or a manual copy loop. The same caveat applies to Utils.gunzip in principle, but Zstd raises the ceiling significantly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I am fixing.

IOUtils.copy(zstdIn, bos);
return bos.toByteArray();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the compress path: narrow to IOException, otherwise InterruptedException is swallowed.

} catch (Exception e) {
throw new RuntimeException("Zstd decompression failed. Make sure the data is a valid Zstd frame.", e);
}
}
}

public static List<String> getRepeat(List<String> list) {
List<String> rtn = new ArrayList<String>();
Set<String> idSet = new HashSet<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,26 @@ public void validateField(String name, Object o) {
}
}

public static class ZstdLevelValidator extends Validator {
private static final int MIN_LEVEL = 1;
private static final int MAX_LEVEL = 22;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Levels 20–22 are Zstd "ultra" mode and require dramatically more working memory per call — rarely worth it for metadata-sized payloads, and they make the cluster easy to footgun with a single storm.yaml typo. Suggest capping at 19, or gating ultra levels behind an explicit opt-in (e.g. storm.compression.zstd.allow.ultra).


@Override
public void validateField(String name, Object o) {
if (o == null) {
return;
}
SimpleTypeValidator.validateField(name, Integer.class, o);
int level = (Integer) o;
if (level < MIN_LEVEL || level > MAX_LEVEL) {
throw new IllegalArgumentException(
String.format("Field '%s' is invalid: %d. Zstd compression level must be between %d and %d.",
name, level, MIN_LEVEL, MAX_LEVEL)
);
}
}
}

public static class EventLoggerRegistryValidator extends Validator {

@Override
Expand Down
Loading