5 changes: 5 additions & 0 deletions lucene/CHANGES.txt
@@ -9,6 +9,11 @@ API Changes
---------------------
* GITHUB#15929: Rename CollectionStatistics to FieldStats and TermStatistics to TermStats. (Zhou Hui)

* GITHUB#15935: Add protected afterMerge() lifecycle hook to KnnVectorsWriter and
PerFieldKnnVectorsFormat. Subclasses can override this to release merge-time resources such as
thread pools. The method is called in a finally block within merge(), guaranteeing invocation
even if mergeOneField throws an exception. (MrFlap)

* GITHUB#15763: Deprecate Operations.complement() method. This operation can be slow and is not
recommended for production use. It will be removed in Lucene 12. (Saurabh Singh)

@@ -82,6 +82,17 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE
/** Called once at the end before close */
public abstract void finish() throws IOException;

/**
* Called after all vector fields have been merged but before {@link #finish()} and {@link
* #close()}. Subclasses can override this to release merge-time resources such as thread pools.
* The default implementation is a no-op.
*
* <p>This method is guaranteed to be called even if {@link #mergeOneField} throws an exception.
*
* @throws IOException if an I/O error occurs during cleanup
*/
protected void afterMerge() throws IOException {}

/**
* Merges the segment vectors for all fields. This default implementation delegates to {@link
* #mergeOneField}, passing a {@link KnnVectorsReader} that combines the vector values and ignores
@@ -96,18 +107,22 @@ public final void merge(MergeState mergeState) throws IOException {
}
}

-    for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) {
-      if (fieldInfo.hasVectorValues()) {
-        if (mergeState.infoStream.isEnabled("VV")) {
-          mergeState.infoStream.message("VV", "merging " + mergeState.segmentInfo);
-        }
+    try {
+      for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) {
+        if (fieldInfo.hasVectorValues()) {
+          if (mergeState.infoStream.isEnabled("VV")) {
+            mergeState.infoStream.message("VV", "merging " + mergeState.segmentInfo);
+          }

-        mergeOneField(fieldInfo, mergeState);
+          mergeOneField(fieldInfo, mergeState);

-        if (mergeState.infoStream.isEnabled("VV")) {
-          mergeState.infoStream.message("VV", "merge done " + mergeState.segmentInfo);
+          if (mergeState.infoStream.isEnabled("VV")) {
+            mergeState.infoStream.message("VV", "merge done " + mergeState.segmentInfo);
+          }
         }
       }
+    } finally {
+      afterMerge();
     }
     finish();
Comment on lines -107 to 127
Member

Why doesn't finish work? It seems like the writer should know it's in a merge context already and finish should just handle that? Adding a new interface seems unnecessary?

Author

We would like to manage the threads in PerFieldKnnVectorsFormat, which doesn't get a callback once KnnVectorsWriter.finish() happens. Also, the suggested afterMerge() runs in a finally block, so we can be sure to close out the threads even on exception.
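
As a rough illustration of the reply above, the sketch below models a format that owns a merge-time thread pool and an inner writer that forwards the hook to it from a finally block. `SketchFormat`, `SketchWriter`, and `observe` are hypothetical stand-ins written for this example and are not Lucene classes; the pool size of 2 is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-ins for the format/writer pair; none of these are Lucene classes. */
final class FormatOwnedPoolSketch {

  /** Plays the role of the per-field format: it owns the merge-time thread pool. */
  static class SketchFormat {
    private ExecutorService mergePool; // created lazily, released in afterMerge()

    synchronized ExecutorService mergePool() {
      if (mergePool == null) {
        mergePool = Executors.newFixedThreadPool(2); // arbitrary size for the sketch
      }
      return mergePool;
    }

    /** Hook that the inner writer calls once the merge is over. */
    protected synchronized void afterMerge() {
      if (mergePool != null) {
        mergePool.shutdown();
        mergePool = null;
      }
    }

    synchronized boolean hasLivePool() {
      return mergePool != null;
    }

    /** Plays the role of the inner writer, which forwards the hook to its outer format. */
    class SketchWriter {
      void merge(Callable<Void> mergeWork) throws Exception {
        try {
          // Run the merge work on the format-owned pool; get() rethrows failures.
          mergePool().submit(mergeWork).get();
        } finally {
          SketchFormat.this.afterMerge(); // forwarded on success and on failure
        }
      }
    }
  }

  /** Reports whether the pool existed during the merge and whether it survived it. */
  static String observe(boolean fail) {
    SketchFormat format = new SketchFormat();
    SketchFormat.SketchWriter writer = format.new SketchWriter();
    boolean[] liveDuringMerge = new boolean[1];
    try {
      writer.merge(() -> {
        liveDuringMerge[0] = format.hasLivePool();
        if (fail) {
          throw new IllegalStateException("simulated merge failure");
        }
        return null;
      });
    } catch (Exception expected) {
      // The failure path still went through the finally block in merge().
    }
    return "during=" + liveDuringMerge[0] + " after=" + format.hasLivePool();
  }

  public static void main(String[] args) {
    System.out.println(observe(false));
    System.out.println(observe(true));
  }
}
```

In this sketch the pool is released on both paths because the writer's finally block is the only place the format learns that the merge has ended.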

Member

@benwtrent benwtrent Apr 16, 2026

  1. On an exception during merge, all files, etc. should be closed. Do we want to move finish into the finally? There may be edge cases, for sure...
  2. Are we sure extending PerFieldKnnVectorsFormat is the way forward here? My point is that this type of thing is doable by experts in Lucene in different ways than adding another top-level API to KnnVectorsFormat, which then impacts all formats.

I want to be very careful about adding an expert level API that Apache Lucene itself doesn't actually use.

Author

> On an exception during merge, all files, etc. should be closed. Do we want to move finish into the finally? There may be edge cases, for sure...

I'm not sure I can make a fully informed call on this, but it seems like the right thing to do. If mergeOneField throws midway, the finish() call will be skipped.

> Are we sure extending PerFieldKnnVectorsFormat is the way forward here? My point is that this type of thing is doable by experts in Lucene in different ways than adding another top-level API to KnnVectorsFormat, which then impacts all formats.

Understandable. I'm not a Lucene expert, but I don't think there is an easy way for PerFieldKnnVectorsFormat to know when a merge completes. I'll continue looking into this. cc @navneet1v
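
The control-flow point in this reply (finish() is skipped when mergeOneField throws, while the finally block still runs) can be traced with a small sketch. It only mirrors the shape of the try/finally in the diff: the event strings stand in for the real calls, and `MergeControlFlowSketch` and `run` are hypothetical names, not Lucene code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Traces the order of calls in a merge() shaped like the one in the diff. */
final class MergeControlFlowSketch {

  /** Returns the sequence of steps that ran, in order. */
  static List<String> run(boolean failDuringMerge) {
    List<String> events = new ArrayList<>();
    try {
      try {
        events.add("mergeOneField");
        if (failDuringMerge) {
          throw new IOException("simulated merge failure");
        }
      } finally {
        events.add("afterMerge"); // runs whether or not the merge step threw
      }
      events.add("finish"); // only reached when nothing above threw
    } catch (IOException e) {
      events.add("exception propagated");
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(false)); // [mergeOneField, afterMerge, finish]
    System.out.println(run(true)); // [mergeOneField, afterMerge, exception propagated]
  }
}
```

Moving the finish step into the finally block, as raised in the question, would make it run on the failure path too; whether that is safe for real writers is the open question in this thread.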

Member

I am saying that any expert extending PerFieldKnnVectorsFormat can likely do this in another way, and requiring a Lucene change that Lucene doesn't benefit from is a tricky situation.

I am just wondering if this is the best way for Lucene.

Member

I mean, the overrider of PerFieldKnnVectorsFormat could wrap a try/finally directly in mergeOneField. I am not sure a new interface is required at all.
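
A minimal sketch of this suggestion, assuming a simplified writer whose mergeOneField can be overridden: the subclass acquires and releases around each field with try/finally, so no extra hook is needed. `BaseWriter`, `ResourceScopedWriter`, and the "acquire"/"release" log entries are hypothetical placeholders, not Lucene classes or calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Models wrapping try/finally around a per-field merge method in a subclass. */
final class PerFieldTryFinallySketch {

  /** Stand-in for a writer with an overridable per-field merge method. */
  static class BaseWriter {
    void mergeOneField(String field, List<String> log) throws Exception {
      if (field.startsWith("bad")) {
        throw new IllegalStateException("simulated failure on " + field);
      }
      log.add("merged " + field);
    }
  }

  /** Subclass that scopes a resource to each mergeOneField call. */
  static class ResourceScopedWriter extends BaseWriter {
    @Override
    void mergeOneField(String field, List<String> log) throws Exception {
      log.add("acquire for " + field);
      try {
        super.mergeOneField(field, log);
      } finally {
        log.add("release for " + field); // runs even when the field merge throws
      }
    }
  }

  /** Merges the fields in order and returns the resulting log. */
  static List<String> mergeAll(List<String> fields) {
    List<String> log = new ArrayList<>();
    BaseWriter writer = new ResourceScopedWriter();
    try {
      for (String field : fields) {
        writer.mergeOneField(field, log);
      }
    } catch (Exception e) {
      log.add("failed");
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(mergeAll(Arrays.asList("title_vector", "bad_vector")));
  }
}
```

Note that this scopes the resource to each field rather than to the merge as a whole, so a merge over several vector fields acquires and releases once per field.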

Author

Our desired functionality is to have a set number N of threads per merge. This way we won't have resource contention between merges. So if one field needs 50 singular merge operations, we should see 50 * N threads spawning. Wrapping functionality around mergeOneField is difficult because we want to:

  1. Allocate N threads when each singular merge operation starts.
  2. Release N threads when each singular merge operation completes.
  3. Be able to release N threads on a specific merge operation when an exception happens.

We would really like to keep it to "N threads per singular merge operation", as making it a set number of threads per field merge could lead to resource contention per thread. We don't really see an easy way to be this granular about detecting and allocating resources per merge, since there are no hooks to see when a singular merge operation happens.
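
The "N threads per singular merge operation" bookkeeping described above could look roughly like the sketch below, assuming some callback marks the start and end of each merge. `PerMergePoolSketch`, `onMergeStart`, `onMergeEnd`, and the string merge keys are hypothetical names for this example; the thread lacks exactly such start and end hooks, which is the point of the comment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Keeps one fixed-size pool per in-flight merge, keyed by a caller-chosen merge key. */
final class PerMergePoolSketch {
  private final int threadsPerMerge;
  private final Map<Object, ExecutorService> pools = new ConcurrentHashMap<>();

  PerMergePoolSketch(int threadsPerMerge) {
    this.threadsPerMerge = threadsPerMerge;
  }

  /** Step 1: allocate a pool capped at N threads when a merge operation starts. */
  ExecutorService onMergeStart(Object mergeKey) {
    return pools.computeIfAbsent(mergeKey, k -> Executors.newFixedThreadPool(threadsPerMerge));
  }

  /** Steps 2 and 3: release the pool when that merge completes or fails. */
  void onMergeEnd(Object mergeKey) {
    ExecutorService pool = pools.remove(mergeKey);
    if (pool != null) {
      pool.shutdown();
    }
  }

  /** Upper bound on threads: a fixed pool only starts threads as tasks arrive. */
  int reservedThreads() {
    return pools.size() * threadsPerMerge;
  }

  public static void main(String[] args) {
    PerMergePoolSketch pools = new PerMergePoolSketch(4);
    for (int i = 0; i < 50; i++) {
      pools.onMergeStart("merge-" + i);
    }
    System.out.println(pools.reservedThreads()); // 200, i.e. 50 * N with N = 4
    for (int i = 0; i < 50; i++) {
      pools.onMergeEnd("merge-" + i);
    }
    System.out.println(pools.reservedThreads()); // 0
  }
}
```

Calling `onMergeEnd` from a finally block, as `afterMerge()` is in the diff, would cover the exception case in step 3.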

}
@@ -100,6 +100,15 @@ public int getMaxDimensions(String fieldName) {
*/
public abstract KnnVectorsFormat getKnnVectorsFormatForField(String field);

/**
* Called after all vector fields have been merged for a segment merge operation. Subclasses can
* override this to release merge-time resources such as thread pools. The default implementation
* is a no-op.
*
* @throws IOException if an I/O error occurs during cleanup
*/
protected void afterMerge() throws IOException {}

private class FieldsWriter extends KnnVectorsWriter {
private final Map<KnnVectorsFormat, WriterAndSuffix> formats;
private final Map<String, Integer> suffixes = new HashMap<>();
@@ -135,6 +144,11 @@ public void finish() throws IOException {
}
}

@Override
protected void afterMerge() throws IOException {
PerFieldKnnVectorsFormat.this.afterMerge();
}

@Override
public void close() throws IOException {
IOUtils.close(formats.values());
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.perfield;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;

/** Tests for the afterMerge() lifecycle hook on PerFieldKnnVectorsFormat. */
public class TestPerFieldKnnVectorsFormatAfterMerge extends LuceneTestCase {

/** Writes numSegments single-doc segments with a vector field, using NoMergePolicy. */
private void writeSegments(Directory dir, KnnVectorsFormat format, int numSegments)
throws IOException {
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setCodec(codecWithFormat(format));
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter iw = new IndexWriter(dir, iwc)) {
for (int i = 0; i < numSegments; i++) {
Document doc = new Document();
doc.add(new KnnFloatVectorField("field", new float[] {i, i + 1, i + 2}));
iw.addDocument(doc);
iw.commit();
}
}
}

private static FilterCodec codecWithFormat(KnnVectorsFormat format) {
Codec defaultCodec = TestUtil.getDefaultCodec();
return new FilterCodec(defaultCodec.getName(), defaultCodec) {
@Override
public KnnVectorsFormat knnVectorsFormat() {
return format;
}
};
}

/** afterMerge() on the format must be called exactly once when a merge completes. */
public void testAfterMergeCalledOnMerge() throws IOException {
AtomicInteger afterMergeCount = new AtomicInteger();

PerFieldKnnVectorsFormat format =
new PerFieldKnnVectorsFormat() {
@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
return TestUtil.getDefaultKnnVectorsFormat();
}

@Override
protected void afterMerge() {
afterMergeCount.incrementAndGet();
}
};

try (Directory dir = newDirectory()) {
writeSegments(dir, format, 3);
assertEquals("afterMerge should not be called during flush", 0, afterMergeCount.get());

// Force merge triggers afterMerge
IndexWriterConfig mergeConfig = new IndexWriterConfig(new MockAnalyzer(random()));
mergeConfig.setCodec(codecWithFormat(format));
try (IndexWriter iw = new IndexWriter(dir, mergeConfig)) {
iw.forceMerge(1);
}

assertEquals("afterMerge should be called exactly once per merge", 1, afterMergeCount.get());
}
}

/** afterMerge() must not be called during a normal flush (no merge). */
public void testAfterMergeNotCalledOnFlush() throws IOException {
AtomicInteger afterMergeCount = new AtomicInteger();

PerFieldKnnVectorsFormat format =
new PerFieldKnnVectorsFormat() {
@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
return TestUtil.getDefaultKnnVectorsFormat();
}

@Override
protected void afterMerge() {
afterMergeCount.incrementAndGet();
}
};

try (Directory dir = newDirectory()) {
writeSegments(dir, format, 3);
assertEquals("afterMerge must not be called on flush-only writes", 0, afterMergeCount.get());
}
}

/**
* afterMerge() is called in a finally block, so it must fire even when mergeOneField throws. We
* verify this by using a format that wraps the delegate writer to throw during merge, then
* checking that afterMerge() was still invoked.
*/
public void testAfterMergeCalledEvenOnException() throws IOException {
AtomicInteger afterMergeCount = new AtomicInteger();

PerFieldKnnVectorsFormat format =
new PerFieldKnnVectorsFormat() {
@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
return new ThrowingOnMergeKnnVectorsFormat(TestUtil.getDefaultKnnVectorsFormat());
}

@Override
protected void afterMerge() {
afterMergeCount.incrementAndGet();
}
};

try (Directory dir = newDirectory()) {
// Write segments using a normal format so data is valid on disk
PerFieldKnnVectorsFormat normalFormat =
new PerFieldKnnVectorsFormat() {
@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
return TestUtil.getDefaultKnnVectorsFormat();
}
};
writeSegments(dir, normalFormat, 3);

// Use SerialMergeScheduler so the merge runs on the calling thread and the
// exception propagates directly (instead of being caught by ConcurrentMergeScheduler)
IndexWriterConfig mergeConfig = new IndexWriterConfig(new MockAnalyzer(random()));
mergeConfig.setCodec(codecWithFormat(format));
mergeConfig.setMergeScheduler(new org.apache.lucene.index.SerialMergeScheduler());
IndexWriter iw = new IndexWriter(dir, mergeConfig);
try {
expectThrows(IOException.class, () -> iw.forceMerge(1));
} finally {
// IndexWriter may be in a tragic state after the merge failure, so rollback
try {
iw.rollback();
} catch (
@SuppressWarnings("unused")
Exception ignored) {
// expected — writer may already be closed or in a tragic state
}
}

assertEquals(
"afterMerge must be called even when mergeOneField throws", 1, afterMergeCount.get());
}
}

/**
* A KnnVectorsFormat wrapper that delegates everything normally except mergeOneField, which always
* throws IOException.
*/
private static class ThrowingOnMergeKnnVectorsFormat extends KnnVectorsFormat {
private final KnnVectorsFormat delegate;

ThrowingOnMergeKnnVectorsFormat(KnnVectorsFormat delegate) {
super(delegate.getName());
this.delegate = delegate;
}

@Override
public org.apache.lucene.codecs.KnnVectorsWriter fieldsWriter(
org.apache.lucene.index.SegmentWriteState state) throws IOException {
org.apache.lucene.codecs.KnnVectorsWriter delegateWriter = delegate.fieldsWriter(state);
return new org.apache.lucene.codecs.KnnVectorsWriter() {
@Override
public org.apache.lucene.codecs.KnnFieldVectorsWriter<?> addField(
org.apache.lucene.index.FieldInfo fieldInfo) throws IOException {
return delegateWriter.addField(fieldInfo);
}

@Override
public void flush(int maxDoc, org.apache.lucene.index.Sorter.DocMap sortMap)
throws IOException {
delegateWriter.flush(maxDoc, sortMap);
}

@Override
public void mergeOneField(
org.apache.lucene.index.FieldInfo fieldInfo,
org.apache.lucene.index.MergeState mergeState)
throws IOException {
throw new IOException("simulated merge failure");
}

@Override
public void finish() throws IOException {
delegateWriter.finish();
}

@Override
public void close() throws IOException {
delegateWriter.close();
}

@Override
public long ramBytesUsed() {
return delegateWriter.ramBytesUsed();
}
};
}

@Override
public org.apache.lucene.codecs.KnnVectorsReader fieldsReader(
org.apache.lucene.index.SegmentReadState state) throws IOException {
return delegate.fieldsReader(state);
}

@Override
public int getMaxDimensions(String fieldName) {
return delegate.getMaxDimensions(fieldName);
}
}
}