Project author: guillaume6pl

Project description:
Computing PageRank with Hadoop MapReduce
Language: Python
Project address: git://github.com/guillaume6pl/mr_pagerank.git
Created: 2017-04-24T07:33:58Z
Project community: https://github.com/guillaume6pl/mr_pagerank

License:



Page Rank

Model

White Paper from Brin & Page
We describe the web according to the model defined by Brin & Page.

  • Web = directed graph with n nodes (pages) and edges (links)
  • Web surfer = moves from node to node by following links or by teleportation (damping factor)

When the web surfer is on a node “i”, it:

  1. [probability 1-d] moves randomly to one of the linked nodes (j_1, ..., j_k) through the n_i outgoing links, or
  2. [probability d] teleports to one of the n nodes, chosen uniformly at random
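A short simulation makes the surfer model concrete. The sketch below is illustrative only and is not part of the repository; the adjacency-list structure "adj" and the default d = 0.15 (the value used later in this README) are assumptions for the example:

    import random

    def surf(adj, n, d=0.15, steps=100000):
        """Count visits of a random surfer over n nodes.

        adj[i] lists the nodes that node i links to (its outgoing links).
        With probability d the surfer teleports to a random node; otherwise
        it follows a random outgoing link (or teleports if there is none).
        """
        visits = [0] * n
        node = random.randrange(n)
        for _ in range(steps):
            visits[node] += 1
            if random.random() < d or not adj[node]:
                node = random.randrange(n)       # teleportation
            else:
                node = random.choice(adj[node])  # follow an outgoing link
        # Visit frequencies approximate the PageRank of each node.
        return [v / steps for v in visits]

For instance, surf([[1], [0, 2], [0]], 3) returns approximate PageRanks for a 3-node toy graph.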

Algorithm

Variables

  • n : number of pages on the web
  • d : teleportation probability (damping factor) = 0.15
  • n_i : number of outgoing links of page i
  • tol : tolerance for convergence
  • (future version) memory_size : minimum memory size of a node in the cluster

Description

  • Each web surfer move defines a step “s”
  • P(s): n-column vector whose component i is the probability of being on node “i” at step s
  • The sequence of these vectors is P = (P(0), P(1), …)
  • At some point P converges (||P(s) - P(s-1)|| ≈ 0); the converged P(s) is then taken as the PageRank of each node
  • At step 0: P(0) = [1/n, …, 1/n]
  • At step “s”, P(s) is such that:
    1. P(s) = (1-d) x T x P(s-1) + d/n x U
    2. or, for a given page "i":
    3. Pi(s) = (1-d) x Ti x P(s-1) + d/n (where Ti is row i of T)
    • U: n-column vector of ones, so that d/n x U is the uniform teleportation (damping) contribution
    • T: transition matrix such that T[i,j] = I[i,j]/n_j
    • G: n*n matrix of outgoing links - G[i,j] = 1 if i links to j, 0 otherwise
    • I = Gᵀ (transpose of G): matrix of incoming links - I[i,j] = 1 if j links to i, 0 otherwise
    • n_i: number of outgoing links of node i
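As a purely illustrative reference (outside of MapReduce), the update rule above can be written as a dense NumPy power iteration. The function below is only a sketch under the definitions above; the function name, the tolerance default, and the dangling-node handling are assumptions:

    import numpy as np

    def pagerank(G, d=0.15, tol=1e-8):
        """Power iteration for P(s) = (1-d) * T @ P(s-1) + d/n * U.

        G is the n x n matrix of outgoing links (G[i, j] = 1 if i links to j).
        """
        n = G.shape[0]
        I = G.T                                   # incoming links: I[i, j] = 1 if j links to i
        n_out = G.sum(axis=1).astype(float)       # n_j: number of outgoing links of each node
        n_out[n_out == 0] = 1                     # avoid division by zero for dangling nodes
        T = I / n_out                             # T[i, j] = I[i, j] / n_j
        P = np.full(n, 1.0 / n)                   # P(0) = [1/n, ..., 1/n]
        while True:
            P_next = (1 - d) * T @ P + d / n      # U is the all-ones vector
            if np.linalg.norm(P_next - P) < tol:  # ||P(s) - P(s-1)|| ~ 0
                return P_next
            P = P_next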

Process

  1. (not done yet) Launch mapper0 to distribute values per range so that they fit in the memory of each node in the cluster
    • with the test dataset, n < 10 000, so this is not required
  2. Launch the mapper/reducer through Hadoop streaming for each step until convergence

Architecture

MAPPER
INPUT DATA: file “inv_adj_list” on stdin, each line such as:

  • <node "i">: <j1, j2 ... -1>
  • <key>: node "i"
  • <value>: "incoming links" j1, ...

Steps:

  1. getting variables
  2. building the P(s-1) vector from the last line of “ps.txt”
  3. building outcoming_links_number from adj_list
    outcoming_links_number: number of outgoing links “n_j” for each node j
  4. building Tmat (the T matrix) from the input data
  5. computing (Tmat * P_previous_step)
  6. printing output data on stdout
    OUTPUT DATA: on stdout, each line such as <key>\t<value>
    • <key>: node “i”
    • <value>: T[i,j] x Pj(s-1)
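As an illustration of the mapper logic described above, a rough sketch in Python might look as follows. It is not the repository’s pagerank_mapper.py: the ps.txt layout, the 0-based node IDs, the argument handling and the helper names are assumptions:

    #!/usr/bin/env python
    # Hypothetical sketch of a Hadoop-streaming mapper for one PageRank step.
    # Assumes: stdin lines "i: j1 j2 ... -1" (incoming links of node i), an
    # adj_list file listing the outgoing links of every node, and a local
    # ps.txt whose last line holds P(s-1) as space-separated floats.
    import sys

    def load_previous_p(path):
        """Read P(s-1) from the last line of ps.txt (assumed format)."""
        with open(path) as f:
            last = f.readlines()[-1]
        return [float(x) for x in last.split()]

    def load_outgoing_counts(path):
        """Count outgoing links n_j for every node j from adj_list."""
        counts = {}
        with open(path) as f:
            for line in f:
                node, _, links = line.partition(":")
                counts[int(node)] = len([x for x in links.split() if x != "-1"])
        return counts

    def main(adj_list_path, ps_path):
        p_prev = load_previous_p(ps_path)
        n_out = load_outgoing_counts(adj_list_path)
        for line in sys.stdin:
            node, _, links = line.partition(":")
            i = int(node)
            for tok in links.split():
                j = int(tok)
                if j == -1:
                    break
                # T[i,j] * Pj(s-1) with T[i,j] = 1/n_j for each incoming link j -> i
                print("%d\t%.12g" % (i, p_prev[j] / n_out[j]))

    if __name__ == "__main__":
        main(sys.argv[1], sys.argv[2])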

REDUCER
INPUT DATA: Mapper OUTPUT DATA

  1. setting variables
  2. building P(s-1) vector from last line in “ps.txt”
  3. getting input values on stdin as <key>\t<value>
  4. computing “TMat_Pvect_vector” by summing “TMat_Pvect_product” for each node “i”
  5. computing P(s) as P(s) = (1-d) x T x P(s-1) + d/n x U
  6. sending output data
    OUTPUT DATA:
    • if P has not converged: P(s) is appended to “/output/pagerank/ps.txt” as a new line
    • if P has converged:
      • the ordered PageRank list is written to “/output/pagerank/pagerank.txt”
      • the top 20 highest PageRanks are printed on stdout
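A matching, equally hypothetical sketch of the reducer side is given below. It is not the repository’s pagerank_reducer.py: the ps.txt layout, the 0-based node IDs and the argument handling are assumptions, and only the update formula and the convergence behaviour come from the description above:

    #!/usr/bin/env python
    # Hypothetical sketch of a Hadoop-streaming reducer for one PageRank step.
    # Assumes mapper output lines "i\t<T[i,j]*Pj(s-1)>" and a local ps.txt
    # with P(s-1) on its last line; d and tol come from the command line.
    import sys

    def main(ps_path, d=0.15, tol=1e-8):
        with open(ps_path) as f:
            p_prev = [float(x) for x in f.readlines()[-1].split()]
        n = len(p_prev)

        # Sum the partial products T[i,j] * Pj(s-1) per node i (TMat_Pvect_vector).
        sums = [0.0] * n
        for line in sys.stdin:
            key, value = line.split("\t")
            sums[int(key)] += float(value)

        # P(s) = (1-d) x T x P(s-1) + d/n x U
        p_next = [(1 - d) * s + d / n for s in sums]

        diff = sum((a - b) ** 2 for a, b in zip(p_next, p_prev)) ** 0.5
        if diff < tol:
            print("P(s) converged!")
            top = sorted(enumerate(p_next), key=lambda kv: kv[1], reverse=True)[:20]
            for node, rank in top:
                print("%d\t%.12g" % (node, rank))
        else:
            with open(ps_path, "a") as f:
                f.write(" ".join("%.12g" % x for x in p_next) + "\n")

    if __name__ == "__main__":
        d = float(sys.argv[2]) if len(sys.argv) > 2 else 0.15
        tol = float(sys.argv[3]) if len(sys.argv) > 3 else 1e-8
        main(sys.argv[1], d, tol)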

Dataset

  • Test from the “movies” dataset (web pages returned by the query “movies”)
  • Dataset Movies
  • 3 files to have:

    1. “nodes” :
      • page list returned by the Google query “movies”
      • 7966 pages with their ID, link and description
    2. “adj_list”
      • represents G matrix
      • each line is the list of outgoing links for a given node
      • each line as: pid: pid1 pid2 … pidN -1 (a short parsing example follows this list)
    3. “inv_adj_list”
      • represents I matrix
      • each line is a list of incoming links for a given node
      • each line as: pid: pid1 pid2 … pidN -1
  • To compute PageRank on a different web space, simply provide the data in the same format, i.e. these 3 files.
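To make the line format concrete, here is a tiny, purely hypothetical example (the page IDs are made up) showing how one such line can be parsed in Python:

    # Hypothetical adj_list line: page 12 links to pages 34, 56 and 789 ("-1" ends the list).
    line = "12: 34 56 789 -1"
    node, _, rest = line.partition(":")
    links = [int(x) for x in rest.split() if x != "-1"]
    print(node.strip(), links)   # -> 12 [34, 56, 789]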

Instructions

Manually

  1. Download the dataset of your choice and put the 3 required files on HDFS at /input/pagerank/
  2. Start hadoop
    1. $ start-dfs.sh
  3. Set variables
    1. $ cd <path_to_scripts>
    2. $ SCRIPT_PATH=<path/to/scripts>
    3. $ HADOOP_HOME=<path/to/hadoop>
    4. $ HDFS_INPUT_DIR=<path/to/hdfs/input/dir>
    5. $ HDFS_OUTPUT_DIR=<path/to/hdfs/output/dir>
    6. $ L_INPUT_DIR=<path/to/local/input/dir>
    7. $ L_OUTPUT_DIR=<path/to/local/output/dir>
  4. Execute mapper/reducer until convergence (each execution is one step)
    1. hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
    2. -input $HDFS_INPUT_DIR/inv_adj_list \
    3. -output $HDFS_OUTPUT_DIR \
    4. -mapper "$SCRIPT_PATH/pagerank_mapper.py $L_INPUT_DIR $L_OUTPUT_DIR" \
    5. -reducer "$SCRIPT_PATH/pagerank_reducer.py $L_INPUT_DIR $L_OUTPUT_DIR <dumping_factor> <tolerance>"

With pagerank_launcher.sh
“pagerank_launcher.sh” is a bash script that allows you to:

  1. set the initial parameters
  2. execute the mapper/reducer automatically until convergence

  $ ./pagerank_launcher.sh
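For readers who prefer Python, a rough equivalent of such a convergence loop could look like the sketch below. It is not the repository’s launcher script: the paths, the jar location, the script arguments and the check for the “P(s) converged!” marker printed by the reducer are assumptions:

    #!/usr/bin/env python
    # Hypothetical driver loop: run one streaming step at a time until the
    # reducer reports convergence. Paths and arguments are placeholders.
    import subprocess

    HADOOP_HOME = "/path/to/hadoop"              # placeholder
    STREAMING_JAR = HADOOP_HOME + "/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar"
    HDFS_INPUT = "/input/pagerank"
    HDFS_OUTPUT = "/output/pagerank/step"

    def run_step(step):
        """Run one mapper/reducer pass and return the reducer's part file."""
        out_dir = "%s_%d" % (HDFS_OUTPUT, step)
        subprocess.check_call([
            "hadoop", "jar", STREAMING_JAR,
            "-input", HDFS_INPUT + "/inv_adj_list",
            "-output", out_dir,
            "-mapper", "./pagerank_mapper.py /local/input /local/output",
            "-reducer", "./pagerank_reducer.py /local/input /local/output 0.15 1e-8",
        ])
        return subprocess.check_output(["hdfs", "dfs", "-cat", out_dir + "/part-00000"])

    step = 1
    while True:
        output = run_step(step)
        if b"P(s) converged!" in output:     # marker printed by the reducer at convergence
            break
        step += 1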

Output data

  • reducer stdout on HDFS: $HDFS_OUTPUT_DIR/part-00000:
    • not converged yet: nothing is displayed
    • at convergence: “P(s) converged!” is printed
    • at convergence: the top 20 highest PageRanks are displayed
  • $L_OUTPUT_DIR/ps.txt:
    • each line holds P(s), from s=1 (line 1) up to the last computed step
    • the file is cleared once P has converged
  • $L_OUTPUT_DIR/pagerank.txt:
    • after convergence, holds the ordered list of PageRanks for all nodes
    • each line as: <id>,<link>,(<description>)\tPagerank[node]

Contact

Any feedback is welcome! Thanks for reading.