Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public final class OzoneConsts {
public static final String MAX_NUM_OF_BUCKETS = "maxNumOfBuckets";
public static final String HAS_SNAPSHOT = "hasSnapshot";
public static final String STORAGE_TYPE = "storageType";
public static final String STORAGE_POLICY = "storagePolicy";
public static final String RESOURCE_TYPE = "resourceType";
public static final String IS_VERSION_ENABLED = "isVersionEnabled";
public static final String CREATION_TIME = "creationTime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
Expand Down Expand Up @@ -87,7 +88,7 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException {
return allocateBlock(size, numBlocks, replicationConfig, owner,
excludeList, null);
excludeList, null, StorageType.DEFAULT);
}

/**
Expand All @@ -107,9 +108,17 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList, String clientMachine) throws IOException {
return allocateBlock(size, numBlocks, replicationConfig, owner,
excludeList, clientMachine, StorageType.DEFAULT);
}

List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList, String clientMachine) throws IOException;
ExcludeList excludeList, String clientMachine,
StorageType storageType) throws IOException;

/**
* Delete blocks for a set of object keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
Expand Down Expand Up @@ -173,7 +174,8 @@ public List<AllocatedBlock> allocateBlock(
long size, int num,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList,
String clientMachine
String clientMachine,
StorageType storageType
) throws IOException {
Preconditions.checkArgument(size > 0, "block size must be greater than 0");

Expand All @@ -189,6 +191,10 @@ public List<AllocatedBlock> allocateBlock(
requestBuilder.setClient(clientMachine);
}

if (storageType != null) {
requestBuilder.setStorageType(storageType.toProto());
}

switch (replicationConfig.getReplicationType()) {
case STAND_ALONE:
requestBuilder.setFactor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ message AllocateScmBlockRequestProto {

optional string client = 9;

optional hadoop.hdds.StorageTypeProto storageType = 10;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.common.BlockGroup;
Expand All @@ -42,8 +43,8 @@ public interface BlockManager extends Closeable {
* @throws IOException
*/
AllocatedBlock allocateBlock(long size, ReplicationConfig replicationConfig,
String owner,
ExcludeList excludeList) throws IOException, TimeoutException;
String owner, ExcludeList excludeList,
StorageType storageType) throws IOException, TimeoutException;

