From b85db00ce47fa36cfc0314df1d6ecd405c46012c Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Wed, 17 Jun 2026 09:01:31 +0200 Subject: [PATCH 1/3] fix: resolve FileSpout seeds in open() for distributed mode --- .../apache/stormcrawler/spout/FileSpout.java | 57 +++++++++++++------ .../stormcrawler/spout/FileSpoutTest.java | 23 ++++++++ 2 files changed, 64 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 8e2a9edd0..4e783d2ea 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -61,6 +61,9 @@ public class FileSpout extends BaseRichSpout { public static final int BATCH_SIZE = 10000; public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class); private final Queue inputFiles; + private final String seedDir; + private final String fileFilter; + private final String[] seedFiles; protected SpoutOutputCollector collector; protected Scheme scheme = new StringTabScheme(); protected LinkedList buffer = new LinkedList<>(); @@ -68,7 +71,7 @@ public class FileSpout extends BaseRichSpout { protected int totalTasks; protected int taskIndex; private BufferedReader currentBuffer; - private boolean withDiscoveredStatus = false; + private final boolean withDiscoveredStatus; /** * @param dir containing the seed files @@ -94,18 +97,10 @@ public FileSpout(String... files) { */ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) { this.withDiscoveredStatus = withDiscoveredStatus; - Path pdir = Paths.get(dir); - inputFiles = new LinkedList<>(); - LOG.info("Reading directory: {} (filter: {})", pdir, filter); - try (DirectoryStream stream = Files.newDirectoryStream(pdir, filter)) { - for (Path entry : stream) { - String inputFile = entry.toAbsolutePath().toString(); - inputFiles.add(inputFile); - LOG.info("Input : {}", inputFile); - } - } catch (IOException ioe) { - LOG.error("IOException: %s%n", ioe); - } + this.seedDir = dir; + this.fileFilter = filter; + this.seedFiles = null; + this.inputFiles = new LinkedList<>(); } /** @@ -115,12 +110,14 @@ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) { * @since 1.13 */ public FileSpout(boolean withDiscoveredStatus, String... files) { - this.withDiscoveredStatus = withDiscoveredStatus; if (files.length == 0) { throw new IllegalArgumentException("Must configure at least one inputFile"); } - inputFiles = new LinkedList<>(); - Collections.addAll(inputFiles, files); + this.withDiscoveredStatus = withDiscoveredStatus; + this.seedDir = null; + this.fileFilter = null; + this.seedFiles = files; + this.inputFiles = new LinkedList<>(); } /** @@ -195,6 +192,34 @@ public void open( // same as the number of shards totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); taskIndex = context.getThisTaskIndex(); + + // Resolve the seeds here, not in the constructor: in distributed mode the + // spout is serialised on the submit client and only open() runs on the + // workers, where the seed directory/files actually live (issue #1955). + populateInputFiles(); + } + + /** + * Resolves the configured seed directory or file list into {@link #inputFiles}. Called from + * {@link #open} so the filesystem is read on the worker, not on the client at construction + * time. + */ + private void populateInputFiles() { + if (seedDir != null) { + Path pdir = Paths.get(seedDir); + LOG.info("Reading directory: {} (filter: {})", pdir, fileFilter); + try (DirectoryStream stream = Files.newDirectoryStream(pdir, fileFilter)) { + for (Path entry : stream) { + String inputFile = entry.toAbsolutePath().toString(); + inputFiles.add(inputFile); + LOG.info("Input : {}", inputFile); + } + } catch (IOException ioe) { + LOG.error("IOException while reading seed directory {}", pdir, ioe); + } + } else { + Collections.addAll(inputFiles, seedFiles); + } } @Override diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java index e0447b281..ac3eea661 100644 --- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java +++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java @@ -159,6 +159,29 @@ void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Ex assertEquals("https://github.com", tuple.get(0)); } + @Test + void testDirectoryResolvedAtOpenNotConstruction( + @org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + // Build the spout while the directory is still empty. + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + + // Create the seed file AFTER construction but BEFORE open(). + // On today's code the constructor already listed the (empty) directory, + // so nothing is emitted. Once resolution moves to open(), the file is seen. + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + spout.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + private Path getPath(String resource) throws URISyntaxException { return Path.of( Objects.requireNonNull( From cfd97dc1b41f20f22103d970cc6853932961e193 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Wed, 17 Jun 2026 09:04:14 +0200 Subject: [PATCH 2/3] test: cover FileSpout directory resolution and serialization --- .../stormcrawler/spout/FileSpoutTest.java | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java index ac3eea661..d8770fe88 100644 --- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java +++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java @@ -160,8 +160,8 @@ void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Ex } @Test - void testDirectoryResolvedAtOpenNotConstruction( - @org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + void testDirectoryResolvedAtOpenNotConstruction(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { // Build the spout while the directory is still empty. final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); @@ -182,6 +182,54 @@ void testDirectoryResolvedAtOpenNotConstruction( assertEquals("https://stormcrawler.apache.org", tuple.get(0)); } + @Test + void testDirectoryConstructorHappyPath(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + spout.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + + @Test + void testSurvivesSerialization(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + + final java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream(); + try (java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(bos)) { + oos.writeObject(spout); + } + final FileSpout restored; + try (java.io.ObjectInputStream ois = + new java.io.ObjectInputStream( + new java.io.ByteArrayInputStream(bos.toByteArray()))) { + restored = (FileSpout) ois.readObject(); + } + + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + restored.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + restored.activate(); + restored.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + private Path getPath(String resource) throws URISyntaxException { return Path.of( Objects.requireNonNull( From 20665874b3b59ecb3fb826d1348a3dd02259fa08 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 18 Jun 2026 16:21:28 +0200 Subject: [PATCH 3/3] Addressed reviewer comments --- .../java/org/apache/stormcrawler/spout/FileSpout.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 4e783d2ea..12c405550 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -60,16 +60,16 @@ public class FileSpout extends BaseRichSpout { public static final int BATCH_SIZE = 10000; public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class); - private final Queue inputFiles; + private transient Queue inputFiles; private final String seedDir; private final String fileFilter; private final String[] seedFiles; - protected SpoutOutputCollector collector; + protected transient SpoutOutputCollector collector; protected Scheme scheme = new StringTabScheme(); protected LinkedList buffer = new LinkedList<>(); protected boolean active; - protected int totalTasks; - protected int taskIndex; + protected transient int totalTasks; + protected transient int taskIndex; private BufferedReader currentBuffer; private final boolean withDiscoveredStatus; @@ -100,7 +100,6 @@ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) { this.seedDir = dir; this.fileFilter = filter; this.seedFiles = null; - this.inputFiles = new LinkedList<>(); } /** @@ -117,7 +116,6 @@ public FileSpout(boolean withDiscoveredStatus, String... files) { this.seedDir = null; this.fileFilter = null; this.seedFiles = files; - this.inputFiles = new LinkedList<>(); } /** @@ -196,6 +194,7 @@ public void open( // Resolve the seeds here, not in the constructor: in distributed mode the // spout is serialised on the submit client and only open() runs on the // workers, where the seed directory/files actually live (issue #1955). + inputFiles = new LinkedList<>(); populateInputFiles(); }