/**
 * Collects a random sample of first-column values from the first input split only.
 *
 * <p>Each record of {@code splits[0]} is accepted with probability {@code freq}. For an
 * accepted record, the sample is the text before the first tab in the record's value. If the
 * value contains no tab, the whole value is used. The record key is not used.
 *
 * <p>While fewer than {@code numSamples} samples are held, accepted samples are appended.
 * After that, each accepted sample overwrites a randomly chosen existing one, and {@code freq}
 * is scaled by {@code (numSamples - 1) / numSamples} to reflect existing samples being pushed
 * out. {@code freq}, {@code numSamples} and {@code LOG} are members of the enclosing class
 * (not shown here).
 *
 * @param inf input format used to compute the splits and open a record reader
 * @param job job configuration; {@code job.getNumMapTasks()} is passed as the split-count hint
 * @return the collected samples. NOTE(review): the array comes from
 *     {@code ArrayList.toArray()}, so its runtime type is {@code Object[]} despite the
 *     {@code K[]} signature.
 * @throws IOException if computing splits or reading records fails
 */
@SuppressWarnings("unchecked")
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
    InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
    ArrayList<K> samples = new ArrayList<K>(numSamples);
    Random r = new Random();
    long seed = r.nextLong();
    r.setSeed(seed);
    LOG.debug("seed: " + seed);
    // Sample splits[0] only; Math.min avoids an index error when there are no splits at all.
    int splitsToSample = Math.min(1, splits.length);
    for (int i = 0; i < splitsToSample; i++) {
        System.out.println("PartialSampler will getSample splits["+i+"]");
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
        try {
            K key = reader.createKey();
            V value = reader.createValue();
            while (reader.next(key, value)) {
                if (r.nextDouble() <= freq) {
                    // Sample the first tab-separated column of the value.
                    String line = value.toString();
                    int tab = line.indexOf('\t');
                    K sample = (K) new Text(tab < 0 ? line : line.substring(0, tab));
                    if (samples.size() < numSamples) {
                        samples.add(sample);
                    } else {
                        // When exceeding the maximum number of samples, replace a random
                        // element with this one, then adjust the frequency to reflect the
                        // possibility of existing elements being pushed out.
                        // nextInt(numSamples) always lies in [0, numSamples), so the index is valid.
                        samples.set(r.nextInt(numSamples), sample);
                        freq *= (numSamples - 1) / (double) numSamples;
                    }
                    key = reader.createKey();
                }
            }
        } finally {
            // Close the reader even if reading fails part-way through.
            reader.close();
        }
    }
    return (K[]) samples.toArray();
}
//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); Class<K> keyClass = (Class<K>)Text.class;
/**
 * Chooses the reduce partition for a record from the first tab-separated column of its value.
 *
 * <p>The record key is ignored. The partition is looked up with the text before the first
 * tab in {@code value}, or with the whole value if it contains no tab. This is the same column
 * that {@code getSample} collects as samples.
 *
 * @param key the record key (unused)
 * @param value the record value; its {@code toString()} is split on the first tab
 * @param numPartitions the number of partitions (unused here; presumably already reflected in
 *     {@code partitions} — confirm)
 * @return the partition index returned by {@code partitions.findPartition}
 */
@SuppressWarnings("unchecked")
public int getPartition(K key, V value, int numPartitions) {
    String line = value.toString();
    // indexOf avoids String.split, which splits on a regex and returns an empty array for
    // all-tab input such as "\t\t", making [0] throw.
    int tab = line.indexOf('\t');
    Text firstColumn = new Text(tab < 0 ? line : line.substring(0, tab));
    return partitions.findPartition((K) firstColumn);
}
// Job-driver fragment: configures a sort job, optionally sets up total-order partitioning from
// sampled input, deletes outputpath, prints a summary and submits the job with JobClient.runJob.
// NOTE(review): these statements belong to a method whose signature is not shown here.
// jobConf, num_reduces, sampler, partitionFile, outputpath, cluster and jobResult are
// declared elsewhere — confirm their types and values against the enclosing method.
// Formats and output key/value classes used for this job.
Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;
Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = Text.class;
Class<? extends Writable> outputValueClass = Text.class;
// Map output keys are LongWritable. Presumably these are the line offsets produced by
// TextInputFormat with an identity mapper — TODO confirm. The custom getPartition shown in
// this listing partitions on the value, not the key.
jobConf.setMapOutputKeyClass(LongWritable.class);
// Set user-supplied (possibly default) job configs
jobConf.setNumReduceTasks(num_reduces);
jobConf.setInputFormat(inputFormatClass);
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);
// Total-order partitioning is only configured when a sampler was supplied.
if (sampler != null) {
System.out.println("Sampling input to effect total-order sort...");
jobConf.setPartitionerClass(TotalOrderPartitioner.class);
Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
// NOTE(review): the next line, which derived partitionFile from inputDir, is commented out.
// partitionFile must therefore be defined elsewhere, and inputDir appears unused in the code
// shown — confirm.
//Path partitionFile = new Path(inputDir, "_sortPartitioning");
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
// Run the sampler and write the resulting split points to the configured partition file.
InputSampler.<K,V>writePartitionFile(jobConf, sampler);
// Register the partition file in the DistributedCache under the symlink name "_sortPartitioning".
URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, jobConf);
DistributedCache.createSymlink(jobConf);
}
// Delete outputpath (presumably the job's output directory — confirm it matches
// FileOutputFormat.getOutputPath(jobConf)).
// NOTE(review): FileSystem.get may hand back a shared, cached instance, so close() here can
// affect other code in this JVM using the same instance. delete(Path) is deprecated in favour
// of delete(Path, boolean recursive).
FileSystem hdfs = FileSystem.get(jobConf);
hdfs.delete(outputpath);
hdfs.close();
System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +
FileInputFormat.getInputPaths(jobConf)[0] + " into " +
FileOutputFormat.getOutputPath(jobConf) +
" with " + num_reduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
// Submit the job; the RunningJob handle is stored in jobResult.
jobResult = JobClient.runJob(jobConf);
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有