################################################################################
# 13. Implement clustering with Wine Dataset using K-means and K-medoid
################################################################################

# !pip install pyspark pandas numpy matplotlib seaborn scikit-learn scikit-learn-extra

import warnings
warnings.filterwarnings("ignore")

# For notebook plotting
%matplotlib inline

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler as SparkStandardScaler
from pyspark.ml.clustering import KMeans as SparkKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.cluster import KMeans
from sklearn_extra.cluster import KMedoids
from sklearn.decomposition import PCA

# Shared plotting theme and the single seed reused by every stochastic step below.
sns.set(style="whitegrid")
RANDOM_STATE = 42

# ---------------------------
# Initialize SparkSession
# ---------------------------
# Local-mode session ("local[*]" master); getOrCreate() hands back an existing session if one is active.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("WineClusteringCombined")
    .getOrCreate()
)
# Keep Spark's console output down to warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# ---------------------------
# Load and Explore the Wine Dataset
# ---------------------------
# Raw arrays and their names, straight from scikit-learn's bundled Wine dataset.
wine = datasets.load_wine()
X_np, y_np = wine.data, wine.target
feature_names = wine.feature_names
target_names = list(wine.target_names)

# pandas views of the feature matrix and of the integer class labels.
X_pd = pd.DataFrame(X_np, columns=feature_names)
y_pd = pd.Series(y_np, name="target")

# EDA frame: a copy of the features plus a readable class-name column
# obtained by mapping each integer target to its name.
df_eda = X_pd.assign(wine_class=y_pd.map(dict(enumerate(target_names))))

print("--- Exploratory Data Analysis (EDA) ---")
eda_tables = (
    ("\nFirst 5 rows:", df_eda.head()),
    ("\nDescriptive statistics:", X_pd.describe()),
    ("\nClass distribution:", df_eda['wine_class'].value_counts()),
)
for heading, table in eda_tables:
    print(heading)
    display(table)

# Correlation Heatmap
fig_corr, ax_corr = plt.subplots(figsize=(12, 10))
sns.heatmap(X_pd.corr(), annot=True, cmap='coolwarm', fmt=".2f", ax=ax_corr)
ax_corr.set_title("Correlation Matrix of Wine Features")
plt.show()

# ---------------------------
# Data Preparation
# ---------------------------
# Scikit-learn scaling: standardize every feature column of the pandas frame.
scaler_sklearn = StandardScaler()
X_scaled_sklearn = scaler_sklearn.fit(X_pd).transform(X_pd)
print("Scikit-learn scaled data shape:", X_scaled_sklearn.shape)

# PySpark DataFrame preparation and scaling.
# The true class travels with the features as a "label" column so it stays attached to each row.
pdf_full = X_pd.assign(label=y_np)
sdf_full = spark.createDataFrame(pdf_full)

# Pack the individual feature columns into one vector column for Spark ML.
assembler = VectorAssembler(inputCols=feature_names, outputCol="features_raw")
sdf_assembled = assembler.transform(sdf_full)

# Center and scale the assembled vector; the result lands in "features".
scaler_spark = SparkStandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)
scaler_model_spark = scaler_spark.fit(sdf_assembled)
sdf_scaled = scaler_model_spark.transform(sdf_assembled)

# Only the scaled vector and the label are needed downstream.
final_data_spark = sdf_scaled.select("features", "label")
print("PySpark data count:", final_data_spark.count())
final_data_spark.show(5, truncate=False)

# ---------------------------
# Model Training & Evaluation helpers
# ---------------------------
def evaluate_sklearn_clustering(name, labels_pred, data, labels_true):
    """Score one clustering result, print a short report and return the scores.

    Args:
        name: Algorithm name shown in the report header.
        labels_pred: Cluster assignment for each sample.
        data: Feature matrix the silhouette score is computed on.
        labels_true: Ground-truth class of each sample, used for ARI and NMI.

    Returns:
        dict with the keys ``'silhouette'``, ``'ari'`` and ``'nmi'``.
    """
    scores = {
        'silhouette': silhouette_score(data, labels_pred),
        'ari': adjusted_rand_score(labels_true, labels_pred),
        'nmi': normalized_mutual_info_score(labels_true, labels_pred),
    }
    print(f"\n--- {name} (Scikit-learn) ---")
    print(f"Silhouette Score: {scores['silhouette']:.4f}")
    print(f"Adjusted Rand Index (ARI): {scores['ari']:.4f}")
    print(f"Normalized Mutual Information (NMI): {scores['nmi']:.4f}")
    return scores

