У меня нет большого опыта работы с кассандрой, поэтому, пожалуйста, извините меня, если я применил неправильный подход.
Я пытаюсь выполнить массовую загрузку в кассандре с уменьшением карты
В основном пример подсчета слов
Ссылка: http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
Я поместил простой пример Hadoop Wordcount Mapper и немного изменил код драйвера и редуктор в соответствии с приведенным выше примером.
Я также успешно создал выходной файл. Теперь я сомневаюсь, как выполнить загрузку в кассандровую часть? Есть ли разница в моем подходе?
Пожалуйста посоветуй.
Это часть кода драйвера
Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);
Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;
Mapper - это то же самое, что и обычный преобразователь wordcount, который просто токенизирует и выдает Word, 1
Класс редуктора имеет вид
public class CassaWordCountReducer extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Mutation> columnsToAdd = new ArrayList<Mutation>();
Integer wordCount = 0;
for(IntWritable value : values) {
wordCount += value.get();
}
Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
countCol.setTimestamp(new Date().getTime());
ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
wordCosc.setColumn(countCol);
Mutation countMut = new Mutation();
countMut.column_or_supercolumn = wordCosc;
columnsToAdd.add(countMut);
context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
}
}