Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,23 @@ protected ReferenceCounted readData() throws Exception {
}
long maxSize = Math.min(batchRequest.getMaxSize(), maxBatchReadSize);
//See BookieProtoEncoding.ResponseEnDeCoderPreV3#encode on BatchedReadResponse case.
long frameSize = 24 + 8 + 4;
long frameSize = 24 + 8 + Integer.BYTES;
for (int i = 0; i < maxCount; i++) {
try {
ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i);
frameSize += entry.readableBytes() + 4;
if (data == null) {
frameSize += entry.readableBytes() + Integer.BYTES;
data = ByteBufList.get(entry);
long perEntrySize = entry.readableBytes() + Integer.BYTES;
long remainingBudget = maxSize - frameSize;
long remainingEntries = remainingBudget > 0 ? remainingBudget / Math.max(perEntrySize, 1L) : 0L;
maxCount = (int) Math.min(maxCount, 1L + remainingEntries);
} else {
if (frameSize > maxSize) {
if (frameSize + entry.readableBytes() + Integer.BYTES > maxSize) {
entry.release();
break;
}
frameSize += entry.readableBytes() + Integer.BYTES;
data.add(entry);
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,16 @@ public void testBatchReadWithV2Protocol() throws Exception {
entries++;
}
assertEquals(expectEntriesNum, entries);

// The first entry is still returned even when maxSize is smaller than a single entry frame.
entries = 0;
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, 20, headerSize);
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
entries++;
}
assertEquals(1, entries);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.bookkeeper.proto;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -46,6 +49,7 @@
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -221,4 +225,171 @@ public void testNonFenceRequest() throws Exception {
assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
assertEquals(BookieProtocol.EOK, response.getErrorCode());
}
}

@Test
public void testReadDataPredictsMaxCountFromUniformFirstEntrySize() throws Exception {
long ledgerId = 1234L;
long firstEntryId = 1L;
int entrySize = 20;
long maxSize = 24 + 8 + Integer.BYTES + (entrySize + Integer.BYTES) * 2L;

ByteBuf firstEntry = entryBuffer(entrySize);
ByteBuf secondEntry = entryBuffer(entrySize);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))).thenReturn(secondEntry);

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
ByteBufList data = (ByteBufList) processor.readData();
assertNotNull(data);
try {
assertEquals(2, data.size());
} finally {
data.release();
}

verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId + 1));
verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 2));
}

@Test
public void testReadDataReturnsFirstEntryEvenIfItAloneExceedsMaxSize() throws Exception {
long ledgerId = 1235L;
long firstEntryId = 1L;
int firstEntrySize = 20;
long maxSize = 50;

ByteBuf firstEntry = entryBuffer(firstEntrySize);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
ByteBufList data = (ByteBufList) processor.readData();
assertNotNull(data);
try {
assertEquals(1, data.size());
} finally {
data.release();
}

verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 1));
}

@Test
public void testReadDataReleasesOneOverReadEntryWhenSizesGrow() throws Exception {
long ledgerId = 1236L;
long firstEntryId = 1L;
int firstEntrySize = 10;
int secondEntrySize = 40;
long maxSize = 80;

ByteBuf firstEntry = entryBuffer(firstEntrySize);
ByteBuf secondEntry = entryBuffer(secondEntrySize);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))).thenReturn(secondEntry);

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
ByteBufList data = (ByteBufList) processor.readData();
assertNotNull(data);
try {
assertEquals(1, data.size());
assertEquals(0, secondEntry.refCnt());
} finally {
data.release();
}

verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId + 1));
verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 2));
}

@Test
public void testReadDataStopsOnMissingSubsequentEntry() throws Exception {
long ledgerId = 1237L;
long firstEntryId = 1L;
int firstEntrySize = 20;

ByteBuf firstEntry = entryBuffer(firstEntrySize);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1)))
.thenThrow(new Bookie.NoEntryException(ledgerId, firstEntryId + 1));

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
ByteBufList data = (ByteBufList) processor.readData();
assertNotNull(data);
try {
assertEquals(1, data.size());
} finally {
data.release();
}
}

@Test
public void testReadDataStopsOnIOExceptionAfterFirstEntry() throws Exception {
long ledgerId = 1238L;
long firstEntryId = 1L;

ByteBuf firstEntry = entryBuffer(20);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1)))
.thenThrow(new IOException("disk error"));

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
ByteBufList data = (ByteBufList) processor.readData();
assertNotNull(data);
try {
assertEquals(1, data.size());
} finally {
data.release();
}
}