def evaluate_pyspark_clustering(name, estimator, data_sdf, labels_true_pd):
    """Fit a Spark clustering estimator and report silhouette, ARI and NMI.

    Args:
        name: Algorithm name shown in the report header.
        estimator: Unfitted Spark ML estimator; its fitted model must add a
            ``prediction`` column when transforming ``data_sdf``.
        data_sdf: Spark DataFrame holding a ``features`` vector column and,
            preferably, a ``label`` column with the true class of each row.
        labels_true_pd: Ground-truth labels as an array-like. Only used when
            ``data_sdf`` carries no ``label`` column; in that case the
            collected rows are assumed to come back in this array's order.

    Returns:
        dict with the keys ``'silhouette'``, ``'ari'`` and ``'nmi'``.
    """
    # Fit the Spark estimator and compute Spark silhouette (uses feature vector column and predictionCol)
    model = estimator.fit(data_sdf)
    predictions = model.transform(data_sdf)
    evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction', metricName='silhouette')
    silhouette = evaluator.evaluate(predictions)

    # Collect predictions together with the label column whenever it exists:
    # both then come from the same rows, so ARI/NMI do not depend on Spark
    # returning rows in the same order as an externally supplied array.
    has_label = "label" in predictions.columns
    wanted_cols = ["prediction", "label"] if has_label else ["prediction"]
    preds_pd = predictions.select(*wanted_cols).toPandas()
    labels_pred = preds_pd['prediction']
    labels_true = preds_pd['label'] if has_label else labels_true_pd

    ari = adjusted_rand_score(labels_true, labels_pred)
    nmi = normalized_mutual_info_score(labels_true, labels_pred)
    print(f"\n--- {name} (PySpark) ---")
    print(f"Silhouette Score (Spark evaluator): {silhouette:.4f}")
    print(f"Adjusted Rand Index (ARI): {ari:.4f}")
    print(f"Normalized Mutual Information (NMI): {nmi:.4f}")
    return {'silhouette': silhouette, 'ari': ari, 'nmi': nmi}

# ---------------------------
# 1) Elbow method (scikit-learn KMeans) to confirm k
# ---------------------------
print("\n--- Elbow Method (scikit-learn KMeans) ---")
k_range = range(1, 11)
# One K-Means fit per candidate k; only the inertia of each fitted model is kept.
inertia = [
    KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE, n_init=10)
    .fit(X_scaled_sklearn)
    .inertia_
    for n_clusters in k_range
]

# Inertia against k: the "elbow" is where adding clusters stops paying off.
fig_elbow, ax_elbow = plt.subplots(figsize=(9, 5))
ax_elbow.plot(k_range, inertia, marker='o', linestyle='--')
ax_elbow.set_xlabel('Number of clusters (k)')
ax_elbow.set_ylabel('Inertia')
ax_elbow.set_title('Elbow Method for Optimal k (Wine dataset)')
ax_elbow.set_xticks(k_range)
ax_elbow.grid(True)
plt.show()
print("Elbow usually suggests k=3 for this dataset (as expected).")

# ---------------------------
# 2) Scikit-learn: KMeans and KMedoids (k=3)
# ---------------------------
print("\n--- Scikit-learn clustering (k=3) ---")
k = 3

# K-Means: fit on the standardized features, then score against the true classes.
kmeans_sklearn = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init=10)
kmeans_labels_sklearn = kmeans_sklearn.fit(X_scaled_sklearn).labels_
kmeans_scores_sklearn = evaluate_sklearn_clustering(
    name="K-Means",
    labels_pred=kmeans_labels_sklearn,
    data=X_scaled_sklearn,
    labels_true=y_np,
)

# K-Medoids: PAM method with k-medoids++ initialization, same data and scoring.
kmedoids_sklearn = KMedoids(
    n_clusters=k,
    random_state=RANDOM_STATE,
    method='pam',
    init='k-medoids++',
)
kmedoids_labels_sklearn = kmedoids_sklearn.fit(X_scaled_sklearn).labels_
kmedoids_scores_sklearn = evaluate_sklearn_clustering(
    name="K-Medoids",
    labels_pred=kmedoids_labels_sklearn,
    data=X_scaled_sklearn,
    labels_true=y_np,
)

