Как группировать результаты в Scalding

Я пытаюсь вывести канал в разные каталоги, чтобы вывод каждого каталога был разделен на основе некоторых идентификаторов. Итак, в простом коде сокращения карты я бы использовал класс MultipleOutputs и сделал бы что-то подобное в редукторе.

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

Так что я думаю, можно было бы сделать это так в ошпаривании

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

Но я чувствую, что вы в конечном итоге будете перекрашивать одну и ту же трубу много раз, и это повлияет на общую производительность.

Есть ли другие альтернативы?

Спасибо


person jeremie    schedule 06.02.2015    source источник


Ответы (1)


Да, конечно, лучше использовать TemplatedTsv. .

Таким образом, ваш код выше можно записать следующим образом:

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

Это поместит все записи, поступающие от 'some_id, в отдельные папки в папке out/some_ids.

Однако вы также можете создавать целочисленные сегменты. Просто измените последние строки,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

Это создаст папки с двумя цифрами как out/dd/. Вы также можете проверить API templatedTsv здесь

Может быть небольшая проблема с использованием templatedTsv, то есть редукторы могут генерировать много маленьких файлов, которые могут быть плохими для следующей работы с использованием ваших результатов. Поэтому лучше сортировать по полям шаблона перед записью на диск. Я написал об этом в блоге здесь.

person morazow    schedule 13.02.2015
comment
Можете ли вы помочь ответить на связанный с Scalding вопрос, размещенный здесь - stackoverflow.com/questions/28687539/ - person Ambarish Hazarnis; 24.02.2015
comment
да, я ответил на этот вопрос stackoverflow.com/a/28714754/2908547. надеюсь, что это полезно, лучше всего - person morazow; 25.02.2015