@Test
public void testProcessPacketReturnsPrefixWhenSubsequentReadFails() throws Exception {
ChannelPromise promise = new DefaultChannelPromise(channel);
AtomicReference<Object> writtenObject = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocationOnMock -> {
writtenObject.set(invocationOnMock.getArgument(0));
promise.setSuccess();
latch.countDown();
return promise;
}).when(channel).writeAndFlush(any(Response.class));

long ledgerId = 1239L;
long firstEntryId = 1L;
ByteBuf firstEntry = entryBuffer(20);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1)))
.thenThrow(new IOException("disk error"));

BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
processor.run();

latch.await();
assertTrue(writtenObject.get() instanceof Response);
BookieProtocol.BatchedReadResponse response = (BookieProtocol.BatchedReadResponse) writtenObject.get();
try {
assertEquals(BookieProtocol.EOK, response.getErrorCode());
assertEquals(1, response.getData().size());
} finally {
response.release();
}
assertEquals(0, firstEntry.refCnt());
}

private BatchedReadEntryProcessor createProcessor(long ledgerId, long entryId, int maxCount, long maxSize) {
ExecutorService service = mock(ExecutorService.class);
BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, BookieProtocol.FLAG_NONE, new byte[] {},
0L, maxCount, maxSize);
return BatchedReadEntryProcessor.create(request, requestHandler, requestProcessor, service, true,
5 * 1024 * 1024);
}

private static ByteBuf entryBuffer(int size) {
ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size);
entry.writeZero(size);
return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,35 @@ public ByteBufList batchReadEntries(BookieId bookieId, int flags, long ledgerId,
if (maxCount <= 0) {
maxCount = Integer.MAX_VALUE;
}
long frameSize = 24 + 8 + 4;
long frameSize = 24 + 8 + Integer.BYTES;
for (long i = startEntryId; i < startEntryId + maxCount; i++) {
ByteBuf entry = ledger.getEntry(i);
frameSize += entry.readableBytes() + 4;
if (data == null) {
data = ByteBufList.get(entry);
} else {
if (frameSize > maxSize) {
entry.release();
break;
}
data.add(entry);
}
ByteBuf entry = ledger.getEntry(i);
if (data == null) {
if (entry == null) {
LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i);
throw new BKException.BKNoSuchEntryException();
}
frameSize += entry.readableBytes() + Integer.BYTES;
data = ByteBufList.get(entry);
long perEntrySize = entry.readableBytes() + Integer.BYTES;
long remainingBudget = maxSize - frameSize;
long remainingEntries = remainingBudget > 0 ? remainingBudget / Math.max(perEntrySize, 1L) : 0L;
maxCount = (int) Math.min(maxCount, 1L + remainingEntries);
continue;
}

if (entry == null) {
LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i);
break;
}

if (frameSize + entry.readableBytes() + Integer.BYTES > maxSize) {
// MockLedgerData returns the stored shared buffer, so this path does not own the skipped entry.
break;
}

frameSize += entry.readableBytes() + Integer.BYTES;
data.add(entry);
}
return data;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.proto;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.util.ByteBufList;
import org.junit.Test;

public class MockBookiesTest {

private static final BookieId BOOKIE_ID = BookieId.parse("127.0.0.1:3181");
private static final long LEDGER_ID = 1L;
private static final long BATCH_RESPONSE_HEADER_SIZE = 24 + 8 + 4;

@Test
public void testBatchReadStopsOnMissingSubsequentEntry() throws Exception {
MockBookies mockBookies = new MockBookies();
mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8));

ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, Long.MAX_VALUE);

assertNotNull(data);
assertEquals(1, data.size());
assertEquals(8, data.getBuffer(0).readableBytes());
}

@Test
public void testBatchReadDoesNotReleaseOversizedSkippedEntry() throws Exception {
MockBookies mockBookies = new MockBookies();
mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8));
mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 1L, newEntry(16));

long maxSize = BATCH_RESPONSE_HEADER_SIZE + 8 + Integer.BYTES + 16 + Integer.BYTES - 1;
ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, maxSize);

assertNotNull(data);
assertEquals(1, data.size());
assertEquals(8, data.getBuffer(0).readableBytes());
assertEquals(16, mockBookies.readEntry(BOOKIE_ID, 0, LEDGER_ID, 1L).readableBytes());
}

private static ByteBuf newEntry(int size) {
return Unpooled.buffer(size).writeZero(size);
}
}
Loading