Как загрузить файл CSV в векторы Apache Arrow и сохранить файл стрелки на диск

В настоящее время я играю с java API Apache Arrow (хотя я использую его из Scala для примеров кода ), чтобы ознакомиться с этим инструментом.

В качестве упражнения я решил загрузить файл CSV в векторы стрелок, а затем сохранить их в файл стрелок. Первая часть показалась мне достаточно простой, и я попробовал вот так:

val csvLines: Stream[Array[String]] = <open stream from CSV parser>

// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)

// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)

// Work on the data, zipping it with its index 
Stream.from(0)
  .zip(csvLines.tail) // Work on the tail (head contains the headers)
  .foreach(rowTup =>  // rowTup = (index, csvRow as an Array[String])
    Range(0, rowTup._2.size) // Iterate on each column...
      .foreach(columnNumber =>
        writeToMutator(
          mutators(columnNumber), // get that column's mutator
          idx=rowTup._1,          // pass the current row number
          data=rowTup._2(columnNumber) // pass the entry of the curernt column
        )
      )
)

С initVectors() и writeToMutator(), определенными как:

def initVectors(
  columns: Array[String], 
  alloc: RootAllocator): Array[NullableVarCharVector] = {

  // Initialize a vector for each column
  val vectors = columns.map(colName => 
    new NullableVarCharVector(colName, alloc))
  // 4096 size, for 1024 values initially. This is arbitrary
  vectors.foreach(_.allocateNew(2^12,1024))
  vectors
}

def writeToMutator(
  mutator: NullableVarCharVector#Mutator, 
  idx: Int, 
  data: String): Unit = {

  // The CSV may contain null values
  if (data != null) {
    val bytes = data.getBytes()
    mutator.setSafe(idx, bytes, 0, bytes.length)
  }
  mutator.setNull(idx)
}

(в настоящее время я не забочусь об использовании правильного типа и сохраняю все как строки или VarChar в терминах стрелок)

Итак, на данный момент у меня есть коллекция NullableVarCharVector, и я могу читать и писать из них / в них. На данный момент все отлично. Теперь, что касается следующего шага, мне осталось только задуматься о том, как на самом деле объединить их вместе и сериализовать в файл со стрелками. Я наткнулся на AbstractFieldWriter абстрактный класс, но неясно, как использовать его реализации.

Итак, вопрос в основном таков:

  • какой (лучший? - кажется, есть несколько) способ сохранить кучу векторов в файл со стрелками.
  • есть ли другие способы загрузки столбцов CSV в векторы стрелок?

отредактировано для добавления: страница описания метаданных дает хороший общий обзор по этой теме.

Тестовые классы api, похоже, содержат несколько вещей, которые могут помочь, я отправлю ответ с образцом, как только попробую.


person Shastick    schedule 23.10.2017    source источник
comment
Наполовину связанное, но тем не менее интересное чтение: mapr.com/blog/apache-arrow -and-value-векторы   -  person Shastick    schedule 23.10.2017


Ответы (1)


Глядя на TestArrowFile.java и BaseFileTest.java Я обнаружил:

  • Как записать на диск файл с одной стрелкой
  • Альтернативный способ заполнения векторов, поскольку моя первая попытка помешала мне собрать один файл со стрелкой (или, по крайней мере, сделать это простым способом).

Итак, заполнение векторов теперь выглядит так:

// Open stream of rows 
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// Define a parent to hold the vectors
val parent = MapVector.empty("parent", allocator)
// Create a new writer. VarCharWriterImpl would probably do as well?
val writer = new ComplexWriterImpl("root", parent)

// Initialise a writer for each column, using the header as the name
val rootWriter = writer.rootAsMap()
val writers = csvLines.head.map(colName => 
                                  rootWriter.varChar(colName))

Stream.from(0)
  .zip(csvLines.tail) // Zip the rows with their index
  .foreach( rowTup => { // Iterate on each (index, row) tuple
    val (idx, row) = rowTup
      Range(0, row.size) // Iterate on each field of the row
        .foreach(column =>
          Option(row(column)) // row(column) may be null,
            .foreach(str =>   // use the option as a null check
              write(writers(column), idx, allocator, str)
            )
      )
  }
)

toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file

где write определяется как:

def write(writer: VarCharWriter, idx: Int, 
  allocator: BufferAllocator, data: String): Unit = {
  // Set the position to the correct index
  writer.setPosition(idx)
  val bytes = data.getBytes()
  // Apparently the allocator is required again to build a new buffer
  val varchar = allocator.buffer(bytes.length)
  varchar.setBytes(0, data.getBytes())
  writer.writeVarChar(0, bytes.length, varchar)
}

def toFile(parent: FieldVector, fName: String): Unit = {
  // Extract a schema from the parent: that's the part I struggled with in the original question
  val rootSchema = new VectorSchemaRoot(parent)
  val stream = new FileOutputStream(fName)
  val fileWriter = new ArrowFileWriter(
                        rootSchema,
                        null, // We don't use dictionary encoding.
                        stream.getChannel)
  // Write everything to file...
  fileWriter.start()
  fileWriter.writeBatch()
  fileWriter.end()
  stream.close()
}

С помощью вышесказанного я могу сохранить CSV в файл. Я проверил, что все прошло хорошо, прочитав его и снова конвертировав в CSV, и содержимое не изменилось.

Обратите внимание, что ComplexWriterImpl позволяет записывать столбцы разных типов, что пригодится, чтобы избежать хранения числовых столбцов в виде строк.

(пока я играю с читающей стороной, эти вещи, вероятно, заслуживают своих собственных SO-вопросов.)

person Shastick    schedule 23.10.2017
comment
ссылки на github мертвы - person Fabich; 12.12.2018
comment
Спасибо. Должно быть исправлено. - person Shastick; 14.12.2018