Во-первых, рассмотрим этот класс CustomWriter:
public final class CustomWriter {
private final SequenceFile.Writer writer;
CustomWriter(Configuration configuration, Path outputPath) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
writer = SequenceFile.createWriter(configuration,
SequenceFile.Writer.file(outputPath),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(ItemWritable.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
SequenceFile.Writer.blockSize(1024 * 1024),
SequenceFile.Writer.bufferSize(fileSystem.getConf().getInt("io.file.buffer.size", 4 * 1024)),
SequenceFile.Writer.replication(fileSystem.getDefaultReplication(outputPath)),
SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
}
public void close() throws IOException {
writer.close();
}
public void write(Item item) throws IOException {
writer.append(new LongWritable(item.getId()), new ItemWritable(item));
}
}
Я пытаюсь использовать асинхронный поток объектов типа Item
. У потребителя есть ссылка на экземпляр CustomWriter
. Затем он вызывает метод CustomWriter#write
для каждого получаемого элемента. Когда поток заканчивается, вызывается метод CustomWriter#close
, чтобы закрыть модуль записи.
Как видите, я создал только один модуль записи, и он начинает добавляться к совершенно новому файлу. Таким образом, нет никаких сомнений в том, что это не является причиной.
Я также должен отметить, что в настоящее время я запускаю это в среде модульного тестирования, используя MiniDFSCluster
в соответствии с инструкциями здесь< /а>. Если я запускаю это в среде без модульного тестирования (т.е. без MiniDFSCluster
), кажется, что все работает нормально.
Когда я пытаюсь прочитать файл обратно, все, что я вижу, это последний записанный объект Item
N раз (где N — общее количество элементов, полученных в потоке). Вот пример:
sparkContext.hadoopFile(path, SequenceFileInputFormat.class, LongWritable.class, ItemWritable.class)
.collect()
.forEach(new BiConsumer<>() {
@Override
public void accept(Tuple2<LongWritable, ItemWritable> tuple) {
LongWritable id = tuple._1();
ItemWritable item = tuple._2();
System.out.print(id.get() + " -> " + item.get());
}
});
Это напечатает что-то вроде этого:
...
1234 -> Item[...]
1234 -> Item[...]
1234 -> Item[...]
...
Я что-то делаю не так или это побочный эффект использования MiniDFSCluster
?