Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ public class FileSpout extends BaseRichSpout {

public static final int BATCH_SIZE = 10000;
public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class);
private final Queue<String> inputFiles;
protected SpoutOutputCollector collector;
private transient Queue<String> inputFiles;
private final String seedDir;
private final String fileFilter;
private final String[] seedFiles;
protected transient SpoutOutputCollector collector;
protected Scheme scheme = new StringTabScheme();
protected LinkedList<byte[]> buffer = new LinkedList<>();
protected boolean active;
protected int totalTasks;
protected int taskIndex;
protected transient int totalTasks;
protected transient int taskIndex;
private BufferedReader currentBuffer;
private boolean withDiscoveredStatus = false;
private final boolean withDiscoveredStatus;

/**
* @param dir containing the seed files
Expand All @@ -94,18 +97,9 @@ public FileSpout(String... files) {
*/
public FileSpout(String dir, String filter, boolean withDiscoveredStatus) {
this.withDiscoveredStatus = withDiscoveredStatus;
Path pdir = Paths.get(dir);
inputFiles = new LinkedList<>();
LOG.info("Reading directory: {} (filter: {})", pdir, filter);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(pdir, filter)) {
for (Path entry : stream) {
String inputFile = entry.toAbsolutePath().toString();
inputFiles.add(inputFile);
LOG.info("Input : {}", inputFile);
}
} catch (IOException ioe) {
LOG.error("IOException: %s%n", ioe);
}
this.seedDir = dir;
this.fileFilter = filter;
this.seedFiles = null;
}

/**
Expand All @@ -115,12 +109,13 @@ public FileSpout(String dir, String filter, boolean withDiscoveredStatus) {
* @since 1.13
*/
public FileSpout(boolean withDiscoveredStatus, String... files) {
this.withDiscoveredStatus = withDiscoveredStatus;
if (files.length == 0) {
throw new IllegalArgumentException("Must configure at least one inputFile");
}
inputFiles = new LinkedList<>();
Collections.addAll(inputFiles, files);
this.withDiscoveredStatus = withDiscoveredStatus;
this.seedDir = null;
this.fileFilter = null;
this.seedFiles = files;
}

/**
Expand Down Expand Up @@ -195,6 +190,35 @@ public void open(
// same as the number of shards
totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
taskIndex = context.getThisTaskIndex();

// Resolve the seeds here, not in the constructor: in distributed mode the
// spout is serialised on the submit client and only open() runs on the
// workers, where the seed directory/files actually live (issue #1955).
inputFiles = new LinkedList<>();
populateInputFiles();
}

/**
 * Fills {@link #inputFiles} from whichever seed source this spout was configured with: the
 * entries of {@code seedDir} matching the {@code fileFilter} glob when a directory was given,
 * otherwise the explicit {@code seedFiles} list. Called from {@link #open} so the filesystem is
 * read on the worker, not on the client at construction time.
 *
 * <p>Directory listing is best effort: a failure to open or to iterate the directory is logged
 * and {@link #inputFiles} keeps whatever entries were collected before the failure.
 */
private void populateInputFiles() {
    if (seedDir != null) {
        Path pdir = Paths.get(seedDir);
        LOG.info("Reading directory: {} (filter: {})", pdir, fileFilter);
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(pdir, fileFilter)) {
            for (Path entry : stream) {
                String inputFile = entry.toAbsolutePath().toString();
                inputFiles.add(inputFile);
                LOG.info("Input : {}", inputFile);
            }
        } catch (IOException | java.nio.file.DirectoryIteratorException e) {
            // newDirectoryStream() and close() throw IOException, but an I/O error hit while
            // iterating surfaces as the unchecked DirectoryIteratorException; catching both
            // keeps the log-and-continue behaviour uniform instead of letting the latter
            // escape from open().
            LOG.error("IOException while reading seed directory {}", pdir, e);
        }
    } else {
        // No directory configured: the seeds were passed as an explicit list of files.
        Collections.addAll(inputFiles, seedFiles);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,77 @@ void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Ex
assertEquals("https://github.com", tuple.get(0));
}

@Test
void testDirectoryResolvedAtOpenNotConstruction(@org.junit.jupiter.api.io.TempDir Path tempDir)
        throws Exception {
    // The spout is created while the seed directory does not contain any file yet.
    final String seedDirectory = tempDir.toAbsolutePath().toString();
    final FileSpout spout = new FileSpout(seedDirectory, "*.txt");

    // The seed file only shows up between construction and open(). A spout that
    // listed the directory in its constructor would have captured an empty listing
    // and emit nothing; listing the directory in open() picks the file up.
    final byte[] seedContent =
            "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8);
    final Path seedFile = tempDir.resolve("seeds.txt");
    Files.write(seedFile, seedContent);

    final FileSpoutOutputCollectorMock collector = new FileSpoutOutputCollectorMock();
    spout.open(Map.of(), new FileSpoutTopologyContextMock(), collector);
    spout.activate();
    spout.nextTuple();

    // The first emitted tuple must carry the URL from the late-created seed file.
    final List<Object> emitted = collector.getTuple();
    assertNotNull(emitted);
    assertEquals("https://stormcrawler.apache.org", emitted.get(0));
}

@Test
void testDirectoryConstructorHappyPath(@org.junit.jupiter.api.io.TempDir Path tempDir)
        throws Exception {
    // Seed file is already in place when the spout is constructed.
    final byte[] seedContent =
            "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8);
    final Path seedFile = tempDir.resolve("seeds.txt");
    Files.write(seedFile, seedContent);

    final String seedDirectory = tempDir.toAbsolutePath().toString();
    final FileSpout spout = new FileSpout(seedDirectory, "*.txt");

    final FileSpoutOutputCollectorMock collector = new FileSpoutOutputCollectorMock();
    spout.open(Map.of(), new FileSpoutTopologyContextMock(), collector);
    spout.activate();
    spout.nextTuple();

    // The URL from the seed file is expected as the first field of the emitted tuple.
    final List<Object> emitted = collector.getTuple();
    assertNotNull(emitted);
    assertEquals("https://stormcrawler.apache.org", emitted.get(0));
}

@Test
void testSurvivesSerialization(@org.junit.jupiter.api.io.TempDir Path tempDir)
        throws Exception {
    final byte[] seedContent =
            "https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8);
    Files.write(tempDir.resolve("seeds.txt"), seedContent);

    final String seedDirectory = tempDir.toAbsolutePath().toString();
    final FileSpout original = new FileSpout(seedDirectory, "*.txt");

    // Push the not-yet-opened spout through Java serialization ...
    final java.io.ByteArrayOutputStream sink = new java.io.ByteArrayOutputStream();
    try (java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(sink)) {
        out.writeObject(original);
    }
    final byte[] serialized = sink.toByteArray();

    // ... and read it back as a fresh instance.
    final FileSpout restored;
    try (java.io.ObjectInputStream in =
            new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(serialized))) {
        restored = (FileSpout) in.readObject();
    }

    // Only the deserialized copy is opened and driven.
    final FileSpoutOutputCollectorMock collector = new FileSpoutOutputCollectorMock();
    restored.open(Map.of(), new FileSpoutTopologyContextMock(), collector);
    restored.activate();
    restored.nextTuple();

    final List<Object> emitted = collector.getTuple();
    assertNotNull(emitted);
    assertEquals("https://stormcrawler.apache.org", emitted.get(0));
}

private Path getPath(String resource) throws URISyntaxException {
return Path.of(
Objects.requireNonNull(
Expand Down
Loading