博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Mahout源码MeanShiftCanopyDriver分析之二MeanShiftCanopyMapper仿造
阅读量:4538 次
发布时间:2019-06-08

本文共 5699 字,大约阅读时间需要 18 分钟。

首先更正一点,昨天处理数据的时候是有问题的,直接从网页中拷贝的文件的空格是有问题的,直接拷贝然后新建的文件中的空格可能有一个两个、三个的,所以要把两个或者三个的都换为一个,在InputMapper中下面的代码:

 

private static final Pattern SPACE = Pattern.compile(" ");String[] numbers = SPACE.split(values.toString());

可以看到这个代码是以一个空格来区分的,可以在linux的terminal中输入下面的命令来进行替换:

 

 

Sed -I "s/   / /g" `grep    -l synthetic_control.data`   -- 替换三个空格为一个空格Sed -I "s/  / /g" `grep    -l synthetic_control.data`    -- 替换两个空格为一个空格

通过上面的命令,然后在上传,使用昨天的命令进行meanshiftCanopyDriver的调用。

不过补充一点,因为在InputMapper中对这个数据的处理还有下面的代码:

 

 

for (String value : numbers) {      if (!value.isEmpty()) {        doubles.add(Double.valueOf(value));      }    }

这个代码就表示如果是空字符串的话,就不进行添加,所以说输入数据和前面保持一致也是可以的,即昨天的数据和今天修改的数据其结果一样。

MeansShiftCanopyDriver的run方法跳转如下:

 

run(159行)-->buildClusters(282行)-->buildClustersMR(353行)-->runIterationMR(412行),这里说明几点:

在159行开始run方法进入后,进行第一个判断inputIsCanopies,如下:

 

if (inputIsCanopies) {      clustersIn = input;    } else {      createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);    }

因为在前面的测试中我们已经使用了InputDriver把输入数据转换为了canopy,所以这里直接进入了clustersIn=input,然后往下面走;

 

在282行的buildClusters方法进入后因为是默认在Hadoop中跑的程序,所以是使用MR算法的,进入到else中,如下:

 

if (runSequential) {      return buildClustersSeq(clustersIn, output, measure, kernelProfile, t1,          t2, convergenceDelta, maxIterations, runClustering);    } else {      return buildClustersMR(conf, clustersIn, output, measure, kernelProfile,          t1, t2, convergenceDelta, maxIterations, runClustering);    }

在353行中的方法buildClustersMR进入后,即开始进行循环,混合的主体是412行的runIterationMR方法。本篇主要分析此Job的Mapper和Reducer类,

 

这两个类分别是MeanShiftCanopyMapper、MeanShiftCanopyReducer。下面的代码是MeanShiftCanopyMapper的仿造代码,可以直接使用此代码进行调试,这样就可以看到MeanShiftCanopyMapper的数据逻辑流了,今晚又太晚了,明天还要早起。就下次再分析了,代码如下:

 

