##############
##PAGE RANK SINGLE ITERATION 
##############

## MAPPER 
#!/usr/bin/env python

import sys

# Total number of nodes in the graph (3 for the sample input.txt).
# The mapper never reads N; it is kept so this script and the reducer
# declare the same graph size.
N = 3


def parse_graph_lines(lines):
    """Collect PageRank values and adjacency lists from raw input lines.

    Each useful line holds a node id, a TAB, and either a comma-separated
    list of neighbours or the node's current PageRank.  Blank lines, lines
    without a TAB, lines with an empty second field and all-digit lines
    (the node-count header) are ignored.

    A payload without a comma is tried as a float first and is treated as
    a one-neighbour adjacency list when that fails, so "B<TAB>A" means
    "B links to A".  A single neighbour whose id is itself a valid float
    literal cannot be told apart from a PageRank value in this format.

    Args:
        lines: iterable of input lines, with or without trailing newlines.

    Returns:
        A (pageranks, adjacencies) tuple: a dict mapping node id to float
        PageRank and a dict mapping node id to its list of neighbour ids.
    """
    pageranks = {}
    adjacencies = {}

    for raw_line in lines:
        line = raw_line.strip()

        # Skip blank lines and the leading "number of nodes" header.
        if not line or line.isdigit():
            continue

        parts = line.split('\t')
        if len(parts) < 2:
            continue

        node_id = parts[0].strip()
        payload = parts[1].strip()
        if not payload:
            continue

        if ',' not in payload:
            try:
                rank = float(payload)
            except ValueError:
                # Not numeric: this is a single neighbour, handled below.
                pass
            else:
                pageranks[node_id] = rank
                continue

        # Drop empty entries so a stray comma does not create a neighbour
        # with an empty id (which would also skew the per-neighbour share).
        adjacencies[node_id] = [
            name.strip() for name in payload.split(',') if name.strip()
        ]

    return pageranks, adjacencies


def map_records(lines):
    """Yield the mapper's output records (without trailing newlines).

    For every node that has an adjacency list this yields, in order:
      * "<node>\\tADJ_LIST:<n1,n2,...>"  - the graph structure,
      * "<node>\\tPR:<rank>"             - the node's current PageRank,
      * "<neighbour>\\t<share>"          - one per neighbour, where share
        is the node's PageRank divided by its number of neighbours.
    The last two kinds are only produced when a PageRank is known for the
    node.  Nodes are visited in sorted order so the output is
    deterministic.

    NOTE: '%f' keeps six decimal places, so ranks are rounded on output.

    Args:
        lines: iterable of input lines (see parse_graph_lines).
    """
    pageranks, adjacencies = parse_graph_lines(lines)

    for node_id in sorted(adjacencies):
        neighbors = adjacencies[node_id]
        yield '%s\tADJ_LIST:%s' % (node_id, ','.join(neighbors))

        if node_id not in pageranks:
            continue
        rank = pageranks[node_id]
        yield '%s\tPR:%f' % (node_id, rank)

        if not neighbors:
            continue
        share = rank / len(neighbors)
        for neighbor in neighbors:
            yield '%s\t%f' % (neighbor, share)


def run_mapper():
    """Read the graph from stdin and write the mapper records to stdout."""
    for record in map_records(sys.stdin):
        # sys.stdout.write works unchanged on both Python 2 and Python 3.
        sys.stdout.write(record + '\n')


if __name__ == '__main__':
    run_mapper()

### REDUCER
#!/usr/bin/env python

import sys

# Damping factor: weight given to rank received over links; the remaining
# (1 - D) is spread evenly over all nodes as the "random jump" term.
D = 0.85
# Number of nodes in the graph (3 for the sample input.txt).
N = 3


def format_rank(node_id, contributions_sum, num_nodes=N, damping=D):
    """Return the output record "<node>\\t<new PageRank>" for one node.

    The new rank is (1 - damping) / num_nodes + damping * contributions_sum.

    Args:
        node_id: id of the node being emitted.
        contributions_sum: sum of the shares received from linking nodes.
        num_nodes: total number of nodes in the graph.
        damping: damping factor.
    """
    # float() keeps the division exact-typed on Python 2 even if both
    # arguments are passed as ints.
    new_pagerank = (1 - damping) / float(num_nodes) + damping * contributions_sum
    return '%s\t%f' % (node_id, new_pagerank)


def reduce_records(lines, num_nodes=N, damping=D):
    """Yield one "<node>\\t<new PageRank>" record per node.

    Input lines must be "<node>\\t<value>" records grouped by node id (as
    produced by `sort` or the Hadoop shuffle).  Values starting with
    "ADJ_LIST:" or "PR:" carry graph structure / the previous rank; they
    make the node appear in the output but do not add to its rank.  Any
    other value is parsed as a float contribution.

    Blank lines are skipped.  Lines without a TAB are skipped with a
    warning on stderr instead of aborting the whole job.

    Args:
        lines: iterable of mapper output lines, grouped by node id.
        num_nodes: total number of nodes in the graph.
        damping: damping factor.

    Raises:
        ValueError: if a contribution value is not a valid float.
    """
    current_node = None
    contributions_sum = 0.0

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue
        if '\t' not in line:
            sys.stderr.write('reducer: skipping malformed line: %r\n' % line)
            continue

        node_id, value = line.split('\t', 1)

        if node_id != current_node:
            # A new key starts: flush the node accumulated so far.
            if current_node is not None:
                yield format_rank(current_node, contributions_sum,
                                  num_nodes, damping)
            current_node = node_id
            contributions_sum = 0.0

        if value.startswith(('ADJ_LIST:', 'PR:')):
            # Not needed for a single iteration; only contributions count.
            continue
        contributions_sum += float(value)

    # Flush the last node after the input is exhausted.
    if current_node is not None:
        yield format_rank(current_node, contributions_sum, num_nodes, damping)


def run_reducer():
    """Read grouped mapper records from stdin and write new ranks to stdout."""
    for record in reduce_records(sys.stdin):
        # sys.stdout.write works unchanged on both Python 2 and Python 3.
        sys.stdout.write(record + '\n')


if __name__ == '__main__':
    run_reducer()


## INPUT.TXT 

3
A       B,C
B       A
C       A,B
A       0.3333333333
B       0.3333333333
C       0.3333333333


## instructions for giving input from graph:


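Note:

This code implements a single PageRank iteration. To run more iterations, the new ranks must be fed back in together with the adjacency lists.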


For simplicity, let's assume a graph with 3 nodes (A, B, C) and an initial PageRank of 1/3 for each.

input:

3
A       B,C
B       A
C       A,B
A       0.3333333333
B       0.3333333333
C       0.3333333333

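Explanation of input.txt:
The first line (3) is the number of nodes; the mapper skips it.
Every other line has two fields. The mapper splits on TAB, so the gap between the two columns must be a single TAB character, not spaces.
Node A links to B and C.
Node B links to A.
Node C links to A and B.
The initial PageRank of A, B, and C is 0.3333333333 (approximately 1/3).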

## COMMAND

# Create a working directory under the home directory and enter it.
# mapper.py, reducer.py and input.txt are expected in this directory
# for the commands below.
cd ~
mkdir PageRank
cd PageRank


# Make the scripts executable and convert all three files to Unix line endings.
chmod +x mapper.py reducer.py
dos2unix mapper.py reducer.py input.txt

# Local dry run: `sort` groups the mapper records by node id, which the
# reducer relies on to detect where one node ends and the next begins.
cat input.txt | ./mapper.py | sort | ./reducer.py


Put input file into HDFS

# Create the HDFS input directory (and parents) and upload input.txt;
# -f overwrites a copy left over from an earlier run.
hdfs dfs -mkdir -p /user/cloudera/pagerank_input
hdfs dfs -put -f input.txt /user/cloudera/pagerank_input/


# Confirm the upload.
hdfs dfs -ls /user/cloudera/pagerank_input


# Remove any previous output directory, since the job will not write into
# an existing one. The command appears twice; the second run is redundant
# but harmless because -f ignores a missing path.
hdfs dfs -rm -r -f /user/cloudera/pagerank_output

hdfs dfs -rm -r -f /user/cloudera/pagerank_output

# Run one PageRank iteration as a Hadoop Streaming job with a single reducer.
# Generic options (-D, -files) must come before the streaming options
# (-mapper, -reducer, -input, -output, -cmdenv), otherwise the job fails.
# The job name is set with -D instead of the deprecated -jobconf.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="PageRank_SingleIteration" \
  -D mapreduce.job.reduces=1 \
  -files mapper.py,reducer.py \
  -mapper "python mapper.py" \
  -reducer "python reducer.py" \
  -input /user/cloudera/pagerank_input/input.txt \
  -output /user/cloudera/pagerank_output \
  -cmdenv PYTHONIOENCODING=utf8


# List the files the job produced.
hdfs dfs -ls /user/cloudera/pagerank_output

# Print the reducer output; with one reducer there is a single part file.
hdfs dfs -cat /user/cloudera/pagerank_output/part-00000

# Merge the output directory into one local file and display it.
hdfs dfs -getmerge /user/cloudera/pagerank_output pagerank_result.txt
cat pagerank_result.txt
