Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.utils;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

Expand Down Expand Up @@ -56,12 +59,25 @@ public interface AddCallback {

private Runnable underCallback;

private final CopyOnWriteArraySet<SizeAwareMetric> hierarchy = new CopyOnWriteArraySet<>();

private Object owner;

/**
* To be used in a case where we just measure elements
*/
public SizeAwareMetric() {
}

public Object getOwner() {
return owner;
}

public SizeAwareMetric setOwner(Object owner) {
this.owner = owner;
return this;
}


public SizeAwareMetric(long maxSize, long lowerMarkSize, long maxElements, long lowerMarkElements) {
if (lowerMarkSize > maxSize) {
Expand Down Expand Up @@ -104,6 +120,21 @@ public boolean isSizeEnabled() {
return maxSize >= 0;
}

public void addHierarchy(SizeAwareMetric hierarchy) {
if (hierarchy == this) {
throw new IllegalArgumentException("recursive hierarchy");
}
this.hierarchy.add(hierarchy);
}

public void removeHierarchy(SizeAwareMetric hierarchy) {
this.hierarchy.remove(hierarchy);
}

public Set<SizeAwareMetric> getHierarchy() {
return Collections.unmodifiableSet(hierarchy);
}

public SizeAwareMetric setOnSizeCallback(AddCallback onSize) {
this.onSizeCallback = onSize;
return this;
Expand Down Expand Up @@ -161,12 +192,17 @@ public final long addSize(final int delta, final boolean sizeOnly, boolean affec

changeFlag(NOT_USED, FREE);

if (onSizeCallback != null && affectCallbacks) {
try {
onSizeCallback.add(delta, sizeOnly);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
if (affectCallbacks) {
if (onSizeCallback != null) {
try {
onSizeCallback.add(delta, sizeOnly);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}

// on Hierarchy calls, we don't affect callbacks (which would affect global-size and other callbacks)
hierarchy.forEach(f -> f.addSize(delta, sizeOnly, false));
}

long currentSize = sizeUpdater.addAndGet(this, delta);
Expand Down Expand Up @@ -270,6 +306,14 @@ private void checkOver(long currentElements, long currentSize) {

@Override
public String toString() {
return "SizeAwareMetric{" + "elements=" + elements + ", size=" + size + '}';
return "SizeAwareMetric{" + "elements=" + elements + ", size=" + size + ", owner=" + String.valueOf(owner) + '}';
}

public String debugHierarchy() {
StringBuilder result = new StringBuilder();
for (SizeAwareMetric item : hierarchy) {
result.append(item.toString()).append("\n");
}
return result.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -649,4 +649,56 @@ public void testConsistency() {
assertTrue(metric.isSizeEnabled());
assertTrue(metric.isElementsEnabled());
}

@Test
public void testHierarchy() {
final int INSTANCES = 10;
final int SIZE_PER_MESSAGE = 100;
final int MESSAGES_PER_INSTANCE = 50;

SizeAwareMetric root = new SizeAwareMetric(100000, 50000, 10000, 5000);
SizeAwareMetric[] metrics = new SizeAwareMetric[INSTANCES];

// Create 10 instances and each one adds root to its hierarchy
for (int i = 0; i < INSTANCES; i++) {
metrics[i] = new SizeAwareMetric(10000, 5000, 1000, 500);
metrics[i].addHierarchy(root);
}

// Validate hierarchy is set up correctly on each child
for (int i = 0; i < INSTANCES; i++) {
assertEquals(1, metrics[i].getHierarchy().size());
assertTrue(metrics[i].getHierarchy().contains(root));
}

// Add messages to each instance
for (int i = 0; i < INSTANCES; i++) {
for (int j = 0; j < MESSAGES_PER_INSTANCE; j++) {
metrics[i].addSize(SIZE_PER_MESSAGE);
}
}

// Validate individual metrics
for (int i = 0; i < INSTANCES; i++) {
assertEquals(SIZE_PER_MESSAGE * MESSAGES_PER_INSTANCE, metrics[i].getSize());
assertEquals(MESSAGES_PER_INSTANCE, metrics[i].getElements());
}

// Validate root totals (root should have accumulated from all children via hierarchy)
long expectedTotalSize = (long) INSTANCES * MESSAGES_PER_INSTANCE * SIZE_PER_MESSAGE;
long expectedTotalElements = (long) INSTANCES * MESSAGES_PER_INSTANCE;
assertEquals(expectedTotalSize, root.getSize());
assertEquals(expectedTotalElements, root.getElements());

// Remove hierarchy from each child
for (int i = 0; i < INSTANCES; i++) {
metrics[i].removeHierarchy(root);
}

// Validate hierarchy is empty on each child
for (int i = 0; i < INSTANCES; i++) {
assertEquals(0, metrics[i].getHierarchy().size());
assertFalse(metrics[i].getHierarchy().contains(root));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// true means that the server supports wild card routing
private static boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;

// true means that sizes on wildcard addresses should be aggregated
private static boolean DEFAULT_WILDCARD_AGGREGATE_SIZES = false;

private static String DEFAULT_ADDRESS_PATH_SEPARATOR = ".";

private static SimpleString DEFAULT_MANAGEMENT_ADDRESS = SimpleString.of("activemq.management");
Expand Down Expand Up @@ -834,6 +837,13 @@ public static boolean isDefaultWildcardRoutingEnabled() {
return DEFAULT_WILDCARD_ROUTING_ENABLED;
}

/**
* {@code true} means that sizes on wildcard addresses should be aggregated
*/
public static boolean isDefaultWildcardAggregateSizes() {
return DEFAULT_WILDCARD_AGGREGATE_SIZES;
}

public static String getDefaultAddressPathSeparator() {
return DEFAULT_ADDRESS_PATH_SEPARATOR;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class WildcardConfiguration implements Serializable {

boolean routingEnabled = true;

boolean aggregateSizes = false;

char singleWord = SINGLE_WORD;

char anyWords = ANY_WORDS;
Expand All @@ -59,20 +61,22 @@ public boolean equals(Object obj) {
}

return routingEnabled == other.routingEnabled &&
aggregateSizes == other.aggregateSizes &&
singleWord == other.singleWord &&
anyWords == other.anyWords &&
delimiter == other.delimiter;
}

@Override
public int hashCode() {
return Objects.hash(routingEnabled, singleWord, anyWords, delimiter);
return Objects.hash(routingEnabled, aggregateSizes, singleWord, anyWords, delimiter);
}

@Override
public String toString() {
return "WildcardConfiguration{" +
"routingEnabled=" + routingEnabled +
", aggregateSizes=" + aggregateSizes +
", anyWords=" + anyWords +
", singleWord=" + singleWord +
", delimiter=" + delimiter +
Expand All @@ -88,6 +92,15 @@ public WildcardConfiguration setRoutingEnabled(boolean routingEnabled) {
return this;
}

public boolean isAggregateSizes() {
return aggregateSizes;
}

public WildcardConfiguration setAggregateSizes(boolean aggregateSizes) {
this.aggregateSizes = aggregateSizes;
return this;
}

public char getAnyWords() {
return anyWords;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,7 @@ protected void parseWildcardConfiguration(final Element e, final Configuration m
conf.setSingleWord(getString(e, "single-word", Character.toString(conf.getSingleWord()), NO_CHECK).charAt(0));
conf.setRoutingEnabled(getBoolean(e, "enabled", conf.isRoutingEnabled()));
conf.setRoutingEnabled(getBoolean(e, "routing-enabled", conf.isRoutingEnabled()));
conf.setAggregateSizes(getBoolean(e, "aggregate-sizes", conf.isAggregateSizes()));
}

private ConnectorServiceConfiguration parseConnectorService(final Element e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
*/
PagingStore getPageStore(SimpleString address) throws Exception;

/**
* {@return the PageStore associated with the address if it exists, or null if it doesn't exist}
*/
default PagingStore lookupPageStore(SimpleString address) throws Exception {
return getPageStore(address);
}

/**
* Point to inform/restoring Transactions used when the messages were added into paging
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;

Expand Down Expand Up @@ -93,6 +94,10 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf

long getAddressSize();

default SizeAwareMetric getSizeMetric() {
return null;
}

long getAddressElements();

long getMaxSize();
Expand All @@ -107,6 +112,10 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf

void applySetting(AddressSettings addressSettings);

void addHierarchy(PagingStore related);

void removeHierarchy(PagingStore related);

/**
* This method will look if the current state of paging is not paging, without using a lock. For cases where you need
* absolutely atomic results, check it directly on the internal variables while requiring a readLock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ public PagingStore getPageStore(final SimpleString rawStoreName) throws Exceptio
}
}

@Override
public PagingStore lookupPageStore(final SimpleString rawStoreName) throws Exception {
final SimpleString storeName = CompositeAddress.extractAddressName(rawStoreName);
return stores.get(storeName);
}

@Override
public void addTransaction(final PageTransactionInfo pageTransaction) {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public PagingStoreImpl(final SimpleString address,

this.size = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages).
setUnderCallback(this::underSized).setOverCallback(this::overSized).
setOnSizeCallback(pagingManager::addSize);
setOnSizeCallback(pagingManager::addSize).setOwner(this);

applySetting(addressSettings, true);

Expand Down Expand Up @@ -535,11 +535,29 @@ public long getAddressSize() {
return size.getSize();
}

@Override
public SizeAwareMetric getSizeMetric() {
return size;
}

@Override
public long getAddressElements() {
return size.getElements();
}

@Override
public void addHierarchy(PagingStore related) {
PagingStoreImpl storeRelated = (PagingStoreImpl) related;
size.addHierarchy(storeRelated.size);
}

@Override
public void removeHierarchy(PagingStore related) {
PagingStoreImpl storeRelated = (PagingStoreImpl) related;
size.removeHierarchy(storeRelated.size);
}


@Override
public long getMaxSize() {
if (maxSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public PostOfficeImpl(final ActiveMQServer server,
this.addressQueueReaperPeriod = addressQueueReaperPeriod;

if (wildcardConfiguration.isRoutingEnabled()) {
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager());
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager(), server.getPagingManager(), wildcardConfiguration.isAggregateSizes());
} else {
addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ protected boolean addMappingInternal(final SimpleString address, final Binding b
}

@Override
public boolean reloadAddressInfo(AddressInfo addressInfo) {
public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception {
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
}

Expand Down
Loading