Выполнение массовой загрузки в cassandra с помощью map reduce

У меня нет большого опыта работы с кассандрой, поэтому, пожалуйста, извините меня, если я применил неправильный подход.

Я пытаюсь выполнить массовую загрузку в кассандре с уменьшением карты

В основном пример подсчета слов

Ссылка: http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

Я поместил простой пример Hadoop Wordcount Mapper и немного изменил код драйвера и редуктор в соответствии с приведенным выше примером.

Я также успешно создал выходной файл. Теперь я сомневаюсь, как выполнить загрузку в кассандровую часть? Есть ли разница в моем подходе?

Пожалуйста посоветуй.

Это часть кода драйвера

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

Mapper - это то же самое, что и обычный преобразователь wordcount, который просто токенизирует и выдает Word, 1

Класс редуктора имеет вид

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}

person Arun A K    schedule 05.02.2013    source источник
comment
что не работает в приведенном выше коде, какую ошибку вы получаете?   -  person Charles Menguy    schedule 05.02.2013
comment
Привет, Чарльз, я не уверен в операции записи в кассандру. Поскольку я указал место вывода, я получаю вывод файла. У меня вопрос - нужно ли нам выполнять какие-то другие операции для загрузки этого файла в кассандру? Как и для HBase, мы делаем - LoadIncrementalHFiles loadHfile = new LoadIncrementalHFiles (конфигурация); loadHfile.doBulkLoad (новый путь (resourcePath), hTable);   -  person Arun A K    schedule 05.02.2013
comment
Нет, вам не нужно загружать файл в кассандру. Вот для чего нужен OutputFormat. Вы уверены, что правильно используете формат множественного вывода?   -  person jonbros    schedule 05.02.2013
comment
Я внес изменения в редуктор в соответствии с предыдущим обсуждением - multipleOutputs.write (ByteBuffer.wrap (key.toString (). GetBytes ()), columnsToAdd, reducer); так будет выглядеть новый код. Даже сейчас данные не вставляются в Cassandra и не показывают никаких ошибок или исключений.   -  person Arun A K    schedule 06.02.2013
comment
Хотя я не выполнял никаких контекстных записей, я получаю два выходных файла в моем выходном пути reducer-r-00000 words-r-00000 Второй файл - это пустой текстовый файл, а первый - текстовый файл с содержимым, как показано ниже java.nio .HeapByteBuffer [pos = 0 lim = 1 cap = 1] [Mutation (column_or_supercolumn: ColumnOrSuperColumn (column: Column (name: 63 6F 75 6E 74, value: 31, timestamp: 1360125545574))] java.nio.HeapByteBuffer [pos = 0 lim = 4 cap = 4] [Mutation (column_or_supercolumn: ColumnOrSuperColumn (column: Column (name: 63 6F 75 6E 74, value: 31, timestamp: 1360125545586)))] и т. Д.   -  person Arun A K    schedule 06.02.2013


Ответы (1)


Чтобы выполнить массовую загрузку в Cassandra, я бы посоветовал прочитать эту статью от DataStax. В основном вам нужно сделать 2 вещи для массовой загрузки:

  • Ваши выходные данные изначально не помещаются в Cassandra, вам необходимо преобразовать их в SSTables.
  • Когда у вас есть SSTables, вы должны иметь возможность передавать их в Cassandra. Конечно, вы не хотите просто копировать каждый SSTable на каждый узел, вы хотите скопировать только соответствующую часть данных на каждый узел.

В вашем случае, когда вы используете BulkOutputFormat, он должен делать все это, поскольку он использует sstableloader за кулисами. Я никогда не использовал его с MultipleOutputs, но он должен работать нормально.

Я думаю, что ошибка в вашем случае заключается в том, что вы неправильно используете MultipleOutputs: вы все еще делаете context.write, когда вам действительно нужно писать в свой MultipleOutputs объект. Как вы это делаете сейчас, поскольку вы пишете в обычный Context, он будет подхвачен форматом вывода по умолчанию TextOutputFormat, а не тем, который вы определили в своем MultipleOutputs. Дополнительная информация о том, как использовать MultipleOutputs в вашем редукторе здесь.

После того, как вы напишете в правильный выходной формат BulkOutputFormat, как вы определили, ваши SSTables должны быть созданы и переданы в Cassandra с каждого узла в вашем кластере - вам не потребуется никаких дополнительных шагов, выходной формат позаботится об этом за вас.

Также я бы посоветовал посмотреть этот пост, где они также объясните, как использовать BulkOutputFormat, но они используют ConfigHelper, на который вы, возможно, захотите взглянуть, чтобы упростить настройку конечной точки Cassandra.

person Charles Menguy    schedule 05.02.2013