# ---------------------------
# 3) PySpark K-Means (k=3)
# ---------------------------
print("\n--- PySpark K-Means (k=3) ---")
kmeans_spark = SparkKMeans(
    k=k,
    seed=RANDOM_STATE,
    featuresCol="features",
    predictionCol="prediction",
)
kmeans_scores_spark = evaluate_pyspark_clustering(
    name="K-Means",
    estimator=kmeans_spark,
    data_sdf=final_data_spark,
    labels_true_pd=y_np,
)
print("Note: K-Medoids is not part of PySpark MLlib standard API; use scikit-learn-extra for medoids.")

# ---------------------------
# 4) Visualization using PCA (2D)
# ---------------------------
# Project the standardized features onto two principal components for plotting.
pca = PCA(n_components=2, random_state=RANDOM_STATE)
X_pca = pca.fit_transform(X_scaled_sklearn)

# One row per sample: the 2-D coordinates plus the three labelings to compare.
df_pca = pd.DataFrame(X_pca, columns=['PCA1', 'PCA2']).assign(**{
    'True Class': y_pd.map(dict(enumerate(target_names))),
    'K-Means Labels': kmeans_labels_sklearn,
    'K-Medoids Labels': kmedoids_labels_sklearn,
})

fig, axes = plt.subplots(1, 3, figsize=(24, 6), sharex=True, sharey=True)

# (hue column, panel title) for each of the three side-by-side scatter plots.
pca_panels = (
    ('True Class', 'Ground Truth'),
    ('K-Means Labels', 'Scikit-learn K-Means'),
    ('K-Medoids Labels', 'Scikit-learn K-Medoids'),
)
for panel_ax, (hue_column, panel_title) in zip(axes, pca_panels):
    sns.scatterplot(
        ax=panel_ax,
        data=df_pca,
        x='PCA1',
        y='PCA2',
        hue=hue_column,
        palette='tab10',
        s=70,
        alpha=0.85,
    )
    panel_ax.set_title(panel_title)

plt.suptitle("Wine Dataset Clustering (PCA projection)", fontsize=16)
plt.show()

# ---------------------------
# 5) Cluster Centroid / Medoid Analysis
# ---------------------------
print("\n--- K-Means centroids (back to original feature scale) ---")
# Centroids live in standardized space; undo the scaling so they read in original units.
centers_scaled = kmeans_sklearn.cluster_centers_
kmeans_centers_original = scaler_sklearn.inverse_transform(centers_scaled)
df_centers = pd.DataFrame(kmeans_centers_original, columns=feature_names)
display(df_centers)

print("\n--- K-Medoids medoids (actual data points in original feature scale) ---")
# medoid_indices_ holds the row positions (in the fitted data) of the chosen medoids.
medoid_indices = kmedoids_sklearn.medoid_indices_
# Pick those rows from the scaled matrix, then map them back to the original scale.
medoid_rows_scaled = X_scaled_sklearn[medoid_indices, :]
medoids_original = scaler_sklearn.inverse_transform(medoid_rows_scaled)
df_medoids = pd.DataFrame(medoids_original, columns=feature_names)
display(df_medoids)
print("(Each medoid row corresponds to an actual data point from the dataset.)")

# ---------------------------
# 6) Summary of scores
# ---------------------------
# Each value is the score dict returned by an evaluation helper; after the
# transpose there is one row per model and one column per metric.
scores_by_model = {
    'Scikit-learn K-Means': kmeans_scores_sklearn,
    'Scikit-learn K-Medoids': kmedoids_scores_sklearn,
    'PySpark K-Means': kmeans_scores_spark,
}
summary_df = pd.DataFrame(scores_by_model).transpose()
print("\n--- Summary of Clustering Evaluation Scores (k=3) ---")
display(summary_df)

# ---------------------------
# 7) Clean up Spark
# ---------------------------
# Shut the session down once all Spark work is finished.
spark.stop()
print("\nSpark session stopped. All done.")



################################################################################
# K-Means and K-Medoids Clustering on CSV Dataset (PySpark + scikit-learn)
################################################################################

# ---------------------------
# (Optional) Install dependencies (uncomment if needed)
# ---------------------------
# !pip install pyspark pandas numpy matplotlib seaborn scikit-learn scikit-learn-extra

