Реализуйте функциональность A-B, используя каскадирование с LEFT JOIN

Есть два набора данных A и B (с одним столбцом - ID)

Cat A

    1
    2
    3
    4
    5
    6
    7

cat B
4
5
2
8
18
19
2197

Cat A-B
1
3
6
7

Это вычитание выполняется в 2 этапа (шаг 1: СОЕДИНЕНИЕ A ПО ИДЕНТИФИКАТОРУ, ЛЕВОЕ СОЕДИНЕНИЕ B ПО ИДЕНТИФИКАТОРУ), это даст набор данных с 2 столбцами, где в 1-м столбце будут все записи для набора данных A, а во 2-м столбце будут только совпадающие записи. из B

1   
2   2
3   
4   4
5   5
6   
7   

Шаг 2: Отфильтруйте набор данных с шага 1 по записям, где 2-е поле равно нулю. Таким образом, мы реализовали A-B, используя LEFT JOIN.

Я могу выполнить шаг 1, но не могу выполнить шаг 2. Ниже приведен исходный код шага 1.

public class AMinusB {

public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap,
        Tap outputTap) {
    Pipe bpipe = new Pipe("b_pipe");
    Pipe apipe = new Pipe("a_pipe");
    Fields b_user_id = new Fields("B_id");
    Fields a_user_id = new Fields("A_id");

    Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id,
            new LeftJoin());
    Pipe retainPipe = new Pipe("retain", joinPipe);
    retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id"));

    Pipe cdistPipe = new Pipe("UniquePipe", retainPipe);

    Fields selector = new Fields("A_id", "B_id");

    cdistPipe = new Unique(cdistPipe, selector);

    FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap)
            .addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap)
            .setName("A-B using left outer join");
    return flowDef;
}

public static void main(String[] args) {
    String Apath = "path to data set A";
    String Bpath = "path to data set B";
    String outputPath = "path to output";
    Properties properties = new Properties();
    AppProps.setApplicationJarClass(properties,
            LocationsNumForAProduct.class);
    FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

    Fields A = new Fields("A_id");
    Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath);

    Fields B = new Fields("B_id");
    Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath);

    Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath);

    FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap);
    flowConnector.connect(flowDefLeftJoin).complete();

}

}


person Rakesh Shah    schedule 22.06.2016    source источник


Ответы (1)


Проверьте работу FilterNull.

cdistPipe = new Each(cdistPipe, selector,new FilterNull());
person Amit    schedule 23.06.2016