HBase Get/Scan в задании Scalding

Я использую Scalding с Spyglass для чтения/записи в HBase.

Я делаю левое внешнее соединение таблицы1 и таблицы2 и пишу обратно в таблицу1 после преобразования столбца. И table1, и table2 объявлены как Spyglass HBaseSource.

Это прекрасно работает. Но мне нужно получить доступ к другой строке в таблице 1, используя rowkey для вычисления преобразованного значения.

Я попробовал следующее для получения HBase: val hTable = new HTable(conf, TABLE_NAME) val result = hTable.get(new Get(rowKey.getBytes()))

Я получаю доступ к заданию «Конфигурация в Scalding», как указано в этой ссылке:

https://github.com/twitter/scalding/wiki/Частозадаваемыевопросы#how-do-i-access-the-jobconf

Это работает, когда я запускаю ошпаривание локально. Но когда я запускаю его в кластере, conf имеет значение null, когда этот код выполняется в Reducer.

Есть ли лучший способ получить/сканировать HBase в задании Scalding/Cascading для таких случаев?


person Sathish    schedule 09.11.2014    source источник


Ответы (1)


Способы сделать это...

1) Вы можете использовать управляемый ресурс

class SomeJob(args: Args) extends Job(args) {      
  val someConfig = HBaseConfiguration.create().addResource(new Path(pathtoyourxmlfile))
  lazy val hPool = new HTablePool(someConfig, 3)

  def getConf = {
    implicitly[Mode] match {
      case Hdfs(_, conf) => conf
      case _ => whateveryou are doing for a local conf...
    }
  }
  ... somePipe.someOperation.... {
        val gets = key.map { key => new Get(key) }
        managed(hPool.getTable("myTableName")) acquireAndGet { table => 
          val results = table.get(gets)
          ...do something with these results
        }
     }    
}

2) Вы можете использовать более конкретный каскадный код, где вы пишете собственную схему, а внутри нее вы переопределяете исходный метод и, возможно, некоторые другие в зависимости от ваших потребностей. Там вы можете получить доступ к JobConf следующим образом:

class MyScheme extends Scheme[JobConf, SomeRecordReader, SomeOutputCollector, ..] {

  @transient var jobConf: Configuration = super.jobConfiguration

  override def source(flowProcess: FlowProcess[JobConf], ...): Boolean = {
   jobConf = flowProcess match {
     case h: HadoopFlowProcess => h.getJobConf
     case _ => jconf
   }

   ... dosomething with the jobConf here

 }   

}
person adev    schedule 17.12.2014