# ---------------------------
# Imports
# ---------------------------
import warnings
warnings.filterwarnings("ignore")

%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler as SparkStandardScaler
from pyspark.ml.clustering import KMeans as SparkKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.cluster import KMeans
from sklearn_extra.cluster import KMedoids
from sklearn.decomposition import PCA

# Shared plotting theme and the single seed reused by every stochastic step below.
sns.set(style="whitegrid")
RANDOM_STATE = 42

################################################################################
# 1. Initialize Spark
################################################################################
# Local-mode session ("local[*]" master); getOrCreate() hands back an existing session if one is active.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("CSVClusteringCombined")
    .getOrCreate()
)
# Keep Spark's console output down to warnings and errors.
spark.sparkContext.setLogLevel("WARN")

################################################################################
# 2. Load CSV Dataset
################################################################################
# Replace 'Dataset.csv' with your dataset file path (must be in the same folder or give full path)
csv_path = "Dataset.csv"

# Load using pandas for local analysis
df_pd = pd.read_csv(csv_path)
print("Dataset loaded successfully!")

print("\n--- Dataset Info ---")
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() would append a stray "None" line to the output.
df_pd.info()

print("\nFirst 5 rows:")
display(df_pd.head())

################################################################################
# 3. Basic Data Cleaning
################################################################################
# Discard every row that has at least one missing value.
df_pd = df_pd.dropna()

# Replace each object-typed (e.g. string) column with its integer category codes
# so that every remaining column is numeric.
object_columns = df_pd.select_dtypes(include=['object']).columns
for column_name in object_columns:
    as_category = df_pd[column_name].astype('category')
    df_pd[column_name] = as_category.cat.codes

print("\n--- After Cleaning ---")
display(df_pd.head())

################################################################################
# 4. Prepare Data for Clustering
################################################################################
# Scikit-learn side: standardize every column of the cleaned frame.
X_pd = df_pd.values
scaler_sklearn = StandardScaler()
X_scaled_sklearn = scaler_sklearn.fit_transform(X_pd)
print("Scaled data shape:", X_scaled_sklearn.shape)

# PySpark DataFrame
sdf_full = spark.createDataFrame(df_pd)
# VectorAssembler's inputCols must be a plain list of strings: PySpark's param
# type converter rejects a pandas Index, so convert the column labels explicitly.
feature_cols = list(df_pd.columns)

# Pack all columns into a single vector column for Spark ML.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
sdf_assembled = assembler.transform(sdf_full)

# Center and scale the assembled vector; the result lands in "features".
scaler_spark = SparkStandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)
sdf_scaled = scaler_spark.fit(sdf_assembled).transform(sdf_assembled)
final_data_spark = sdf_scaled.select("features")

################################################################################
# 5. Elbow Method (scikit-learn KMeans)
################################################################################
k_range = range(1, 11)
# One K-Means fit per candidate k; only the inertia of each fitted model is kept.
inertia = [
    KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE, n_init=10)
    .fit(X_scaled_sklearn)
    .inertia_
    for n_clusters in k_range
]

# Inertia against k: the "elbow" is where adding clusters stops paying off.
fig_elbow, ax_elbow = plt.subplots(figsize=(9, 5))
ax_elbow.plot(k_range, inertia, marker='o', linestyle='--')
ax_elbow.set_xlabel('Number of clusters (k)')
ax_elbow.set_ylabel('Inertia')
ax_elbow.set_title('Elbow Method for Optimal k (CSV Dataset)')
ax_elbow.grid(True)
plt.show()

print("Based on the elbow method, select an appropriate k (usually 2–5).")

################################################################################
# 6. Train Models (KMeans & KMedoids)
################################################################################
k = 3  # Change based on your dataset
print(f"\n--- Training Clustering Models (k={k}) ---")

# Scikit-learn KMeans on the standardized features.
kmeans_sklearn = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init=10)
kmeans_labels = kmeans_sklearn.fit(X_scaled_sklearn).labels_

# Scikit-learn KMedoids: PAM method with k-medoids++ initialization.
kmedoids_sklearn = KMedoids(
    n_clusters=k,
    random_state=RANDOM_STATE,
    method='pam',
    init='k-medoids++',
)
kmedoids_labels = kmedoids_sklearn.fit(X_scaled_sklearn).labels_

