Почему операция добавления модуля записи SequenceFile перезаписывает все значения последним значением?

Во-первых, рассмотрим этот класс CustomWriter:

public final class CustomWriter {

  private final SequenceFile.Writer writer;

  CustomWriter(Configuration configuration, Path outputPath) throws IOException {
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
      fileSystem.delete(outputPath, true);
    }

    writer = SequenceFile.createWriter(configuration,
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(ItemWritable.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
        SequenceFile.Writer.blockSize(1024 * 1024),
        SequenceFile.Writer.bufferSize(fileSystem.getConf().getInt("io.file.buffer.size", 4 * 1024)),
        SequenceFile.Writer.replication(fileSystem.getDefaultReplication(outputPath)),
        SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
  }

  public void close() throws IOException {
    writer.close();
  }

  public void write(Item item) throws IOException {
    writer.append(new LongWritable(item.getId()), new ItemWritable(item));
  }
}

Я пытаюсь использовать асинхронный поток объектов типа Item. У потребителя есть ссылка на экземпляр CustomWriter. Затем он вызывает метод CustomWriter#write для каждого получаемого элемента. Когда поток заканчивается, вызывается метод CustomWriter#close, чтобы закрыть модуль записи.

Как видите, я создал только один модуль записи, и он начинает добавляться к совершенно новому файлу. Таким образом, нет никаких сомнений в том, что это не является причиной.

Я также должен отметить, что в настоящее время я запускаю это в среде модульного тестирования, используя MiniDFSCluster в соответствии с инструкциями здесь< /а>. Если я запускаю это в среде без модульного тестирования (т.е. без MiniDFSCluster), кажется, что все работает нормально.

Когда я пытаюсь прочитать файл обратно, все, что я вижу, это последний записанный объект Item N раз (где N — общее количество элементов, полученных в потоке). Вот пример:

sparkContext.hadoopFile(path, SequenceFileInputFormat.class, LongWritable.class, ItemWritable.class)
    .collect()
    .forEach(new BiConsumer<>() {
      @Override
      public void accept(Tuple2<LongWritable, ItemWritable> tuple) {
        LongWritable id = tuple._1();
        ItemWritable item = tuple._2();
        System.out.print(id.get() + " -> " + item.get());
      }
    });

Это напечатает что-то вроде этого:

...
1234 -> Item[...]
1234 -> Item[...]
1234 -> Item[...]
...

Я что-то делаю не так или это побочный эффект использования MiniDFSCluster?


person nuaavee    schedule 08.01.2016    source источник


Ответы (1)


Writable (например, LongWritable, ItemWritable) повторно используется во время обработки данных. При получении записи Writable обычно просто заменяет ее содержимое, и вы просто получите тот же объект Writable. Вы должны скопировать их в новый объект, если хотите собрать их в массив.

person zsxwing    schedule 08.01.2016