Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import org.apache.hadoop.hdds.utils.HddsVersionInfo;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.DatanodeStorage;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
Expand Down Expand Up @@ -287,7 +287,7 @@ public String getNamespace() {
LOG.info("Hdds Datanode login successful.");
}

DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
DatanodeStorage layoutStorage = new DatanodeStorage(conf,
datanodeDetails.getUuidString());
if (layoutStorage.getState() != INITIALIZED) {
layoutStorage.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@
* DataNodeStorageConfig is responsible for management of the
* StorageDirectories used by the DataNode.
*/
public class DatanodeLayoutStorage extends Storage {
public class DatanodeStorage extends Storage {
/**
* Construct DataNodeStorageConfig.
* @throws IOException if any directories are inaccessible.
*/
public DatanodeLayoutStorage(ConfigurationSource conf, String dataNodeId)
public DatanodeStorage(ConfigurationSource conf, String dataNodeId)
throws IOException {
super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
DATANODE_LAYOUT_VERSION_DIR, dataNodeId, getDefaultLayoutVersion(conf));
}

public DatanodeLayoutStorage(OzoneConfiguration conf, String dataNodeId,
int layoutVersion)
public DatanodeStorage(OzoneConfiguration conf, String dataNodeId,
int layoutVersion)
throws IOException {
super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
DATANODE_LAYOUT_VERSION_DIR, dataNodeId, layoutVersion);
}

public DatanodeLayoutStorage(ConfigurationSource conf)
public DatanodeStorage(ConfigurationSource conf)
throws IOException {
super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
DATANODE_LAYOUT_VERSION_DIR, getDefaultLayoutVersion(conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeActionProvider;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.DatanodeStorage;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
Expand All @@ -61,7 +59,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeVersionCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
Expand All @@ -82,11 +80,9 @@
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer;
import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -117,9 +113,8 @@ public class DatanodeStateMachine implements Closeable {

private final HddsDatanodeStopService hddsDatanodeStopService;

private final HDDSLayoutVersionManager layoutVersionManager;
private final DatanodeLayoutStorage layoutStorage;
private final DataNodeUpgradeFinalizer upgradeFinalizer;
private final DatanodeVersionManager versionManager;
private final DatanodeStorage storage;

/**
* Used to synchronize to the OzoneContainer object created in the
Expand Down Expand Up @@ -165,13 +160,11 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,

Clock clock = Clock.system(ZoneId.systemDefault());
// Expected to be initialized already.
layoutStorage = new DatanodeLayoutStorage(conf,
storage = new DatanodeStorage(conf,
datanodeDetails.getUuidString());

layoutVersionManager = new HDDSLayoutVersionManager(
layoutStorage.getApparentVersion(), null, new DatanodeUpgradeActionProvider());
upgradeFinalizer = new DataNodeUpgradeFinalizer(layoutVersionManager);
VersionedDatanodeFeatures.initialize(layoutVersionManager);
versionManager = new DatanodeVersionManager(storage, this);
VersionedDatanodeFeatures.initialize(versionManager);

String threadNamePrefix = datanodeDetails.threadNamePrefix();
executorService = Executors.newFixedThreadPool(
Expand Down Expand Up @@ -277,7 +270,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
closePipelineCommandExecutorService))
.addHandler(new CreatePipelineCommandHandler(conf,
createPipelineCommandExecutorService))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new FinalizeVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
.addHandler(new ReconcileContainerCommandHandler(supervisor, dnClient));

Expand Down Expand Up @@ -455,8 +448,8 @@ public void close() throws IOException {
if (cmdProcessThread != null) {
cmdProcessThread.interrupt();
}
if (layoutVersionManager != null) {
layoutVersionManager.close();
if (versionManager != null) {
versionManager.close();
}
context.setState(DatanodeStates.getLastState());
replicationSupervisorMetrics.unRegister();
Expand Down Expand Up @@ -748,29 +741,13 @@ public ReplicationSupervisor getSupervisor() {
return supervisor;
}

@VisibleForTesting
public HDDSLayoutVersionManager getLayoutVersionManager() {
return layoutVersionManager;
public DatanodeVersionManager getVersionManager() {
return versionManager;
}

@VisibleForTesting
public DatanodeLayoutStorage getLayoutStorage() {
return layoutStorage;
}

public StatusAndMessages finalizeUpgrade()
throws IOException {
return upgradeFinalizer.finalize(datanodeDetails.getUuidString(), this);
}

public StatusAndMessages queryUpgradeStatus()
throws IOException {
return upgradeFinalizer.reportStatus(datanodeDetails.getUuidString(),
true);
}

public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
return upgradeFinalizer;
public DatanodeStorage getStorage() {
return storage;
}

public ConfigurationSource getConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_REQUIRED;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.FinalizeNewLayoutVersionCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
Expand All @@ -28,30 +26,32 @@
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.upgrade.UpgradeException;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handler for FinalizeNewLayoutVersion command received from SCM.
* Handler for FinalizeVersion command received from SCM.
*/
public class FinalizeNewLayoutVersionCommandHandler implements CommandHandler {
public class FinalizeVersionCommandHandler implements CommandHandler {

private static final Logger LOG =
LoggerFactory.getLogger(FinalizeNewLayoutVersionCommandHandler.class);
LoggerFactory.getLogger(FinalizeVersionCommandHandler.class);

private AtomicLong invocationCount = new AtomicLong(0);
private final AtomicLong invocationCount = new AtomicLong(0);
private final MutableRate opsLatencyMs;

/**
* Constructs a FinalizeNewLayoutVersionCommandHandler.
* Constructs a FinalizeVersionCommandHandler.
*/
public FinalizeNewLayoutVersionCommandHandler() {
public FinalizeVersionCommandHandler() {
MetricsRegistry registry = new MetricsRegistry(
FinalizeNewLayoutVersionCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms");
FinalizeVersionCommandHandler.class.getSimpleName());
this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms");
}

/**
Expand All @@ -65,24 +65,20 @@ public FinalizeNewLayoutVersionCommandHandler() {
@Override
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
LOG.info("Processing FinalizeVersionCommandHandler command.");
invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
DatanodeStateMachine dsm = context.getParent();
final FinalizeNewLayoutVersionCommandProto finalizeCommand =
((FinalizeNewLayoutVersionCommand)command).getProto();
((FinalizeVersionCommand) command).getProto();
try {
if (finalizeCommand.getFinalizeNewLayoutVersion()) {
// SCM is asking datanode to finalize
if (dsm.getLayoutVersionManager().getUpgradeState() ==
FINALIZATION_REQUIRED) {
// SCM will keep sending Finalize command until datanode mlv == slv
// we need to avoid multiple invocations of finalizeUpgrade.
LOG.info("Finalize Upgrade called!");
dsm.finalizeUpgrade();
if (dsm.getVersionManager().needsFinalization()) {
LOG.info("Finalize upgrade called.");
dsm.getVersionManager().finalizeUpgrade();
}
}
} catch (Exception e) {
} catch (UpgradeException e) {
LOG.error("Exception during finalization.", e);
} finally {
long endTime = Time.monotonicNow();
Expand All @@ -107,7 +103,7 @@ public SCMCommandProto.Type getCommandType() {
*/
@Override
public int getInvocationCount() {
return (int)invocationCount.get();
return (int) invocationCount.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;

import com.google.common.base.Preconditions;
Expand All @@ -43,18 +44,18 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
Expand All @@ -76,30 +77,24 @@ public class HeartbeatEndpointTask
private StateContext context;
private int maxContainerActionsPerHB;
private int maxPipelineActionsPerHB;
private HDDSLayoutVersionManager layoutVersionManager;
private final DatanodeVersionManager versionManager;

/**
* Constructs a SCM heart beat.
*
* @param rpcEndpoint rpc Endpoint
* @param conf Config.
* @param context State context
* @param versionManager Layout version Manager
*/
public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
ConfigurationSource conf, StateContext context,
HDDSLayoutVersionManager versionManager) {
ConfigurationSource conf, StateContext context) {
this.rpcEndpoint = rpcEndpoint;
this.context = context;
this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
if (versionManager != null) {
this.layoutVersionManager = versionManager;
} else {
this.layoutVersionManager = context.getParent().getLayoutVersionManager();
}
this.versionManager = context.getParent().getVersionManager();
}

/**
Expand Down Expand Up @@ -135,8 +130,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
Preconditions.checkState(this.datanodeDetailsProto != null);

LayoutVersionProto layoutinfo = toLayoutVersionProto(
layoutVersionManager.getMetadataLayoutVersion(),
layoutVersionManager.getSoftwareLayoutVersion());
versionManager.getApparentVersion().serialize(),
versionManager.getSoftwareVersion().serialize());

requestBuilder = SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto)
Expand Down Expand Up @@ -370,15 +365,12 @@ private void processResponse(SCMHeartbeatResponseProto response,
setNodeOperationalStateCommand);
break;
case finalizeNewLayoutVersionCommand:
FinalizeNewLayoutVersionCommand finalizeNewLayoutVersionCommand =
FinalizeNewLayoutVersionCommand.getFromProtobuf(
commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
FinalizeVersionCommand finalizeVersionCommand =
FinalizeVersionCommand.getFromProtobuf(commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM finalize command {}",
finalizeNewLayoutVersionCommand.getId());
LOG.debug("Received SCM finalize command {}", finalizeVersionCommand.getId());
}
processCommonCommand(commandResponseProto,
finalizeNewLayoutVersionCommand);
processCommonCommand(commandResponseProto, finalizeVersionCommand);
break;
case refreshVolumeUsageInfo:
RefreshVolumeUsageCommand refreshVolumeUsageCommand =
Expand Down Expand Up @@ -444,7 +436,6 @@ public static class Builder {
private ConfigurationSource conf;
private DatanodeDetails datanodeDetails;
private StateContext context;
private HDDSLayoutVersionManager versionManager;

/**
* Constructs the builder class.
Expand All @@ -463,17 +454,6 @@ public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
return this;
}

/**
* Sets the LayoutVersionManager.
*
* @param lvm config
* @return Builder
*/
public Builder setLayoutVersionManager(HDDSLayoutVersionManager lvm) {
this.versionManager = lvm;
return this;
}

/**
* Sets the Config.
*
Expand Down Expand Up @@ -526,7 +506,7 @@ public HeartbeatEndpointTask build() {
}

HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
.endPointStateMachine, this.conf, this.context, this.versionManager);
.endPointStateMachine, this.conf, this.context);
task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
return task;
}
Expand Down
Loading