################################################################################
# 7. PySpark KMeans
################################################################################
# Same k and seed; the fitted model adds a "prediction" column to the scaled data.
kmeans_spark = SparkKMeans(
    k=k,
    seed=RANDOM_STATE,
    featuresCol="features",
    predictionCol="prediction",
)
kmeans_model_spark = kmeans_spark.fit(final_data_spark)
predictions_spark = kmeans_model_spark.transform(final_data_spark)

################################################################################
# 8. Evaluation Metrics
################################################################################
def evaluate_clustering(name, labels_pred, data):
    """Print and return the silhouette score of one clustering.

    Args:
        name: Model name used as the prefix of the printed line.
        labels_pred: Cluster assignment for each sample.
        data: Feature matrix the silhouette score is computed on.

    Returns:
        The silhouette score produced by ``silhouette_score``.
    """
    score = silhouette_score(X=data, labels=labels_pred)
    print(f"{name} Silhouette Score: {score:.4f}")
    return score

print("\n--- Clustering Evaluation ---")
# Silhouette of both scikit-learn clusterings on the standardized features.
sil_kmeans = evaluate_clustering(
    name="Scikit-learn KMeans",
    labels_pred=kmeans_labels,
    data=X_scaled_sklearn,
)
sil_kmedoids = evaluate_clustering(
    name="Scikit-learn KMedoids",
    labels_pred=kmedoids_labels,
    data=X_scaled_sklearn,
)

# Silhouette of the Spark clustering, computed by Spark's own evaluator.
evaluator = ClusteringEvaluator(
    featuresCol='features',
    predictionCol='prediction',
    metricName='silhouette',
)
sil_spark = evaluator.evaluate(predictions_spark)
print(f"PySpark KMeans Silhouette Score: {sil_spark:.4f}")

################################################################################
# 9. Visualization (PCA)
################################################################################
# Project the standardized features onto two principal components for plotting.
pca = PCA(n_components=2, random_state=RANDOM_STATE)
X_pca = pca.fit_transform(X_scaled_sklearn)
# One row per sample: the 2-D coordinates plus both cluster assignments.
df_vis = pd.DataFrame(X_pca, columns=['PCA1', 'PCA2']).assign(
    KMeans=kmeans_labels,
    KMedoids=kmedoids_labels,
)

fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# (hue column, panel title) for each of the two side-by-side scatter plots.
vis_panels = (
    ('KMeans', 'Scikit-learn KMeans Clusters'),
    ('KMedoids', 'Scikit-learn KMedoids Clusters'),
)
for panel_ax, (hue_column, panel_title) in zip(axes, vis_panels):
    sns.scatterplot(
        ax=panel_ax,
        data=df_vis,
        x='PCA1',
        y='PCA2',
        hue=hue_column,
        palette='tab10',
        s=60,
    )
    panel_ax.set_title(panel_title)
plt.show()

################################################################################
# 10. Cluster Centers & Medoids
################################################################################
print("\n--- Cluster Centers (KMeans) ---")
# Centroids live in standardized space; undo the scaling so they read in original units.
centers_original = scaler_sklearn.inverse_transform(kmeans_sklearn.cluster_centers_)
df_centers = pd.DataFrame(centers_original, columns=feature_cols)
display(df_centers)

print("\n--- Medoids (KMedoids) ---")
# medoid_indices_ holds the row positions (in the fitted data) of the chosen medoids.
medoid_indices = kmedoids_sklearn.medoid_indices_
medoid_rows_scaled = X_scaled_sklearn[medoid_indices, :]
medoids_original = scaler_sklearn.inverse_transform(medoid_rows_scaled)
df_medoids = pd.DataFrame(medoids_original, columns=feature_cols)
display(df_medoids)

################################################################################
# 11. Summary
################################################################################
# One (model, silhouette) row per clustering that was evaluated above.
summary_rows = [
    ('Scikit-learn KMeans', sil_kmeans),
    ('Scikit-learn KMedoids', sil_kmedoids),
    ('PySpark KMeans', sil_spark),
]
summary = pd.DataFrame(summary_rows, columns=['Model', 'Silhouette Score'])
display(summary)

################################################################################
# 12. Cleanup
################################################################################
# Shut the session down once all Spark work is finished.
spark.stop()
print("\nSpark session stopped. Tutorial completed successfully!")
