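Author: Bai Song
Purpose: In my research I need to analyze how many vertices take part in the computation in each iteration, in order to optimize the system further. For example, the last line of the SSSP compute() method calls voteToHalt() on the current vertex, which puts it into the InActive state, so every vertex is InActive when an iteration finishes. After the global synchronization barrier, vertices that received messages are reactivated (they become Active), and their compute() method is then called. This article shows how to count the vertices that take part in the computation in each iteration. The SSSP compute() method is shown below: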
@Override
public void compute(Iterable<DoubleWritable> messages) {
  if (getSuperstep() == 0) {
    setValue(new DoubleWritable(Double.MAX_VALUE));
  }
  double minDist = isSource() ? 0d : Double.MAX_VALUE;
  for (DoubleWritable message : messages) {
    minDist = Math.min(minDist, message.get());
  }
  if (minDist < getValue().get()) {
    setValue(new DoubleWritable(minDist));
    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
      double distance = minDist + edge.getValue().get();
      sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
    }
  }
  // Put the vertex into the InActive state
  voteToHalt();
}
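Note: in Giraph, an algorithm terminates when there are no active vertices and no messages in transit between workers.
In hama-0.6.0, the termination condition only checks whether any active vertices remain. That is not the real Pregel model; it is a half-finished implementation.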
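The vertex life cycle and the Giraph-style termination rule described above can be illustrated with a toy, single-process model. This is only a sketch and not Giraph code: the class name SuperstepSketch, the edge-array graph representation, and the choice of vertex 0 as the source are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-process model of Pregel-style supersteps (hypothetical, not Giraph code). */
final class SuperstepSketch {
  /**
   * Runs SSSP from vertex 0 and returns how many vertices were computed in each superstep.
   * Each edge is {source, target, weight}.
   */
  static List<Long> computedPerSuperstep(int vertexCount, int[][] edges) {
    double[] dist = new double[vertexCount];
    Arrays.fill(dist, Double.MAX_VALUE);
    boolean[] halted = new boolean[vertexCount];         // all vertices start active
    Map<Integer, List<Double>> inbox = new HashMap<>();  // messages delivered this superstep
    List<Long> counts = new ArrayList<>();

    while (true) {
      Map<Integer, List<Double>> outbox = new HashMap<>();
      long computed = 0;
      for (int v = 0; v < vertexCount; v++) {
        List<Double> messages = inbox.getOrDefault(v, new ArrayList<>());
        if (halted[v] && !messages.isEmpty()) {
          halted[v] = false;                             // a message reactivates the vertex
        }
        if (halted[v]) {
          continue;                                      // inactive: compute() is not called
        }
        computed++;                                      // this vertex takes part in the superstep
        double minDist = (v == 0) ? 0d : Double.MAX_VALUE;
        for (double m : messages) {
          minDist = Math.min(minDist, m);
        }
        if (minDist < dist[v]) {
          dist[v] = minDist;
          for (int[] e : edges) {
            if (e[0] == v) {
              outbox.computeIfAbsent(e[1], k -> new ArrayList<>()).add(minDist + e[2]);
            }
          }
        }
        halted[v] = true;                                // same effect as voteToHalt()
      }
      counts.add(computed);
      // Every computed vertex has voted to halt, so the run ends once no messages are in flight.
      if (outbox.isEmpty()) {
        break;
      }
      inbox = outbox;
    }
    return counts;
  }
}
```

For the three-vertex chain 0→1→2 with unit weights, `SuperstepSketch.computedPerSuperstep(3, new int[][] {{0, 1, 1}, {1, 2, 1}})` returns `[3, 1, 1]`: all vertices run in superstep 0, and afterwards only the vertex that received a message runs. The toy model stops as soon as a superstep sends no messages, so it is not meant to reproduce the exact superstep counts of a real Giraph job.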
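The modifications are as follows:
In the PartitionStats class, add a variable and methods that count how many vertices each Partition computes in each superstep: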
/** computed vertices in this partition */
private long computedVertexCount = 0;

/**
 * Increment the computed vertex count by one.
 */
public void incrComputedVertexCount() {
  ++computedVertexCount;
}

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
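Modify readFields() and write() by appending one statement at the end of each method. When a Partition finishes computing, it sends its computedVertexCount to the Master, which reads the values and aggregates them.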
@Override
public void readFields(DataInput input) throws IOException {
  partitionId = input.readInt();
  vertexCount = input.readLong();
  finishedVertexCount = input.readLong();
  edgeCount = input.readLong();
  messagesSentCount = input.readLong();
  // Added statement
  computedVertexCount = input.readLong();
}

@Override
public void write(DataOutput output) throws IOException {
  output.writeInt(partitionId);
  output.writeLong(vertexCount);
  output.writeLong(finishedVertexCount);
  output.writeLong(edgeCount);
  output.writeLong(messagesSentCount);
  // Added statement
  output.writeLong(computedVertexCount);
}
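The new field has to be written and read at the same position in both methods, otherwise the receiver would decode the bytes into the wrong fields. A minimal round-trip sketch using only java.io shows this; StatsSketch is a hypothetical stand-in reduced to two fields, not the real PartitionStats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for PartitionStats, reduced to two fields. */
final class StatsSketch {
  long messagesSentCount;
  long computedVertexCount;

  void write(DataOutput output) throws IOException {
    output.writeLong(messagesSentCount);
    output.writeLong(computedVertexCount);   // appended last
  }

  void readFields(DataInput input) throws IOException {
    messagesSentCount = input.readLong();
    computedVertexCount = input.readLong();  // read last, mirroring write()
  }

  /** Serializes the stats to bytes and reads them back into a fresh object. */
  static StatsSketch roundTrip(StatsSketch in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      in.write(new DataOutputStream(bytes));
      StatsSketch out = new StatsSketch();
      out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return out;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

If the two statements were appended in different positions in write() and readFields(), the round trip would still succeed without an exception, but the two counters would come back swapped.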
The org.apache.giraph.graph.GlobalStats class
Add a variable and a method that hold the total number of vertices computed in each superstep, covering all Partitions on every Worker.
/**
 * Computed vertices across all partitions.
 * Added by BaiSong
 */
private long computedVertexCount = 0;

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.
/**
 * Add the stats of a partition to the global stats.
 *
 * @param partitionStats Partition stats to be added.
 */
public void addPartitionStats(PartitionStats partitionStats) {
  this.vertexCount += partitionStats.getVertexCount();
  this.finishedVertexCount += partitionStats.getFinishedVertexCount();
  this.edgeCount += partitionStats.getEdgeCount();
  // Added by BaiSong: the following statement
  this.computedVertexCount += partitionStats.getComputedVertexCount();
}
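As a rough illustration of the aggregation step, the sketch below folds per-partition counts from several workers into one total. PartitionCountSketch and GlobalCountSketch are hypothetical, reduced stand-ins for PartitionStats and GlobalStats, and the sketch assumes a fresh accumulator is used for every superstep so that the total is per superstep rather than cumulative.

```java
/** Hypothetical, reduced stand-in for PartitionStats. */
final class PartitionCountSketch {
  final long computedVertexCount;

  PartitionCountSketch(long computedVertexCount) {
    this.computedVertexCount = computedVertexCount;
  }
}

/** Hypothetical, reduced stand-in for GlobalStats. */
final class GlobalCountSketch {
  private long computedVertexCount = 0;

  void addPartitionStats(PartitionCountSketch partitionStats) {
    computedVertexCount += partitionStats.computedVertexCount;
  }

  long getComputedVertexCount() {
    return computedVertexCount;
  }

  /** Folds the partitions of all workers into one total for a single superstep. */
  static long total(long[][] perWorkerPartitionCounts) {
    GlobalCountSketch global = new GlobalCountSketch();  // assumed: fresh accumulator per superstep
    for (long[] worker : perWorkerPartitionCounts) {
      for (long count : worker) {
        global.addPartitionStats(new PartitionCountSketch(count));
      }
    }
    return global.getComputedVertexCount();
  }
}
```

For example, two workers whose partitions computed {2, 1} and {1} vertices give `GlobalCountSketch.total(new long[][] {{2, 1}, {1}})` equal to 4.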
To make debugging easier, you can also modify the class's toString() method (optional). The modified version is:
public String toString() {
  return "(vtx=" + vertexCount + ", computedVertexCount="
      + computedVertexCount + ",finVtx=" + finishedVertexCount
      + ",edges=" + edgeCount + ",msgCount=" + messageCount
      + ",haltComputation=" + haltComputation + ")";
}
Add the counting itself. In the computePartition() method, add the statement marked below.
if (!vertex.isHalted()) {
  context.progress();
  TimerContext computeOneTimerContext = computeOneTimer.time();
  try {
    vertex.compute(messages);
    // Added statement: once the vertex's compute() has returned,
    // increment this Partition's computedVertexCount by 1
    partitionStats.incrComputedVertexCount();
  } finally {
    computeOneTimerContext.stop();
  }
……
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Computed Vertex" for counting the
 * computed vertices of every superstep.
 */
public class GiraphComputedVertex extends HadoopCountersBase {
  /** Counter group name for the Giraph computed vertices */
  public static final String GROUP_NAME = "Giraph Computed Vertex";
  /** Singleton instance for everyone to use */
  private static GiraphComputedVertex INSTANCE;

  /** Computed vertex counter for each superstep */
  private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

  private GiraphComputedVertex(Context context) {
    super(context, GROUP_NAME);
    superstepVertexCount = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context
   *          Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphComputedVertex(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphComputedVertex instance.
   */
  public static GiraphComputedVertex getInstance() {
    return INSTANCE;
  }

  /**
   * Get the computed vertex counter for a superstep, creating it on first use.
   *
   * @param superstep
   *          superstep number
   * @return counter for the given superstep
   */
  public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
    GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep: " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepVertexCount.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepVertexCount.values().iterator();
  }
}
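The class above creates one counter per superstep lazily, the first time that superstep is requested. The following stand-alone sketch mirrors that pattern without Hadoop: AtomicLong stands in for GiraphHadoopCounter, and the record() method is a hypothetical hook for wherever the aggregated total is written, since the article does not show that call site.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for the per-superstep counter map; AtomicLong replaces GiraphHadoopCounter. */
final class SuperstepCounterSketch {
  private final Map<Long, AtomicLong> superstepVertexCount = new HashMap<>();

  /** Returns the counter for a superstep, creating it on first access. */
  AtomicLong getSuperstepVertexCount(long superstep) {
    return superstepVertexCount.computeIfAbsent(superstep, s -> new AtomicLong());
  }

  /** Hypothetical hook: add the aggregated total of one superstep to its counter. */
  void record(long superstep, long computedVertexCount) {
    getSuperstepVertexCount(superstep).addAndGet(computedVertexCount);
  }
}
```

Calling `record(0, 9)` and then `record(1, 4)` leaves the counters for supersteps 0 and 1 at 9 and 4, while a superstep that was never recorded reads as 0 when first requested.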
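In the test shown in the figure above, there were 6 iterations. The red box shows the number of vertices that took part in the computation in each iteration: 9, 4, 4, 3, 4, 0.
Explanation: in superstep 0 every vertex is active, so all 9 vertices take part in the computation. In superstep 5, 0 vertices take part, so no messages are sent; since every vertex is also inactive, the algorithm terminates.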