Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions walk/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package walk

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
Expand All @@ -15,14 +16,19 @@ import (
"golang.org/x/sync/errgroup"
)

// errWalkClosed is a sentinel returned from the directory walk callback in
// process() once the done channel has been closed, i.e. Close() was called
// while process() was still queueing files. process() recognises it via
// errors.Is and returns nil, so an early Close() is not reported as a walk
// failure.
var errWalkClosed = errors.New("filesystem reader closed")

// FilesystemReader traverses and reads files from a specified root directory and its subdirectories.
type FilesystemReader struct {
log *log.Logger
root string
path string
batchSize int

eg *errgroup.Group
eg *errgroup.Group
done chan struct{}

stats *stats.Stats
filesCh chan *File
Expand Down Expand Up @@ -69,13 +75,19 @@ func (f *FilesystemReader) process() error {
Info: info,
}

f.filesCh <- &file
select {
case f.filesCh <- &file:
case <-f.done:
return errWalkClosed
}

f.log.Debugf("file queued %s", file.RelPath)

return nil
})
if err != nil {
if errors.Is(err, errWalkClosed) {
return nil
} else if err != nil {
return fmt.Errorf("failed to walk path %s: %w", path, err)
}

Expand Down Expand Up @@ -118,6 +130,9 @@ LOOP:

// Close waits for all filesystem processing to complete.
func (f *FilesystemReader) Close() error {
// Unblock process() in case the caller stopped draining Read() before EOF.
close(f.done)

err := f.eg.Wait()
if err != nil {
return fmt.Errorf("failed to wait for processing to complete: %w", err)
Expand All @@ -143,7 +158,8 @@ func NewFilesystemReader(
path: path,
batchSize: batchSize,

eg: &eg,
eg: &eg,
done: make(chan struct{}),

stats: statz,
filesCh: make(chan *File, batchSize*runtime.NumCPU()),
Expand Down
33 changes: 33 additions & 0 deletions walk/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package walk_test
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -101,3 +105,32 @@ func TestFilesystemReader(t *testing.T) {
as.Equal(0, statz.Value(stats.Formatted))
as.Equal(0, statz.Value(stats.Changed))
}

// TestFilesystemReaderCloseUnblocks verifies that Close() returns even when
// the caller stops draining Read() before EOF.
//
// The test never calls Read(), so the producer can only make progress until
// the reader's internal channel is full; Close() must then unblock it.
func TestFilesystemReaderCloseUnblocks(t *testing.T) {
	as := require.New(t)

	tempDir := t.TempDir()

	// Enough files to overflow the reader's internal channel so process()
	// would block on send if Close() did not unblock it. The channel is
	// buffered to batchSize*runtime.NumCPU() entries, so size the fixture with
	// NumCPU as well: GOMAXPROCS can be lower than NumCPU (env override, CPU
	// quota), in which case the buffer would never fill and the test would
	// pass without exercising the blocked send.
	n := walk.BatchSize*runtime.NumCPU() + 100
	for i := range n {
		name := filepath.Join(tempDir, fmt.Sprintf("f%05d", i))
		as.NoError(os.WriteFile(name, nil, 0o600))
	}

	statz := stats.New()
	reader := walk.NewFilesystemReader(tempDir, "", &statz, walk.BatchSize)

	// Run Close() on its own goroutine so a deadlock shows up as a timeout
	// instead of hanging the whole test binary. The channel is buffered so the
	// goroutine can always deliver its result and exit.
	done := make(chan error, 1)

	go func() { done <- reader.Close() }()

	select {
	case err := <-done:
		as.NoError(err)
	case <-time.After(5 * time.Second):
		t.Fatal("Close() did not return; process() is deadlocked")
	}
}
Loading