===================================================
## 1. Implement a collaborative filtering system using PySpark
===================================================

################################################################################
# Cell 1: Import libraries and start Spark
################################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.linalg import Vectors
import math

# Create (or reuse) a local Spark session for the tutorial. Shuffle partitions
# are set to 8 because the demo data is tiny; the default would spawn far more
# near-empty tasks on every groupBy/join.
spark = (
    SparkSession.builder
    .appName("CollaborativeFilteringTutorial")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

print("✅ Spark Session Started")

################################################################################
# Cell 2: Create local dataset (User-Item-Rating)
################################################################################

# A deliberately tiny, hand-written ratings table so that every similarity score
# below can be checked by hand. Each tuple is (user_id, item_id, rating).
data = [
    # user 1
    (1, 101, 5),
    (1, 102, 3),
    (1, 103, 4),
    # user 2
    (2, 101, 4),
    (2, 102, 5),
    (2, 104, 2),
    # user 3
    (3, 101, 2),
    (3, 103, 5),
    (3, 104, 4),
    # user 4
    (4, 102, 4),
    (4, 103, 3),
    (4, 104, 5),
    # user 5
    (5, 101, 5),
    (5, 104, 4),
]

ratings_df = spark.createDataFrame(data, schema=["user_id", "item_id", "rating"])
ratings_df.show()

################################################################################
# Cell 3: Prepare Data for Similarity Calculation
################################################################################

# Binarize the ratings: anything at or above the threshold counts as a "like".
# Jaccard similarity (next cell) works on these binary interactions only.
interaction_threshold = 4
interactions_df = ratings_df.withColumn(
    'interaction',
    F.when(F.col('rating') >= interaction_threshold, 1).otherwise(0),
)

# The liked (user, item) pairs.
pos_interactions = (
    interactions_df
    .filter(F.col('interaction') == 1)
    .select('user_id', 'item_id')
)
pos_interactions.show()

# One row per user holding the set of items that user liked.
user_items = (
    pos_interactions
    .groupBy('user_id')
    .agg(F.collect_set('item_id').alias('items'))
)
user_items.show(truncate=False)

################################################################################
# Cell 4: Implement Jaccard Similarity with PySpark
################################################################################

# Jaccard(A, B) = |A ∩ B| / |A ∪ B|, where A and B are the liked-item sets of two users.

# Step 1: invert the liked relation into item -> set of users who liked it.
item_users = pos_interactions.groupBy('item_id').agg(F.collect_set('user_id').alias('users'))

# Step 2: self-join on item to find user pairs that co-liked something. The
# user_a < user_b filter keeps each unordered pair once; the number of distinct
# shared items is the intersection size.
pairs = (
    item_users.select('item_id', F.explode('users').alias('user_a'))
    .join(item_users.select('item_id', F.explode('users').alias('user_b')), on='item_id')
    .filter(F.col('user_a') < F.col('user_b'))
    .groupBy('user_a', 'user_b')
    .agg(F.countDistinct('item_id').alias('intersection'))
)

# Step 3: |A ∪ B| = |A| + |B| - |A ∩ B|, using each user's liked-set size.
user_sizes = user_items.withColumn('n_items', F.size('items')).select('user_id', 'n_items')
ua = user_sizes.withColumnRenamed('user_id', 'user_a').withColumnRenamed('n_items', 'n_a')
ub = user_sizes.withColumnRenamed('user_id', 'user_b').withColumnRenamed('n_items', 'n_b')

pairs = (
    pairs
    .join(ua, on='user_a')
    .join(ub, on='user_b')
    .withColumn('union', F.col('n_a') + F.col('n_b') - F.col('intersection'))
    .withColumn('jaccard', F.col('intersection') / F.col('union'))
)

print("\n🔹 User-User Jaccard Similarity:")
pairs.select('user_a', 'user_b', 'intersection', 'union', 'jaccard').show(truncate=False)

################################################################################
# Cell 5: Implement Cosine Similarity with PySpark
################################################################################

# Cosine(A, B) = (A · B) / (||A|| * ||B||) over the raw rating vectors; an item
# only one of the two users rated contributes nothing to the dot product.

# Step 1: dot product = sum of rating products over items both users rated.
# user_a < user_b keeps one row per unordered pair.
pairs_dot = (
    ratings_df.alias('r1')
    .join(ratings_df.alias('r2'), on='item_id')
    .filter(F.col('r1.user_id') < F.col('r2.user_id'))
    .select(
        F.col('r1.user_id').alias('user_a'),
        F.col('r2.user_id').alias('user_b'),
        (F.col('r1.rating') * F.col('r2.rating')).alias('dot_part'),
    )
    .groupBy('user_a', 'user_b')
    .agg(F.sum('dot_part').alias('dot'))
)

# Step 2: Euclidean norm of each user's full rating vector.
norms = ratings_df.groupBy('user_id').agg(F.sqrt(F.sum(F.pow('rating', 2))).alias('norm'))
na = norms.withColumnRenamed('user_id', 'user_a').withColumnRenamed('norm', 'norm_a')
nb = norms.withColumnRenamed('user_id', 'user_b').withColumnRenamed('norm', 'norm_b')

# Step 3: attach both norms to each pair and normalize the dot product.
cosine = (
    pairs_dot
    .join(na, on='user_a')
    .join(nb, on='user_b')
    .withColumn('cosine', F.col('dot') / (F.col('norm_a') * F.col('norm_b')))
)

print("\n🔹 User-User Cosine Similarity:")
cosine.select('user_a', 'user_b', 'dot', 'cosine').show(truncate=False)

################################################################################
# Cell 6: Generate Simple Recommendations
################################################################################

def recommend_for_user(target_user, similarity_df, k=2, n=3, metric='cosine',
                       interactions=None, ratings=None):
    """Recommend up to ``n`` unseen items for ``target_user`` (user-user CF).

    The ``k`` most similar neighbours are read from ``similarity_df``. Each item
    a neighbour liked is scored by the sum of the similarities of the neighbours
    who liked it. Items the target user has already rated are dropped.

    Args:
        target_user: id of the user to recommend for.
        similarity_df: pairwise similarity DataFrame with columns ``user_a``,
            ``user_b`` and a column named ``metric`` (e.g. the ``cosine`` or
            ``pairs`` frames built above).
        k: number of nearest neighbours to use.
        n: maximum number of recommendations returned.
        metric: name of the similarity column in ``similarity_df``.
        interactions: DataFrame of liked (user_id, item_id) pairs used as the
            neighbours' candidate items; defaults to the module-level
            ``pos_interactions``.
        ratings: DataFrame with user_id/item_id columns listing everything each
            user has rated; defaults to the module-level ``ratings_df``.

    Returns:
        DataFrame with columns ``item_id`` and ``score``, highest score first.
    """
    # Resolve defaults at call time so the globals from the earlier cells are used.
    if interactions is None:
        interactions = pos_interactions
    if ratings is None:
        ratings = ratings_df
    sim_col = metric

    # The target may appear on either side of a pair; normalize to "neighbor".
    sims = (
        similarity_df
        .filter((F.col('user_a') == target_user) | (F.col('user_b') == target_user))
        .withColumn(
            'neighbor',
            F.when(F.col('user_a') == target_user, F.col('user_b')).otherwise(F.col('user_a')),
        )
        .select('neighbor', sim_col)
        .orderBy(F.col(sim_col).desc())
        .limit(k)
    )

    # Exclude every item the user has rated, not only the liked ones: an item
    # rated below the like-threshold has still been seen and must not be
    # recommended back to the user.
    seen_items = ratings.filter(F.col('user_id') == target_user).select('item_id').distinct()

    neighbor_items = (
        interactions.alias('pi')
        .join(sims.alias('t'), F.col('pi.user_id') == F.col('t.neighbor'))
        .select(F.col('pi.item_id'), F.col('t.' + sim_col).alias('weight'))
        .groupBy('item_id')
        .agg(F.sum('weight').alias('score'))
    )

    recs = (
        neighbor_items
        .join(seen_items, on='item_id', how='left_anti')
        .orderBy(F.col('score').desc())
        .limit(n)
    )
    return recs

# Example recommendation: up to 3 unseen items for user 1, scored by its
# 2 most cosine-similar neighbours. The cosine frame already has a 'cosine'
# column, so it is passed directly (no rename needed).
example_user = 1
recs = recommend_for_user(example_user, cosine, k=2, n=3, metric='cosine')

print(f"\n Recommended items for User {example_user}:")
recs.show()

################################################################################
# Cell 7: Cleanup
################################################################################

# Stop the Spark session and release its resources.
# NOTE(review): code further down this file (spark.read.text / spark.read.csv)
# still uses `spark`. Those cells need a live session, e.g. one re-created
# with SparkSession.builder.getOrCreate().
spark.stop()
print("\n Spark Session Stopped. Tutorial Complete.")



################################################################################
## 2 built-in dataset 
################################################################################

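# Spark ships a small sample ratings file, data/mllib/als/sample_movielens_ratings.txt,
# in the data/ directory of the Spark distribution. It is a plain data file, not a
# Python module, so the relative path below assumes the working directory is the
# Spark installation directory. Each line has the form userId::movieId::rating::timestamp.
# Example: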

# ALS is the DataFrame-based recommender in pyspark.ml.recommendation. `Rating`
# belongs to the older RDD-based pyspark.mllib.recommendation API and is not
# exported by pyspark.ml.recommendation, so importing it from there fails.
from pyspark.ml.recommendation import ALS

path = "data/mllib/als/sample_movielens_ratings.txt"

# The cleanup cell above stopped the session; getOrCreate() starts a fresh one
# (or reuses a live one) with the same settings as Cell 1.
spark = SparkSession.builder \
    .appName("CollaborativeFilteringTutorial") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Raw text: one string column `value` per line.
ratings = spark.read.text(path)

# Split "userId::movieId::rating::timestamp" into the column names used by the
# similarity code above.
movielens_fields = F.split(F.col("value"), "::")
movielens_df = ratings.select(
    movielens_fields.getItem(0).cast(IntegerType()).alias("user_id"),
    movielens_fields.getItem(1).cast(IntegerType()).alias("item_id"),
    movielens_fields.getItem(2).cast(FloatType()).alias("rating"),
)


################################################################################
# 3: Load Dataset.csv (local file)
################################################################################

# Make sure Dataset.csv is in the same directory as your notebook.
# Expected format: a header row with user_id,item_id,rating columns.
dataset_path = "Dataset.csv"

# The cleanup cell above called spark.stop(), and a stopped session cannot read
# files. getOrCreate() starts a fresh session here, or reuses a live one if this
# cell is run on its own, using the same settings as Cell 1.
spark = SparkSession.builder \
    .appName("CollaborativeFilteringTutorial") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Load the CSV file
ratings_df = spark.read.csv(dataset_path, header=True, inferSchema=True)

# Normalize types to match the in-memory tutorial data.
# If your column names differ, rename them here accordingly.
ratings_df = ratings_df.select(
    F.col("user_id").cast(IntegerType()),
    F.col("item_id").cast(IntegerType()),
    F.col("rating").cast(FloatType())
)

# Report the path actually read (identical output for the default path).
print(f"✅ Loaded dataset from {dataset_path}")
ratings_df.show(5