1. Principle
(1) The original data set List is ordered by some rule, and two initial distance thresholds T1 and T2 are chosen, with T1 > T2.
(2) A data vector A is picked at random from List, and a cheap (coarse) distance measure is used to compute the distance d between A and every other sample vector in List.
(3) Based on the distances d from step 2, every sample vector with d < T1 is placed into the same canopy as A, and every sample vector with d < T2 is removed from List.
(4) Steps 2 and 3 are repeated until List is empty. (Since d(A, A) = 0 < T2, A itself is always removed, so the loop is guaranteed to terminate.)
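To make the procedure concrete, here is a minimal in-memory sketch in Java (plain double[] points and Euclidean distance; the CanopySketch class and its method names are illustrative only, not Mahout's implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class CanopySketch {

    // Cheap, coarse distance used to form canopies (plain Euclidean here).
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    // Group the points into canopies using thresholds t1 > t2.
    static List<List<double[]>> canopy(List<double[]> list, double t1, double t2) {
        List<List<double[]>> canopies = new ArrayList<List<double[]>>();
        Random rand = new Random();
        while (!list.isEmpty()) {
            // (2) pick a random point A as the canopy center
            double[] a = list.get(rand.nextInt(list.size()));
            List<double[]> canopy = new ArrayList<double[]>();
            List<double[]> remaining = new ArrayList<double[]>();
            for (double[] p : list) {
                double d = distance(a, p);
                if (d < t1) {
                    canopy.add(p);      // (3) d < T1: p joins A's canopy
                }
                if (d >= t2) {
                    remaining.add(p);   // (3) d < T2: p is dropped from List
                }
            }
            canopies.add(canopy);
            list = remaining;           // (4) repeat until List is empty
        }
        return canopies;
    }
}

Note that a point with T2 ≤ d < T1 joins the canopy but stays in List, so canopies may overlap. Roughly speaking, Mahout's MapReduce version applies the same rules inside each mapper and then once more over the mapper-level canopy centers in the reducer, which is why the CLI in step 4 takes both --t1/--t2 and --t3/--t4.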
2. Download the test data
cd /tmp
hadoop dfs -mkdir /input
wget http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data
hadoop dfs -copyFromLocal /tmp/synthetic_control.data /input/synthetic_control.data
3. Format conversion (text → vectors)
Create the file Text2VectorWritable.java:
package mahout.fansy.utils.transform;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
/**
 * transform text data to vectorWritable data
 * @author fansy
 */
public class Text2VectorWritable extends AbstractJob{
public static void main(String[] args) throws Exception{
ToolRunner.run(new Configuration(), new Text2VectorWritable(),args);
}
@Override
public int run(String[] arg0) throws Exception {
addInputOption();
addOutputOption();
if (parseArguments(arg0) == null) {
return -1;
}
Path input=getInputPath();
Path output=getOutputPath();
Configuration conf=getConf();
// set job information
Job job=new Job(conf,"text2vectorWritableCopy with input:"+input.getName());
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(Text2VectorWritableMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setReducerClass(Text2VectorWritableReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(VectorWritable.class);
job.setJarByClass(Text2VectorWritable.class);
FileInputFormat.addInputPath(job, input);
SequenceFileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) { // wait for the job to finish
throw new InterruptedException("Text2VectorWritable job failed processing " + input);
}
return 0;
}
/**
* Mapper main procedure
* @author fansy
*
*/
public static class Text2VectorWritableMapper extends Mapper<LongWritable,Text,LongWritable,VectorWritable>{
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String[] str=value.toString().split("\\s{1,}");
// split the line on one or more whitespace characters
Vector vector=new RandomAccessSparseVector(str.length);
for(int i=0;i<str.length;i++){
vector.set(i, Double.parseDouble(str[i]));
}
VectorWritable va=new VectorWritable(vector);
context.write(key, va);
}
}
/**
* Reducer: do nothing but output
* @author fansy
*
*/
public static class Text2VectorWritableReducer extends Reducer<LongWritable,VectorWritable,LongWritable,VectorWritable>{
public void reduce(LongWritable key,Iterable<VectorWritable> values,Context context)throws IOException,InterruptedException{
for(VectorWritable v:values){
context.write(key, v);
}
}
}
}
Compile and export the project as ClusteringUtils.jar, then copy it to /home/hadoop/mahout/mahout_jar.
When exporting from Eclipse choose Export → Runnable JAR File → Extract required libraries into generated JAR.
Then run:
hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform
You may hit a ClassNotFoundException for org/apache/mahout/common/AbstractJob. This usually means that HADOOP_CLASSPATH does not include the Mahout jars.
Fix 1:
Copy the Mahout jars into Hadoop's lib directory and confirm that this directory really is on HADOOP_CLASSPATH:
cp /home/hadoop/mahout/*.jar /home/hadoop/hadoop/lib
Fix 2 (recommended):
Add the following to hadoop-env.sh:
for f in /home/hadoop/mahout/*.jar; do
if [ "$HADOOP_CLASSPATH" ]; then
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
else
export HADOOP_CLASSPATH=$f
fi
done
Remember to distribute the updated hadoop-env.sh to the other nodes.
Then restart the Hadoop environment:
stop-all.sh
start-all.sh
Run the conversion:
hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform
(If a main class was assigned when the JAR was exported, the command above will fail; use this one instead, note that the input and output options still have to be supplied:)
hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform
The converted output is no longer readable text but a SequenceFile of vectors:
hdfs:///input/synthetic_control.data.transform/part-r-00000
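If you want to sanity-check the converted data before clustering, a small standalone reader along the following lines will print the first few vectors. This DumpVectors class is just an illustrative sketch built on Hadoop's standard SequenceFile.Reader API; the key/value types and the path match the conversion job above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VectorWritable;

public class DumpVectors {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("hdfs:///input/synthetic_control.data.transform/part-r-00000");
        FileSystem fs = path.getFileSystem(conf);
        // keys are the byte offsets written by the mapper, values are the vectors
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            LongWritable key = new LongWritable();
            VectorWritable value = new VectorWritable();
            int shown = 0;
            while (reader.next(key, value) && shown++ < 5) { // dump the first five records
                System.out.println(key.get() + "\t" + value.get().asFormatString());
            }
        } finally {
            reader.close();
        }
    }
}

Running it needs the same Hadoop and Mahout jars on the classpath as the conversion job; alternatively, Mahout's seqdumper utility serves the same purpose.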
4. Run Canopy clustering
mahout canopy --input hdfs:///input/synthetic_control.data.transform/part-r-00000 --output /output/canopy --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --t1 80 --t2 55 --t3 80 --t4 55 --clustering
(--t1/--t2 are the map-side thresholds, --t3/--t4 play the same roles in the reduce phase, and --clustering additionally assigns every input point to its closest canopy.)
5. Format conversion (vectors → text)
Convert the result of step 4 back into text.
Create the file ReadClusterWritable.java:
package mahout.fansy.utils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.AbstractJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * read cluster centers
 * @author fansy
 */
public class ReadClusterWritable extends AbstractJob {
public static void main(String[] args) throws Exception{
ToolRunner.run(new Configuration(), new ReadClusterWritable(),args);
}
@Override
public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
if (parseArguments(args) == null) {
return -1;
}
Job job=new Job(getConf(),getInputPath().toString());
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(RM.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setJarByClass(ReadClusterWritable.class);
FileInputFormat.addInputPath(job, getInputPath());
FileOutputFormat.setOutputPath(job, getOutputPath());
if (!job.waitForCompletion(true)) {
throw new InterruptedException("Canopy Job failed processing " + getInputPath());
}
return 0;
}
public static class RM extends Mapper<Text,ClusterWritable ,Text,Text>{
private Logger log=LoggerFactory.getLogger(RM.class);
public void map(Text key,ClusterWritable value,Context context) throws
IOException,InterruptedException{
String str=value.getValue().getCenter().asFormatString();
// System.out.println("center****************:"+str);
log.info("center*****************************:"+str); // log the cluster center
context.write(key, new Text(str));
}
}
}
Package the class into ClusteringUtils.jar and upload it to /home/hadoop/mahout/mahout_jar.
If you need to clear stale entries from Eclipse's Launch Configurations, go into the workspace's .metadata/.plugins/org.eclipse.debug.core/.launches directory
and delete the files inside it.
Run:
hadoop jar ClusteringUtils.jar mahout.fansy.utils.ReadClusterWritable -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output (if this fails, run the command below instead)
hadoop jar ClusteringUtils.jar -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output
The cluster centers are now available as plain text in /output/canopy-output/part-m-00000.