
Giraph Source Code Analysis (8): Counting the Vertices That Participate in Computation in Each SuperStep

Author | Bai Song


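Purpose: in research work we need to analyze how many vertices participate in computation in each iteration, so that the system can be optimized further. For example, the last line of the SSSP compute() method always calls voteToHalt() on the current vertex, which puts it into the InActive state. As a result, all vertices are InActive once an iteration completes. After the global synchronization barrier, vertices that received messages are reactivated (become Active), and their compute() method is then called. The goal of this post is to count the vertices that participate in computation in each iteration. The SSSP compute() method is shown below: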

@Override
  public void compute(Iterable<DoubleWritable> messages) {
    if (getSuperstep() == 0) {
      setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < getValue().get()) {
      setValue(new DoubleWritable(minDist));
      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
        double distance = minDist + edge.getValue().get();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    // Put the vertex into the InActive state
    voteToHalt();
  }

Note: in Giraph, an algorithm terminates when there are no active vertices and no messages are being passed between workers.

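In hama-0.6.0, by contrast, the termination condition only checks whether any vertex is still active. That is not the real Pregel model; it is a half-finished implementation.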
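
The two termination rules described above can be contrasted in a minimal sketch. The class and method names (TerminationCheck, giraphStyleDone, hamaStyleDone) are hypothetical and are not taken from either codebase; the sketch only encodes the two conditions as this post states them.

```java
/** Hypothetical helper contrasting the two termination rules described above. */
final class TerminationCheck {
    private TerminationCheck() { }

    /** Rule as described for Giraph: no active vertices and no messages sent. */
    static boolean giraphStyleDone(long activeVertices, long messagesSent) {
        return activeVertices == 0 && messagesSent == 0;
    }

    /** Rule as described for hama-0.6.0: only active vertices are checked. */
    static boolean hamaStyleDone(long activeVertices) {
        return activeVertices == 0;
    }

    public static void main(String[] args) {
        // Every vertex voted to halt, but 4 messages were sent in this superstep.
        long active = 0;
        long sent = 4;
        System.out.println("Giraph-style done: " + giraphStyleDone(active, sent));
        System.out.println("Hama-style done:   " + hamaStyleDone(active));
    }
}
```

Under these assumptions, an algorithm such as SSSP, where every vertex votes to halt at the end of each superstep, would be judged finished by the active-only rule even while messages are still waiting to be delivered.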

The modifications are as follows:

  1. The org.apache.giraph.partition.PartitionStats class

Add a variable and methods that count the vertices each Partition computes in each superstep. The added variable and methods are:

/** computed vertices in this partition */
private long computedVertexCount = 0;

/**
 * Increment the computed vertex count by one.
 */
public void incrComputedVertexCount() {
    ++computedVertexCount;
}

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
    return computedVertexCount;
}

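Modify the readFields() and write() methods by appending one final statement to each. When a Partition finishes computing, it sends its computedVertexCount to the Master, which reads the values and aggregates them.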

@Override
public void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    // Added statement
    computedVertexCount = input.readLong();
}

@Override
public void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    // Added statement
    output.writeLong(computedVertexCount);
}
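
Because write() and readFields() exchange raw fields with no names attached, the new field has to be written and read at the same position on both sides. The following is a minimal sketch of that round trip using only java.io; MiniPartitionStats is a simplified stand-in invented for illustration, not the real PartitionStats class, and it carries only three of the fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Simplified stand-in for PartitionStats; not the real Giraph class. */
final class MiniPartitionStats {
    int partitionId;
    long vertexCount;
    long computedVertexCount;

    void write(DataOutput output) throws IOException {
        output.writeInt(partitionId);
        output.writeLong(vertexCount);
        // The new field is appended last ...
        output.writeLong(computedVertexCount);
    }

    void readFields(DataInput input) throws IOException {
        partitionId = input.readInt();
        vertexCount = input.readLong();
        // ... so it must also be read last.
        computedVertexCount = input.readLong();
    }

    /** Serializes the given stats to bytes and reads them back into a new object. */
    static MiniPartitionStats roundTrip(MiniPartitionStats in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            MiniPartitionStats out = new MiniPartitionStats();
            out.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniPartitionStats stats = new MiniPartitionStats();
        stats.partitionId = 3;
        stats.vertexCount = 9;
        stats.computedVertexCount = 4;
        MiniPartitionStats copy = roundTrip(stats);
        System.out.println(copy.partitionId + " " + copy.vertexCount
            + " " + copy.computedVertexCount);
    }
}
```

If the two methods disagreed on the order or number of fields, the reader would silently pick up the wrong bytes, which is why the statement is appended at the end of both methods.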

  2. The org.apache.giraph.graph.GlobalStats class

Add a variable and a method that hold the total number of vertices computed in each superstep, covering all Partitions on every Worker.

  /**
   * Computed vertices across all partitions in this superstep.
   * Added by BaiSong
   */
  private long computedVertexCount = 0;

  /**
   * @return the computedVertexCount
   */
  public long getComputedVertexCount() {
    return computedVertexCount;
  }

Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.

/**
  * Add the stats of a partition to the global stats.
  *
  * @param partitionStats Partition stats to be added.
  */
  public void addPartitionStats(PartitionStats partitionStats) {
    this.vertexCount += partitionStats.getVertexCount();
    this.finishedVertexCount += partitionStats.getFinishedVertexCount();
    this.edgeCount += partitionStats.getEdgeCount();
    // Added by BaiSong: the following statement
    this.computedVertexCount += partitionStats.getComputedVertexCount();
  }
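
The aggregation performed by addPartitionStats() can be sketched on its own, without any Giraph dependency. MiniGlobalStats and the nested list of counts are hypothetical stand-ins: each inner list holds the computed-vertex counts that one worker's partitions report for a single superstep.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for GlobalStats; not the real Giraph class. */
final class MiniGlobalStats {
    private long computedVertexCount = 0;

    /** Adds one partition's computed-vertex count to the global total. */
    void addPartitionStats(long partitionComputedVertexCount) {
        computedVertexCount += partitionComputedVertexCount;
    }

    long getComputedVertexCount() {
        return computedVertexCount;
    }

    /** Sums the per-partition counts reported by every worker for one superstep. */
    static long aggregate(List<List<Long>> countsPerWorker) {
        MiniGlobalStats global = new MiniGlobalStats();
        for (List<Long> workerPartitions : countsPerWorker) {
            for (long count : workerPartitions) {
                global.addPartitionStats(count);
            }
        }
        return global.getComputedVertexCount();
    }

    public static void main(String[] args) {
        // Hypothetical superstep: worker 0 owns two partitions, worker 1 owns one.
        List<List<Long>> counts = Arrays.asList(
            Arrays.asList(2L, 1L),
            Arrays.asList(1L));
        System.out.println("computedVertexCount=" + aggregate(counts));
    }
}
```

With the sample counts above, the program prints computedVertexCount=4.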

To make debugging easier, you can also modify this class's toString() method (optional). The modified version is:

public String toString() {
        return "(vtx=" + vertexCount + ", computedVertexCount="
                + computedVertexCount + ",finVtx=" + finishedVertexCount
                + ",edges=" + edgeCount + ",msgCount=" + messageCount
                + ",haltComputation=" + haltComputation + ")";
    }

  3. The org.apache.giraph.graph.ComputeCallable<I,V,E,M> class

Add the counting logic. In the computePartition() method, add the statement marked below.

if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
            // Added statement: once the vertex has finished compute(), increment this Partition's computedVertexCount by 1
            partitionStats.incrComputedVertexCount();
        } finally {
           computeOneTimerContext.stop();
        }
……
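
  4. Add the Counters statistics. This is similar to my earlier post, Giraph Source Code Analysis (7): Adding Message Statistics, so it is not described in detail here. The added class is org.apache.giraph.counters.GiraphComputedVertex, and its source code is given below:
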
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;
import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Computed Vertex" for counting the
 * vertices computed in every superstep.
 */

public class GiraphComputedVertex extends HadoopCountersBase {
    /** Counter group name for the Giraph computed-vertex counters */
    public static final String GROUP_NAME = "Giraph Computed Vertex";

    /** Singleton instance for everyone to use */
    private static GiraphComputedVertex INSTANCE;

    /** Computed vertex counter for each superstep */
    private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

    private GiraphComputedVertex(Context context) {
        super(context, GROUP_NAME);
        superstepVertexCount = Maps.newHashMap();
    }

    /**
     * Instantiate with Hadoop Context.
     * 
     * @param context
     *            Hadoop Context to use.
     */
    public static void init(Context context) {
        INSTANCE = new GiraphComputedVertex(context);
    }

    /**
     * Get singleton instance.
     * 
     * @return singleton GiraphComputedVertex instance.
     */
    public static GiraphComputedVertex getInstance() {
        return INSTANCE;
    }

    /**
     * Get the computed-vertex counter for a superstep, creating it on first use.
     * 
     * @param superstep Superstep number.
     * @return Counter for that superstep.
     */
    public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
        GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
        if (counter == null) {
            String counterPrefix = "Superstep: " + superstep+" ";
            counter = getCounter(counterPrefix);
            superstepVertexCount.put(superstep, counter);
        }
        return counter;
    }

    @Override
    public Iterator<GiraphHadoopCounter> iterator() {
        return superstepVertexCount.values().iterator();
    }
}
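
  5. Experimental results. After the program runs, the total number of vertices computed in each iteration is printed to the terminal. The test uses SSSP (the SimpleShortestPathsVertex class) on an input graph with 9 vertices and 12 edges. The output is as follows: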

In the test shown above there are 6 iterations in total. The red box shows the number of vertices computed in each iteration, in order: 9, 4, 4, 3, 4, 0.

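Explanation: in superstep 0 every vertex is active, so all 9 vertices participate in computation. In superstep 5, 0 vertices participate, so no messages are sent; since every vertex is also inactive, the algorithm terminates.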
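
The pattern explained above can be reproduced with a toy, single-process loop. This is only a sketch under stated assumptions: the 4-vertex graph is hypothetical (the 9-vertex test graph is not given in this post), vertex 0 is assumed to be the source, and the loop stops in the first superstep in which no vertex is computed, mirroring the explanation above rather than Giraph's actual master logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy single-process SSSP loop that records computed vertices per superstep. */
final class ComputedVertexDemo {
    /** Hypothetical directed graph: EDGES[v] lists {target, weight} pairs. */
    static final int[][][] EDGES = {
        {{1, 1}, {2, 4}},   // vertex 0 (assumed source)
        {{2, 1}},           // vertex 1
        {{3, 1}},           // vertex 2
        {}                  // vertex 3 has no out-edges
    };

    static List<List<Double>> emptyBoxes(int n) {
        List<List<Double>> boxes = new ArrayList<List<Double>>();
        for (int i = 0; i < n; i++) {
            boxes.add(new ArrayList<Double>());
        }
        return boxes;
    }

    /** Runs SSSP and returns the number of computed vertices in each superstep. */
    static List<Integer> run() {
        int n = EDGES.length;
        double[] value = new double[n];
        Arrays.fill(value, Double.MAX_VALUE);   // same effect as the superstep-0 setValue
        boolean[] halted = new boolean[n];      // all vertices start active
        List<List<Double>> inbox = emptyBoxes(n);
        List<Integer> computedPerSuperstep = new ArrayList<Integer>();

        while (true) {
            List<List<Double>> outbox = emptyBoxes(n);
            int computed = 0;
            for (int v = 0; v < n; v++) {
                // A halted vertex is woken up only if it received messages.
                if (halted[v] && !inbox.get(v).isEmpty()) {
                    halted[v] = false;
                }
                if (halted[v]) {
                    continue;
                }
                double minDist = (v == 0) ? 0d : Double.MAX_VALUE;
                for (double message : inbox.get(v)) {
                    minDist = Math.min(minDist, message);
                }
                if (minDist < value[v]) {
                    value[v] = minDist;
                    for (int[] edge : EDGES[v]) {
                        outbox.get(edge[0]).add(minDist + edge[1]);
                    }
                }
                halted[v] = true;   // corresponds to voteToHalt()
                computed++;         // counted right after compute() returns
            }
            computedPerSuperstep.add(computed);
            // Nothing computed means nothing was sent and every vertex is halted.
            if (computed == 0) {
                return computedPerSuperstep;
            }
            inbox = outbox;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [4, 2, 2, 1, 0]
    }
}
```

As in the experiment, the first entry equals the total number of vertices and the last entry is 0, which is the superstep in which the loop stops.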
