Author | 白松
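1. Add a class that writes the number of messages sent in each superstep to a Hadoop Counter. Create a new class, GiraphMessages, in the org.apache.giraph.counters package to track the message counts.
The source code is as follows: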
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Messages" for counting the messages
 * sent in every superstep.
 */
public class GiraphMessages extends HadoopCountersBase {
  /** Counter group name for the Giraph messages */
  public static final String GROUP_NAME = "Giraph Messages";
  /** Singleton instance for everyone to use */
  private static GiraphMessages INSTANCE;
  /** Message-count counter for each superstep, keyed by superstep number */
  private final Map<Long, GiraphHadoopCounter> superstepMessages;

  /**
   * Private constructor; use {@link #init(Context)} instead.
   *
   * @param context Hadoop Context to use.
   */
  private GiraphMessages(Context context) {
    super(context, GROUP_NAME);
    superstepMessages = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphMessages(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphMessages instance.
   */
  public static GiraphMessages getInstance() {
    return INSTANCE;
  }

  /**
   * Get the message counter for a superstep, creating it on first use.
   *
   * @param superstep superstep number
   * @return counter holding the message count of that superstep
   */
  public GiraphHadoopCounter getSuperstepMessages(long superstep) {
    GiraphHadoopCounter counter = superstepMessages.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep- " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepMessages.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepMessages.values().iterator();
  }
}
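The core of GiraphMessages is a lazily populated map from superstep number to counter. The sketch below reproduces that pattern in plain Java so it can be run without a Hadoop job. SimpleCounter is a hypothetical stand-in for Giraph's GiraphHadoopCounter, and SuperstepMessageCounters is an illustrative name that is not part of Giraph.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for GiraphHadoopCounter: a named long value. */
class SimpleCounter {
  private final String name;
  private long value;

  SimpleCounter(String name) {
    this.name = name;
  }

  void increment(long delta) {
    value += delta;
  }

  long getValue() {
    return value;
  }

  String getName() {
    return name;
  }
}

/** Illustrative registry: one counter per superstep, created on first access. */
class SuperstepMessageCounters implements Iterable<SimpleCounter> {
  private final Map<Long, SimpleCounter> superstepMessages = new HashMap<>();

  SimpleCounter getSuperstepMessages(long superstep) {
    // computeIfAbsent builds the counter only the first time a superstep is seen,
    // which matches the get / null-check / put sequence in GiraphMessages
    return superstepMessages.computeIfAbsent(
        superstep, s -> new SimpleCounter("Superstep- " + s + " "));
  }

  int size() {
    return superstepMessages.size();
  }

  @Override
  public Iterator<SimpleCounter> iterator() {
    return superstepMessages.values().iterator();
  }
}
```

In this sketch, calling getSuperstepMessages(0) twice returns the same counter, so repeated increments for one superstep accumulate instead of overwriting each other. Like the HashMap in the original class, it assumes access from a single thread.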
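2. Add the recording logic to the BspServiceMaster class. At every synchronization, the master aggregates (sums) the message counts sent by all workers and stores the result in GlobalStats. So after each synchronization we only need to read the total message count from the GlobalStats object and write it into GiraphMessages, in the form <SuperStep-Number, TotalMessagesCount>. The data is actually stored in the Map<Long, GiraphHadoopCounter> superstepMessages field defined in the GiraphMessages class from the previous step.
At the very end of the BspServiceMaster constructor, append one line that initializes GiraphMessages: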
GiraphMessages.init(context);
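The step above relies on the master summing the per-worker message counts into a single global total. As a rough illustration of that summation only, and not of Giraph's actual aggregateWorkerStats() code, assume each worker reports a hypothetical WorkerReport carrying its message count for the superstep:

```java
import java.util.List;

/** Hypothetical per-worker report; real Giraph workers report richer statistics. */
class WorkerReport {
  final String workerId;
  final long messageCount;

  WorkerReport(String workerId, long messageCount) {
    this.workerId = workerId;
    this.messageCount = messageCount;
  }
}

/** Illustrative aggregation: the global message count is the sum over all workers. */
class MessageAggregator {
  static long totalMessages(List<WorkerReport> reports) {
    long total = 0;
    for (WorkerReport report : reports) {
      total += report.messageCount;
    }
    return total;
  }
}
```

The resulting total is the value that, in the real code, is read back with globalStats.getMessageCount() and written into GiraphMessages.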
In the SuperstepState coordinateSuperstep() method of BspServiceMaster, add the recording logic. The relevant snippet is:
// ...
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
LOG.info("D-globalStats: " + globalStats + "\n\n");
// Add the following statement. Recording starts from superstep 0
// (the input superstep is skipped).
if (getSuperstep() != INPUT_SUPERSTEP) {
  GiraphMessages.getInstance().getSuperstepMessages(getSuperstep())
      .increment(globalStats.getMessageCount());
}
// ...
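To see what the INPUT_SUPERSTEP check does, the following sketch simulates a few calls to the recording step. It assumes INPUT_SUPERSTEP is -1, the value Giraph's BspService uses for the input-loading superstep, and it uses a plain TreeMap in place of the Hadoop counters. MessageRecorder and record() are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical recorder mirroring the guard added to coordinateSuperstep(). */
class MessageRecorder {
  /** Assumed value of BspService.INPUT_SUPERSTEP (the input-loading step). */
  static final long INPUT_SUPERSTEP = -1;

  /** <SuperStep-Number, TotalMessagesCount>, ordered by superstep. */
  private final Map<Long, Long> superstepMessages = new TreeMap<>();

  void record(long superstep, long globalMessageCount) {
    // Skip the input superstep, so the recorded entries start at superstep 0
    if (superstep != INPUT_SUPERSTEP) {
      // merge with Long::sum mirrors the counter's increment() semantics
      superstepMessages.merge(superstep, globalMessageCount, Long::sum);
    }
  }

  Map<Long, Long> snapshot() {
    return new TreeMap<>(superstepMessages);
  }
}
```

In this sketch, a superstep that sends no messages still gets an entry with value 0, because the entry is created before the increment is applied, just as getSuperstepMessages() creates the counter before increment() is called.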
3. The experimental results are as follows:
Done!