HIVE: как суммировать значения из строк "ключ-значение" по ключу

У меня есть два string столбца, содержащие key/value парные переменные. Что-то вроде этого:

    column1         column2
a:1,b:2,c:3         a:5,c:3
   a:12,b:4     a:9,b:3,d:5

Как я могу суммировать эти значения (в реальной жизни я не знаю, сколько у меня ключей, некоторые ключи можно найти только в одном столбце) по определенному ключу, чтобы получить это:

         column12
  a:6,b:2,c:6,d:0
 a:21,b:7,c:0,d:5

или это:

 a  b  c  d
 6  2  6  0
21  7  0  5

Спасибо за помощь!


person Maju116    schedule 23.08.2016    source источник


Ответы (3)


Предполагая, что каждая строка имеет уникальный идентификатор "id", этот запрос ниже
должен работать.

select id, collect_list(CONCAT(key,':',val)) as column12  
from  
(  
  select id, key, SUM(val) as val  
  from  
  (  
    select id, k1 as key, k2 as key, v1 as val, v2 as val  
    from  
    (  
      select id, str_to_map(col1,',',':') as c1, str_to_map(col2,',',':') as c2  
      from table  
    )x  
    LATERAL VIEW explode(c1) e1 as k1,v1  
    LATERAL VIEW explode(c2) e2 as k2,v2  
  )y  
)z  
person pallavimane    schedule 25.04.2017

Один из подходов - написать собственный код mapreduce с помощью Hcatalog.

Следующий MR принимает входные столбцы таблицы 1 и записывает результирующий столбец в table2 (редуктор не требуется, поскольку логика обрабатывается в картографе)

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hcatalog.common.*;
import org.apache.hcatalog.mapreduce.*;
import org.apache.hcatalog.data.*;
import org.apache.hcatalog.data.schema.*;
import org.apache.commons.lang3.ArrayUtils;

public class HCatSum extends Configured implements Tool {

    public static class Map
            extends
            Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord> {
        String column1;
        String column2;
        String val;

        @Override
        protected void map(
                WritableComparable key,
                HCatRecord value,
                org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord>.Context context)
                throws IOException, InterruptedException {

            column1 = (String) value.get(0); // a:1,b:2,c:3
            column2 = (String) value.get(1); // a:5,c:3

            String colArray[] = (String[]) ArrayUtils.addAll(
                    column1.split(","), column2.split(","));
            HashMap<String, Integer> hs = new HashMap<String, Integer>();
            for (String token : colArray) {
                String tokensplit[] = token.split(":");
                String k = tokensplit[0];
                int v = Integer.parseInt(tokensplit[1]);
                if (hs.containsKey(k)) {
                    int prev = hs.get(k);
                    hs.put(k, prev + v);
                } else {
                    hs.put(k, v);                       
                }
            }

            val = Arrays.toString(hs.entrySet().toArray()); // [a=6,b=2,c=6]
            HCatRecord record = new DefaultHCatRecord(1);
            record.set(0, val);
            context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Get the input and output table names as arguments
        String inputTableName = args[0];
        String outputTableName = args[1];
        // Assume the default database
        String dbName = null;

        Job job = new Job(conf, "HCatsum");
        HCatInputFormat.setInput(job,
                InputJobInfo.create(dbName, inputTableName, null));
        job.setJarByClass(HCatSum.class);
        job.setMapperClass(Map.class);

        // An HCatalog record as input
        job.setInputFormatClass(HCatInputFormat.class);

        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(WritableComparable.class);
        job.setMapOutputValueClass(DefaultHCatRecord.class);

        // Ignore the key for the reducer output; emitting an HCatalog record as
        // value

        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        HCatOutputFormat.setOutput(job,
                OutputJobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:"
                + s);
        HCatOutputFormat.setSchema(job, s);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HCatSum(), args);
        System.exit(exitCode);
    }
}

Как запустить MR с HCatalog (справочная ссылка): Руководство по Hive

Надеюсь, это было полезно.

person Aditya    schedule 23.08.2016

Вот решение. Это немного похоже на взлом, но он будет работать независимо от количества ключей, которые у вас есть.

udf0.py

#/usr/bin/python

import sys
from collections import Counter

for line in sys.stdin:
    words = line.strip().split('\t')
    c = Counter()

    for word in words:
        d = {}
        s = word.split(',')
        for ss in s:
            k,v = ss.split(':')
            d[k] = int(v)

        c.update(d)

    print ','.join([str(k)+':'+str(v) for k,v in dict(c).iteritems()])

udf1.py

#!/usr/bin/python

import sys

for line in sys.stdin:
    w0, w1 = line.strip().split('\t')

    out = {}
    d = {}
    l = []

    s0 = w0.strip().split(',')
    s1 = w1.strip().split(',')

    for ss in s0:
        k,v = ss.split(':')
        d[k] = int(v)

    for ss in s1:
        l.append(ss)

    for keys in l:
        if d.get(keys, None) is not None:
            out[keys] = d[keys]
        else:
            out[keys] = 0

     print ','.join([str(k)+':'+str(v) for k,v in out.iteritems()])

Запрос улья:

add file /home/username/udf0.py;
add file /home/username/udf1.py;

SELECT TRANSFORM(dict, unique_keys)
       USING 'python udf1.py'
       AS (final_map STRING)
FROM (
  SELECT DISTINCT dict
    , CONCAT_WS(',', unique_keys) as unique_keys
  FROM (
    SELECT dict
      , COLLECT_SET(keys) OVER () AS unique_keys
    FROM (
      SELECT dict
        , keys
      FROM (
        SELECT dict
          , map_keys(str_to_map(dict)) AS key_arr
        FROM (
          SELECT TRANSFORM (col1, col2)
                 USING 'python udf0.py'
                 AS (dict STRING)
          FROM db.tbl ) x ) z
      LATERAL VIEW EXPLODE(key_arr) exptbl AS keys ) a ) b ) c

Вывод:

a:6,b:2,c:6,d:0
a:21,b:7,c:0,d:5

Объяснение:

Первый UDF возьмет вашу строку, преобразует ее в словарь Python и обновит ключи (т.е. сложит значения с совпадающими ключами вместе). Поскольку вы не знаете фактических ключей, вы будете знать, что необходимо извлечь ключи из каждого словаря (map_keys() в запросе улья), взорвать таблицу, а затем собрать их обратно в уникальный набор. Теперь у вас есть все возможные ключи в любом словаре. Затем оттуда вы можете использовать вторую UDF для импорта словаря, созданного в первой UDF, проверить, присутствует ли каждый ключ, а если нет, поставить ноль для его значения.

person gobrewers14    schedule 25.08.2016