package mahout.fansy.meanshift;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.mahout.clustering.iterator.ClusterWritable;import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;import org.apache.mahout.clustering.meanshift.MeanShiftCanopyClusterer;import org.apache.mahout.clustering.meanshift.MeanShiftCanopyConfigKeys;import org.apache.mahout.common.iterator.sequencefile.PathFilters;import org.apache.mahout.common.iterator.sequencefile.PathType;import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;import com.google.common.collect.Lists;public class MeanShiftCanopyMapperFollow {	/**	 * MeanShiftCanopyMapper仿造代码	 * @author fansy	 * @param args	 */	public static void main(String[] args) {		cleanup();// 调试cleanup函数	}		/**	 * 仿造map操作	 */	public static Collection
map(){ Collection
canopies = Lists.newArrayList(); List
data=getMapData(); // 获取map的输入值 MeanShiftCanopyClusterer clusterer=setup(); // 获取setup函数中经过设置的值 for (ClusterWritable clusterWritable : data){ // 这里设置断点,查看程序初始数据 MeanShiftCanopy canopy = (MeanShiftCanopy)clusterWritable.getValue(); clusterer.mergeCanopy(canopy.shallowCopy(), canopies); } return canopies; } /** * 仿造setup函数 * @return 返回经过设置值的MeanShiftCanopyClusterer */ public static MeanShiftCanopyClusterer setup(){ String measureClassName="org.apache.mahout.common.distance.EuclideanDistanceMeasure"; String kernelProfileClassName="org.apache.mahout.common.kernel.TriangularKernelProfile"; double convergenceDelta=0.5; double t1=47.6; double t2=1; boolean runClustering=true; Configuration conf =new Configuration(); conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName); conf.set(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY, kernelProfileClassName); conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, String .valueOf(convergenceDelta)); conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1)); conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2)); conf.set(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, String .valueOf(runClustering)); MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(conf); return clusterer; } /** * 仿造cleanup函数 */ public static Map
cleanup(){ int numReducers=1; // 自己设定,这里为了方便直接设置为1 Map
map=new HashMap
(); Collection
canopies=map(); // 获得map的输出 MeanShiftCanopyClusterer clusterer =setup();// 获得setup的输出 int reducer = 0; for (MeanShiftCanopy canopy : canopies) { clusterer.shiftToMean(canopy); ClusterWritable clusterWritable = new ClusterWritable(); clusterWritable.setValue(canopy); map.put(new Text(String.valueOf(reducer)), clusterWritable); reducer++; if (reducer >= numReducers) { reducer=0; } } return map; } /** * 获得map的输入数据,输入数据的value是ClusterWritable类型的 * @return */ public static List
getMapData(){ Path input=new Path("hdfs://ubuntu:9000/user/test/input/real_input/part-m-00000"); //路径是经过InputDriver后的输出路径 Configuration conf=new Configuration(); conf.set("mapred.job.tracker", "ubuntu:9001"); List
clusters = new ArrayList
(); for (Writable value : new SequenceFileDirValueIterable
(input, PathType.LIST, PathFilters.partFilter(), conf)) { Class
valueClass = value.getClass(); if (valueClass.equals(ClusterWritable.class)) { ClusterWritable clusterWritable = (ClusterWritable) value; clusters.add( clusterWritable); } else { throw new IllegalStateException("can't read " + input); } } return clusters; }}

今天培训还听讲师说不要抱怨,额,好吧,现在感觉天天都是1点半之后或者左右的时间睡觉了,严重感觉睡眠不足,哎,难道这就是程序员的名?今天讲师还说确定目标后有四个阶段:初始兴奋期、寂寞期、煎熬期、成功期,我现在还在哪个阶段熬着呀。额,好吧,慢慢来,坚持。。。

 

 

分享,快乐,成长

转载请注明出处: 

转载于:https://www.cnblogs.com/pangblog/p/3278078.html

你可能感兴趣的文章
把数字数值转换成单词形式
查看>>
Swift游戏实战-跑酷熊猫 14 熊猫打滚
查看>>
pdfjs预览pdf文件的两种方式(可复制)
查看>>
hdu1042N!
查看>>
Coder-Strike 2014 - Round 1(A~E)
查看>>
【BZOJ2739】—最远点(决策单调性+分治)
查看>>
shell 流程控制
查看>>
MVC学习-发送请求
查看>>
微信开发-ACCESS TOKEN 过期失效解决方案
查看>>
(转)正则表达式验证大全
查看>>
被汉得拒绝以后
查看>>
转:Python自省(反射)指南
查看>>
SVN
查看>>
谷歌浏览器
查看>>
Python 流程控制:for
查看>>
android.os.NetworkOnMainThreadException异常如何解决
查看>>
我的轮播练习
查看>>
js中index()的四种经典用法111
查看>>
vb Array.ConvertAll 泛型方法
查看>>
flask 基本配置和参数解释
查看>>