Я пытаюсь вывести канал в разные каталоги, чтобы вывод каждого каталога был разделен на основе некоторых идентификаторов. Итак, в простом коде сокращения карты я бы использовал класс MultipleOutputs и сделал бы что-то подобное в редукторе.
protected void reduce(final SomeKey key,
final Iterable<SomeValue> values,
final Context context) {
...
for (SomeValue value: values) {
String bucketId = computeBucketIdFrom(...);
multipleOutputs.write(key, value, folderName + "/" + bucketId);
...
Так что я думаю, можно было бы сделать это так в ошпаривании
...
val somePipe = Csv(in, separator = "\t",
fields = someSchema,
skipHeader = true)
.read
for (i <- 1 until numberOfBuckets) {
somePipe
.filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
.write(Csv(out + "/bucket" + i ,
writeHeader = true,
separator = "\t"))
}
Но я чувствую, что вы в конечном итоге будете перекрашивать одну и ту же трубу много раз, и это повлияет на общую производительность.
Есть ли другие альтернативы?
Спасибо