The Difference Between Markov Decision Processes and Google PageRank
What's the difference between a Markov Decision Process (MDP) and Google's PageRank algorithm?
Code From Scratch Example
Input
import numpy as np
# Toy link graph: page -> list of outlinked pages. Listing a target more
# than once gives that link proportionally more weight when the counts
# are normalized into transition probabilities.
web_graph = {
    # "Home" is a hub linking out to every other page; "Search" appears
    # twice, so it receives double weight.
    "Home": ["Search", "News", "Blog", "Search"],
    # "Search" favors "News" (two links) and occasionally returns "Home".
    "Search": ["News", "News", "Home"],
    # "News" mostly links to "Blog", but also to "Home" and "Search".
    "News": ["Blog", "Blog", "Home", "Search"],
    # "Blog" mostly self-links and sometimes returns to "Home".
    "Blog": ["Blog", "Home", "Blog"],
}
class Ranker:
""" Google PageRanker.
Build the Google matrix M of size N x N:
M[i, j] = probability of transitioning from page j to page i
(column-stochastic form).
i = outlinks
j = inlinks
Runs by cols to ensure column-stochastic form.
"""
def __init__(self, graph: dict, damping_factor: float = 0.85, epsilon: float = 1e-10):
self.d = damping_factor
self.epsilon = epsilon
self.reset(graph)
def reset(self, graph: dict):
if not hasattr(self, "nodes"):
self._nodes = set()
if not hasattr(self, "graph"):
self.graph = {}
for node, outlinks in graph.items():
for outlink in outlinks:
if outlink not in self.graph:
self.graph.setdefault(node, []).append(outlink)
if node not in self._nodes:
self._nodes.add(node)
self.N = len(self._nodes)
self.tag2idx = {tag: idx for idx, tag in enumerate(self.nodes)}
self.idx2tag = {idx: tag for idx, tag in enumerate(self.nodes)}
if not hasattr(self, 'adj'):
self.adj = np.zeros((self.N, self.N))
@property
def M(self):
""" Return the Google matrix M. """
M = np.zeros_like(self.adj)
for j in range(self.N):
col = self.adj[:, j].copy()
col_sum = np.sum(col)
if col_sum == 0:
# dangling node: set self-loop to 1, rest to 0
M[:, j] = 0
M[j, j] = 1.0
elif col_sum != 1:
# normalize column to sum to 1
M[:, j] = col / col_sum
return M
def pageRank(self, max_iter: int | None = None):
""" Compute PageRank vector using power iteration method. """
M_hat = self.d * self.M
w = np.ones(self.N) / self.N
v = M_hat @ w + self.teleport
c = 0
while np.linalg.norm(w - v) >= self.epsilon:
w = v
v = M_hat @ w + self.teleport
if max_iter is not None:
max_iter -= 1
if max_iter <= 0:
print("Reached maximum iterations. M never converged. Final norm:", np.linalg.norm(w - v), "Output", v)
return v
c += 1
print(f"\n{'*'*10} Converged after {c} iterations {'*'*10}\n")
return v
@property
def nodes(self):
return sorted(self._nodes, reverse=False)
@property
def teleport(self):
return (1.0 - self.d) / self.N
def update(self, graph: dict):
""" Update adjacency matrix based on new graph structure.
Build the Google matrix M of size N x N:
M[i, j] = probability of transitioning from page j to page i
(column-stochastic form).
"""
self.reset(graph)
missed = {}
for key, outlinks in graph.items():
i = self.tag2idx.get(key, None)
if i is None:
print(f"Key {key} not found in tag2idx.")
missed[key] = outlinks
else:
for outlink in outlinks:
j = self.tag2idx.get(outlink, None)
self.adj[j, i] += 1.0
#print(f"New adjacency matrix after update: {self.adj}")
print(f"Missed entries during update: {missed}")
return missed
def predict(self, current: str):
"""
Given a current node, return probabilities of going to all nodes
in one step according to the transition matrix.
"""
if current not in self.tag2idx:
print("Queried key not in graph:", current)
return None
j = self.tag2idx[current]
outlinks = self.M[:, j].copy()
prob = {self.idx2tag[j]: outlinks[j] for j in range(len(outlinks))}
return sorted(prob.items(), key=lambda x: x[1], reverse=True)
# Build the ranker, feed it the link graph, and inspect the one-step
# transition probabilities from "Search" before and after a change in
# its link structure. (Removed a commented-out call to a nonexistent
# `predictOnPowerIteration` method — dead code.)
model = Ranker(web_graph)
model.update(web_graph)
print(model.predict("Search"))
# Re-point most of Search's links at Blog/Home and observe the shift.
model.update({"Search": ["Blog", "Blog", "Home", "Home", "News"]})
print(model.predict("Search"))
print(model.pageRank())

Output

Last updated
Was this helpful?

