diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 8e2a9edd0..12c405550 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -60,15 +60,18 @@ public class FileSpout extends BaseRichSpout { public static final int BATCH_SIZE = 10000; public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class); - private final Queue inputFiles; - protected SpoutOutputCollector collector; + private transient Queue inputFiles; + private final String seedDir; + private final String fileFilter; + private final String[] seedFiles; + protected transient SpoutOutputCollector collector; protected Scheme scheme = new StringTabScheme(); protected LinkedList buffer = new LinkedList<>(); protected boolean active; - protected int totalTasks; - protected int taskIndex; + protected transient int totalTasks; + protected transient int taskIndex; private BufferedReader currentBuffer; - private boolean withDiscoveredStatus = false; + private final boolean withDiscoveredStatus; /** * @param dir containing the seed files @@ -94,18 +97,9 @@ public FileSpout(String... files) { */ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) { this.withDiscoveredStatus = withDiscoveredStatus; - Path pdir = Paths.get(dir); - inputFiles = new LinkedList<>(); - LOG.info("Reading directory: {} (filter: {})", pdir, filter); - try (DirectoryStream stream = Files.newDirectoryStream(pdir, filter)) { - for (Path entry : stream) { - String inputFile = entry.toAbsolutePath().toString(); - inputFiles.add(inputFile); - LOG.info("Input : {}", inputFile); - } - } catch (IOException ioe) { - LOG.error("IOException: %s%n", ioe); - } + this.seedDir = dir; + this.fileFilter = filter; + this.seedFiles = null; } /** @@ -115,12 +109,13 @@ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) { * @since 1.13 */ public FileSpout(boolean withDiscoveredStatus, String... files) { - this.withDiscoveredStatus = withDiscoveredStatus; if (files.length == 0) { throw new IllegalArgumentException("Must configure at least one inputFile"); } - inputFiles = new LinkedList<>(); - Collections.addAll(inputFiles, files); + this.withDiscoveredStatus = withDiscoveredStatus; + this.seedDir = null; + this.fileFilter = null; + this.seedFiles = files; } /** @@ -195,6 +190,35 @@ public void open( // same as the number of shards totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); taskIndex = context.getThisTaskIndex(); + + // Resolve the seeds here, not in the constructor: in distributed mode the + // spout is serialised on the submit client and only open() runs on the + // workers, where the seed directory/files actually live (issue #1955). + inputFiles = new LinkedList<>(); + populateInputFiles(); + } + + /** + * Resolves the configured seed directory or file list into {@link #inputFiles}. Called from + * {@link #open} so the filesystem is read on the worker, not on the client at construction + * time. + */ + private void populateInputFiles() { + if (seedDir != null) { + Path pdir = Paths.get(seedDir); + LOG.info("Reading directory: {} (filter: {})", pdir, fileFilter); + try (DirectoryStream stream = Files.newDirectoryStream(pdir, fileFilter)) { + for (Path entry : stream) { + String inputFile = entry.toAbsolutePath().toString(); + inputFiles.add(inputFile); + LOG.info("Input : {}", inputFile); + } + } catch (IOException ioe) { + LOG.error("IOException while reading seed directory {}", pdir, ioe); + } + } else { + Collections.addAll(inputFiles, seedFiles); + } } @Override diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java index e0447b281..d8770fe88 100644 --- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java +++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java @@ -159,6 +159,77 @@ void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Ex assertEquals("https://github.com", tuple.get(0)); } + @Test + void testDirectoryResolvedAtOpenNotConstruction(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { + // Build the spout while the directory is still empty. + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + + // Create the seed file AFTER construction but BEFORE open(). + // On today's code the constructor already listed the (empty) directory, + // so nothing is emitted. Once resolution moves to open(), the file is seen. + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + spout.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + + @Test + void testDirectoryConstructorHappyPath(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + spout.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + + @Test + void testSurvivesSerialization(@org.junit.jupiter.api.io.TempDir Path tempDir) + throws Exception { + Files.write( + tempDir.resolve("seeds.txt"), + "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + + final FileSpout spout = new FileSpout(tempDir.toAbsolutePath().toString(), "*.txt"); + + final java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream(); + try (java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(bos)) { + oos.writeObject(spout); + } + final FileSpout restored; + try (java.io.ObjectInputStream ois = + new java.io.ObjectInputStream( + new java.io.ByteArrayInputStream(bos.toByteArray()))) { + restored = (FileSpout) ois.readObject(); + } + + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + restored.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + restored.activate(); + restored.nextTuple(); + + final List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + } + private Path getPath(String resource) throws URISyntaxException { return Path.of( Objects.requireNonNull(