Scalding чтение нескольких файлов из HDFS

Как я могу прочитать все файлы из каталога на HDFS и обработать их с помощью scalding. Для локальной файловой системы я использую ниже

import com.twitter.scalding._
import com.twitter.scalding.JsonLine
import java.io._

class ParseJsonJob(args: Args) extends Job(args) {
  val fileList = new File(args("input")).listFiles
  val fields = ('device_guid
                ,'service_name
                ,'event_type
               )

  fileList.map {
    fileName =>
      JsonLine(fileName.toString, fields)
      .read
      .filter ('service_name) { name: String => name == "myservice" }
      .write(Tsv(args("output") + fileName.toString.split("/").last))
  }
}

Это не будет работать с HDFS. Читают ли TextLine или JsonLine каталоги в дополнение к файлам?


person Yash Ranadive    schedule 20.02.2015    source источник


Ответы (2)


Вы должны получить файловую систему Hadoop и использовать FileSystem.liststatus для сканирования каталога HDFS, например:

...
val hadoopConf= implicitly[Mode] match {
  case Hdfs(_, conf) => conf
}
val fs= FileSystem.get(hadoopConf)
for(status <- fs.listStatus(new Path(args("input")))) {
  JsonLine(status.getPath.toString.toString, fields)
      .read
      .filter ('service_name) { name: String => name == "myservice" }
      .write(Tsv(args("output") + fileName.toString.split("/").last))
 }
person Sasha O    schedule 22.02.2015

импортировать com.twitter.scalding._
импортировать com.twitter.scalding.JsonLine импортировать java.io._

класс ParseJsonJob (аргументы: аргументы) расширяет задание (аргументы) { val fields = ('device_guid,'service_name,'event_type)

    JsonLine(args("input"), fields)
    .read
    .filter ('service_name) { name: String => name == "myservice" }
    .write(Tsv(args("output") )   } }

Это сработает для вас. Дай мне знать, если это не так.

person Sanchit Grover    schedule 13.03.2015