Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void decrUsedBytes(long bytes, boolean increasePendingDeleteBytes) {
}
}

private void incrSnapshotUsedBytes(long bytes) {
public void incrSnapshotUsedBytes(long bytes) {
this.snapshotUsedBytes += bytes;
}

Expand All @@ -275,7 +275,7 @@ public void decrUsedNamespace(long namespaceToUse, boolean increasePendingDelete
}
}

private void incrSnapshotUsedNamespace(long namespaceToUse) {
public void incrSnapshotUsedNamespace(long namespaceToUse) {
this.snapshotUsedNamespace += namespaceToUse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2358,6 +2358,8 @@ message BucketQuotaCount {
required int64 diffUsedBytes = 3;
required int64 diffUsedNamespace = 4;
required bool supportOldQuota = 5 [default=false];
optional int64 diffSnapshotUsedBytes = 6;
optional int64 diffSnapshotUsedNamespace = 7;
}

message QuotaRepairResponse {
Expand Down
12 changes: 12 additions & 0 deletions hadoop-ozone/interface-client/src/main/resources/proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8213,6 +8213,18 @@
"value": "false"
}
]
},
{
"id": 6,
"name": "diffSnapshotUsedBytes",
"type": "int64",
"optional": true
},
{
"id": 7,
"name": "diffSnapshotUsedNamespace",
"type": "int64",
"optional": true
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ private void updateBucketInfo(
}
bucketInfo.incrUsedBytes(bucketCountInfo.getDiffUsedBytes());
bucketInfo.incrUsedNamespace(bucketCountInfo.getDiffUsedNamespace());
if (bucketCountInfo.hasDiffSnapshotUsedBytes()) {
bucketInfo.incrSnapshotUsedBytes(bucketCountInfo.getDiffSnapshotUsedBytes());
}
if (bucketCountInfo.hasDiffSnapshotUsedNamespace()) {
bucketInfo.incrSnapshotUsedNamespace(bucketCountInfo.getDiffSnapshotUsedNamespace());
}
if (bucketCountInfo.getSupportOldQuota()) {
OmBucketInfo.Builder builder = bucketInfo.toBuilder();
if (bucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Time;
Expand All @@ -72,6 +73,8 @@ public class QuotaRepairTask {
QuotaRepairTask.class);
private static final int BATCH_SIZE = 5000;
private static final int TASK_THREAD_CNT = 3;
/** Parallel full-table scans (OBS keys, FSO files, dirs, deleted keys, deleted dirs). */
private static final int QUOTA_REPAIR_SCAN_TASKS = 5;
private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
private static final RepairStatus REPAIR_STATUS = new RepairStatus();
private static final AtomicLong RUN_CNT = new AtomicLong(0);
Expand Down Expand Up @@ -103,16 +106,14 @@ public static String getStatus() {
private boolean repairTask(List<String> buckets) {
LOG.info("Starting quota repair task {}", REPAIR_STATUS);
// thread pool with 3 Table type * (1 task each + 3 thread for each task)
executor = Executors.newFixedThreadPool(3 * (1 + TASK_THREAD_CNT));
executor = Executors.newFixedThreadPool(QUOTA_REPAIR_SCAN_TASKS * (1 + TASK_THREAD_CNT));
try (OMMetadataManager activeMetaManager =
createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration())) {
OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder
= OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
// repair active db
repairActiveDb(activeMetaManager, builder, buckets);

// TODO: repair snapshots for quota

// submit request to update
ClientId clientId = ClientId.randomId();
OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
Expand Down Expand Up @@ -175,6 +176,10 @@ private void repairActiveDb(
bucketCountBuilder.setDiffUsedBytes(updatedBuckedInfo.getUsedBytes() - oriBucketInfo.getUsedBytes());
bucketCountBuilder.setDiffUsedNamespace(
updatedBuckedInfo.getUsedNamespace() - oriBucketInfo.getUsedNamespace());
bucketCountBuilder.setDiffSnapshotUsedBytes(
updatedBuckedInfo.getSnapshotUsedBytes() - oriBucketInfo.getSnapshotUsedBytes());
bucketCountBuilder.setDiffSnapshotUsedNamespace(
updatedBuckedInfo.getSnapshotUsedNamespace() - oriBucketInfo.getSnapshotUsedNamespace());
bucketCountBuilder.setSupportOldQuota(oldQuota);
builder.addBucketCount(bucketCountBuilder.build());
}
Expand Down Expand Up @@ -253,17 +258,28 @@ private static void populateBucket(
oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
bucketInfo.decrUsedBytes(bucketInfo.getUsedBytes(), false);
bucketInfo.decrUsedNamespace(bucketInfo.getUsedNamespace(), false);
resetSnapshotBucketQuota(bucketInfo);
nameBucketInfoMap.put(bucketNameKey, bucketInfo);
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
bucketInfo.getObjectID()), bucketInfo);
}

private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) {
if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
|| lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) {
return true;
return lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
|| lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()
|| lBucketInfo.getSnapshotUsedBytes() != rBucketInfo.getSnapshotUsedBytes()
|| lBucketInfo.getSnapshotUsedNamespace() != rBucketInfo.getSnapshotUsedNamespace();
}

private static void resetSnapshotBucketQuota(OmBucketInfo bucketInfo) {
long snapBytes = bucketInfo.getSnapshotUsedBytes();
if (snapBytes != 0) {
bucketInfo.purgeSnapshotUsedBytes(snapBytes);
}
long snapNs = bucketInfo.getSnapshotUsedNamespace();
if (snapNs != 0) {
bucketInfo.purgeSnapshotUsedNamespace(snapNs);
}
return false;
}

private static String buildNamePath(String volumeName, String bucketName) {
Expand Down Expand Up @@ -293,14 +309,18 @@ private void repairCount(
Map<String, CountPair> keyCountMap = new ConcurrentHashMap<>();
Map<String, CountPair> fileCountMap = new ConcurrentHashMap<>();
Map<String, CountPair> directoryCountMap = new ConcurrentHashMap<>();
Map<String, CountPair> snapshotDeletedKeyMap = new ConcurrentHashMap<>();
Map<String, CountPair> snapshotDeletedDirMap = new ConcurrentHashMap<>();
try {
nameBucketInfoMap.keySet().stream().forEach(e -> keyCountMap.put(e,
new CountPair()));
idBucketInfoMap.keySet().stream().forEach(e -> fileCountMap.put(e,
new CountPair()));
idBucketInfoMap.keySet().stream().forEach(e -> directoryCountMap.put(e,
new CountPair()));

nameBucketInfoMap.keySet().forEach(k -> snapshotDeletedKeyMap.put(k, new CountPair()));
idBucketInfoMap.keySet().forEach(k -> snapshotDeletedDirMap.put(k, new CountPair()));

List<Future<?>> tasks = new ArrayList<>();
tasks.add(executor.submit(() -> recalculateUsages(
metadataManager.getKeyTable(BucketLayout.OBJECT_STORE),
Expand All @@ -311,6 +331,14 @@ private void repairCount(
tasks.add(executor.submit(() -> recalculateUsages(
metadataManager.getDirectoryTable(),
directoryCountMap, "Directory usages", false)));

Map<Long, OmBucketInfo> bucketById = buildBucketByObjectId(idBucketInfoMap);

tasks.add(executor.submit(() -> recalculateSnapshotDeletedKeyUsages(
metadataManager.getDeletedTable(), bucketById, snapshotDeletedKeyMap)));
tasks.add(executor.submit(() -> recalculateSnapshotDeletedDirNamespace(
metadataManager.getDeletedDirTable(), snapshotDeletedDirMap)));

for (Future<?> f : tasks) {
f.get();
}
Expand All @@ -326,9 +354,104 @@ private void repairCount(
updateCountToBucketInfo(nameBucketInfoMap, keyCountMap);
updateCountToBucketInfo(idBucketInfoMap, fileCountMap);
updateCountToBucketInfo(idBucketInfoMap, directoryCountMap);
mergeSnapshotDeletedTableCounts(nameBucketInfoMap, snapshotDeletedKeyMap);
mergeDeletedDirSnapshotNamespace(idBucketInfoMap, snapshotDeletedDirMap);
LOG.info("Completed quota repair counting for all keys, files and directories");
}

/**
 * Indexes the bucket info objects by bucket object ID. When several map
 * entries reference a bucket with the same object ID, the first one seen
 * wins ({@code putIfAbsent}).
 *
 * @param idBucketInfoMap bucket infos keyed by id path
 * @return bucket infos keyed by {@code OmBucketInfo#getObjectID()}
 */
private static Map<Long, OmBucketInfo> buildBucketByObjectId(
    Map<String, OmBucketInfo> idBucketInfoMap) {
  Map<Long, OmBucketInfo> byObjectId = new HashMap<>();
  idBucketInfoMap.values().forEach(
      info -> byObjectId.putIfAbsent(info.getObjectID(), info));
  return byObjectId;
}

/**
 * Scans the deleted-key table and accumulates, per bucket, the space and
 * key count still referenced by deleted keys; these totals are later merged
 * into the buckets' snapshot usage counters. Entries whose bucket is not in
 * {@code bucketById}, or whose bucket has no pre-registered counter, are
 * skipped.
 *
 * @param deletedTable table of deleted keys to scan
 * @param bucketById bucket infos keyed by bucket object ID
 * @param snapshotCountByBucketNameKey per-bucket accumulators keyed by
 *     volume/bucket name path
 * @throws UncheckedIOException if iterating the table fails
 */
private void recalculateSnapshotDeletedKeyUsages(
    Table<String, RepeatedOmKeyInfo> deletedTable,
    Map<Long, OmBucketInfo> bucketById,
    Map<String, CountPair> snapshotCountByBucketNameKey)
    throws UncheckedIOException {
  LOG.info("Starting recalculate snapshot usages from deletedTable");

  int scanned = 0;
  long start = Time.monotonicNow();
  try (Table.KeyValueIterator<String, RepeatedOmKeyInfo> iterator
      = deletedTable.iterator()) {
    while (iterator.hasNext()) {
      Table.KeyValue<String, RepeatedOmKeyInfo> entry = iterator.next();
      scanned++;
      RepeatedOmKeyInfo deletedKeys = entry.getValue();
      OmBucketInfo bucket = bucketById.get(deletedKeys.getBucketId());
      if (bucket != null) {
        // Accumulators are keyed by volume/bucket name path, not object ID.
        CountPair usage = snapshotCountByBucketNameKey.get(
            buildNamePath(bucket.getVolumeName(), bucket.getBucketName()));
        if (usage != null) {
          usage.incrSpace(deletedKeys.getTotalSize().getRight());
          usage.incrNamespace(deletedKeys.getOmKeyInfoList().size());
        }
      }
    }
    LOG.info("Recalculate snapshot usages from deletedTable completed, count {} time {}ms",
        scanned, (Time.monotonicNow() - start));
  } catch (IOException ex) {
    throw new UncheckedIOException(ex);
  }
}

/**
 * Scans the deleted-directory table and counts, per bucket, how many deleted
 * directories still exist; each entry adds one to the bucket's snapshot
 * namespace accumulator. Entries whose volume/bucket id prefix has no
 * pre-registered counter are ignored.
 *
 * @param deletedDirTable table of deleted directories to scan
 * @param snapshotDirNsByIdPrefix per-bucket accumulators keyed by
 *     volume/bucket id prefix
 * @throws UncheckedIOException if iterating the table fails
 */
private void recalculateSnapshotDeletedDirNamespace(
    Table<String, OmKeyInfo> deletedDirTable,
    Map<String, CountPair> snapshotDirNsByIdPrefix)
    throws UncheckedIOException {
  LOG.info("Starting recalculate snapshot namespace from deletedDirectoryTable");

  int scanned = 0;
  long start = Time.monotonicNow();
  try (Table.KeyValueIterator<String, OmKeyInfo> iterator
      = deletedDirTable.iterator()) {
    while (iterator.hasNext()) {
      Table.KeyValue<String, OmKeyInfo> entry = iterator.next();
      scanned++;
      // Table keys are id paths; the prefix identifies the owning bucket.
      CountPair usage = snapshotDirNsByIdPrefix.get(getVolumeBucketPrefix(entry.getKey()));
      if (usage == null) {
        continue;
      }
      usage.incrNamespace(1L);
    }
    LOG.info(
        "Recalculate snapshot namespace from deletedDirectoryTable completed, count {} time {}ms",
        scanned, (Time.monotonicNow() - start));
  } catch (IOException ex) {
    throw new UncheckedIOException(ex);
  }
}

/**
 * Folds the per-bucket space and namespace totals gathered from the
 * deleted-key table into the matching bucket info objects' snapshot usage
 * counters. Buckets missing from {@code nameBucketInfoMap} are skipped.
 */
private static synchronized void mergeSnapshotDeletedTableCounts(
    Map<String, OmBucketInfo> nameBucketInfoMap,
    Map<String, CountPair> counts) {
  counts.forEach((bucketKey, usage) -> {
    OmBucketInfo bucket = nameBucketInfoMap.get(bucketKey);
    if (bucket != null) {
      bucket.incrSnapshotUsedBytes(usage.getSpace());
      bucket.incrSnapshotUsedNamespace(usage.getNamespace());
    }
  });
}

/**
 * Folds the per-bucket deleted-directory counts into the matching bucket
 * info objects' snapshot namespace counters (bytes are not tracked for
 * directories). Buckets missing from {@code idBucketInfoMap} are skipped.
 */
private static synchronized void mergeDeletedDirSnapshotNamespace(
    Map<String, OmBucketInfo> idBucketInfoMap,
    Map<String, CountPair> counts) {
  counts.forEach((idKey, usage) -> {
    OmBucketInfo bucket = idBucketInfoMap.get(idKey);
    if (bucket != null) {
      bucket.incrSnapshotUsedNamespace(usage.getNamespace());
    }
  });
}

private <VALUE> void recalculateUsages(
Table<String, VALUE> table, Map<String, CountPair> prefixUsageMap,
String strType, boolean haveValue) throws UncheckedIOException,
Expand Down Expand Up @@ -500,6 +623,12 @@ public void updateStatus(OzoneManagerProtocolProtos.QuotaRepairRequest.Builder b
ConcurrentHashMap<String, Long> diffCountMap = new ConcurrentHashMap<>();
diffCountMap.put("DiffUsedBytes", quotaCount.getDiffUsedBytes());
diffCountMap.put("DiffUsedNamespace", quotaCount.getDiffUsedNamespace());
if (quotaCount.hasDiffSnapshotUsedBytes()) {
diffCountMap.put("DiffSnapshotUsedBytes", quotaCount.getDiffSnapshotUsedBytes());
}
if (quotaCount.hasDiffSnapshotUsedNamespace()) {
diffCountMap.put("DiffSnapshotUsedNamespace", quotaCount.getDiffSnapshotUsedNamespace());
}
bucketCountDiffMap.put(bucketKey, diffCountMap);
}
}
Expand Down
Loading