/**
* Deletes a list of blocks in an atomic operation. Internally, SCM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand Down Expand Up @@ -145,11 +146,14 @@ public void stop() throws IOException {
@Override
public AllocatedBlock allocateBlock(final long size,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList)
String owner, ExcludeList excludeList,
StorageType storageType)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Size : {} , replicationConfig: {}", size, replicationConfig);
}
LOG.debug("Allocating block: size={}, replication={}, storageType={}",
size, replicationConfig, storageType);
if (scm.getScmContext().isInSafeMode()) {
throw new SCMException("SafeModePrecheck failed for allocateBlock",
SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -193,6 +194,10 @@ private Status exceptionToResponseStatus(IOException ex) {
public AllocateScmBlockResponseProto allocateScmBlock(
AllocateScmBlockRequestProto request, int clientVersion)
throws IOException {
StorageType storageType = request.hasStorageType()
? StorageType.valueOf(request.getStorageType())
: StorageType.DEFAULT;

List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(),
Expand All @@ -202,7 +207,8 @@ public AllocateScmBlockResponseProto allocateScmBlock(
request.getEcReplicationConfig()),
request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()),
request.getClient());
request.getClient(),
storageType);

AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
Expand Down Expand Up @@ -189,7 +190,8 @@ public List<AllocatedBlock> allocateBlock(
long size, int num,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList,
String clientMachine
String clientMachine,
StorageType storageType
) throws IOException {
long startNanos = Time.monotonicNowNanos();
Map<String, String> auditMap = Maps.newHashMap();
Expand All @@ -198,6 +200,7 @@ public List<AllocatedBlock> allocateBlock(
auditMap.put("replication", replicationConfig.toString());
auditMap.put("owner", owner);
auditMap.put("client", clientMachine);
auditMap.put("storageType", String.valueOf(storageType));
List<AllocatedBlock> blocks = new ArrayList<>(num);

if (LOG.isDebugEnabled()) {
Expand All @@ -207,7 +210,8 @@ public List<AllocatedBlock> allocateBlock(
try {
for (int i = 0; i < num; i++) {
AllocatedBlock block = scm.getScmBlockManager()
.allocateBlock(size, replicationConfig, owner, excludeList);
.allocateBlock(size, replicationConfig, owner, excludeList,
storageType);
if (block != null) {
// Sort the datanodes if client machine is specified
final Node client = getClientNode(clientMachine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
Expand Down Expand Up @@ -197,7 +198,8 @@ public void testAllocateBlock() throws Exception {
pipelineManager.createPipeline(replicationConfig);
HddsTestUtils.openAllRatisPipelines(pipelineManager);
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
replicationConfig, OzoneConsts.OZONE, new ExcludeList());
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
StorageType.DEFAULT);
assertNotNull(block);
}

Expand All @@ -216,7 +218,7 @@ public void testAllocateBlockWithExclusion() throws Exception {
.get(0).getId());
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
excludeList);
excludeList, StorageType.DEFAULT);
assertNotNull(block);
for (PipelineID id : excludeList.getPipelineIds()) {
assertNotEquals(block.getPipeline().getId(), id);
Expand All @@ -227,7 +229,7 @@ public void testAllocateBlockWithExclusion() throws Exception {
}
block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
excludeList);
excludeList, StorageType.DEFAULT);
assertNotNull(block);
assertThat(excludeList.getPipelineIds()).contains(block.getPipeline().getId());
}
Expand All @@ -249,7 +251,7 @@ void testAllocateBlockInParallel() throws Exception {
future.complete(blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList()));
new ExcludeList(), StorageType.DEFAULT));
} catch (IOException e) {
future.completeExceptionally(e);
}
Expand Down Expand Up @@ -287,7 +289,7 @@ void testBlockDistribution() throws Exception {
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
long containerId = block.getBlockID().getContainerID();
if (!allocatedBlockMap.containsKey(containerId)) {
blockList = new ArrayList<>();
Expand Down Expand Up @@ -343,7 +345,7 @@ void testBlockDistributionWithMultipleDisks() throws Exception {
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
long containerId = block.getBlockID().getContainerID();
if (!allocatedBlockMap.containsKey(containerId)) {
blockList = new ArrayList<>();
Expand Down Expand Up @@ -403,7 +405,7 @@ void testBlockDistributionWithMultipleRaftLogDisks() throws Exception {
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
long containerId = block.getBlockID().getContainerID();
if (!allocatedBlockMap.containsKey(containerId)) {
blockList = new ArrayList<>();
Expand Down Expand Up @@ -439,7 +441,8 @@ public void testAllocateOversizedBlock() {
long size = 6 * GB;
Throwable t = assertThrows(IOException.class, () ->
blockManager.allocateBlock(size,
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
StorageType.DEFAULT));
assertEquals("Unsupported block size: " + size,
t.getMessage());
}
Expand All @@ -450,7 +453,8 @@ public void testAllocateBlockFailureInSafeMode() {
// Test1: In safe mode expect an SCMException.
Throwable t = assertThrows(IOException.class, () ->
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
StorageType.DEFAULT));
assertEquals("SafeModePrecheck failed for allocateBlock",
t.getMessage());
}
Expand All @@ -459,7 +463,8 @@ public void testAllocateBlockFailureInSafeMode() {
public void testAllocateBlockSucInSafeMode() throws Exception {
// Test2: Exit safe mode and then try allocateBock again.
assertNotNull(blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
StorageType.DEFAULT));
}

@Test
Expand All @@ -472,14 +477,14 @@ public void testMultipleBlockAllocation()

AllocatedBlock allocatedBlock = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
// block should be allocated in different pipelines
GenericTestUtils.waitFor(() -> {
try {
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
return !block.getPipeline().getId()
.equals(allocatedBlock.getPipeline().getId());
} catch (IOException e) {
Expand Down Expand Up @@ -525,7 +530,7 @@ public void testMultipleBlockAllocationWithClosedContainer()
blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
} catch (IOException e) {
}
return verifyNumberOfContainersInPipelines(
Expand All @@ -550,7 +555,7 @@ public void testMultipleBlockAllocationWithClosedContainer()
blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
OzoneConsts.OZONE,
new ExcludeList());
new ExcludeList(), StorageType.DEFAULT);
} catch (IOException e) {
}
return verifyNumberOfContainersInPipelines(
Expand All @@ -567,7 +572,7 @@ public void testBlockAllocationWithNoAvailablePipelines()
assertEquals(0, pipelineManager.getPipelines(replicationConfig).size());
assertNotNull(blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
new ExcludeList()));
new ExcludeList(), StorageType.DEFAULT));
}

private class DatanodeCommandHandler implements
Expand Down
Loading