
################################################################################
# 11. Perform Logistic Regression classification using PySpark, display the
# result with a graph, and compare the accuracy of the algorithms using
# Precision, Recall and F-Measure.

################################################################################
import sys
import importlib.util
import os

def check_and_install_libraries(packages):
    """
    Ensure every requested package is importable, installing missing ones with pip.

    Args:
        packages: Iterable of pip requirement strings, e.g. ``'pandas'`` or
            ``'pyspark==3.5.0'``.

    Returns:
        None. Progress is reported on stdout. A failed installation is printed
        and skipped so that the remaining packages are still processed.
    """
    # Local imports: the file's top-level import block does not provide these.
    import re
    import subprocess

    # Distributions whose importable module name differs from the pip name.
    import_names = {'scikit-learn': 'sklearn'}

    for package in packages:
        # Drop any version specifier, extras or environment marker so only the
        # bare distribution name is left ("pyspark==3.5.0" -> "pyspark").
        package_name = re.split(r'[=<>!~;\[\s]', package.strip(), maxsplit=1)[0]
        module_name = import_names.get(
            package_name.lower(), package_name.replace('-', '_')
        )

        try:
            installed = importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            # find_spec raises for dotted names with a missing parent package
            # and for modules without a usable spec; treat both as "missing".
            installed = False

        if installed:
            print(f"{package_name} is already installed.")
            continue

        print(f"{package} not found, installing...")
        try:
            # sys.executable guarantees pip targets the running interpreter;
            # check=True turns a non-zero pip exit status into an exception.
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "-q", package],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"Error installing {package}: {e}")
        else:
            print(f"{package} installed successfully.")

# Third-party distributions this analysis depends on (pip names).
required_packages = [
    'pyspark',
    'pandas',
    'numpy',
    'matplotlib',
    'seaborn',
    'scikit-learn',
]
check_and_install_libraries(required_packages)


### 2. Import Libraries
# Import all necessary modules for both Scikit-learn and PySpark analysis.

# --- FIX: Import classes with conflicting names using aliases ---
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.feature import OneHotEncoder as SparkOneHotEncoder # Aliased import
from pyspark.ml.feature import StandardScaler as SparkStandardScaler # Aliased import
from pyspark.ml.classification import LogisticRegression as SparkLogisticRegression
from pyspark.ml.regression import LinearRegression as SparkLinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder as SklearnOneHotEncoder # Aliased import
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline as SklearnPipeline
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, mean_squared_error, r2_score
from sklearn.decomposition import PCA
# -----------------------------------------------------------------


# Render figures inline when running under IPython/Jupyter.
# The `%matplotlib inline` line magic is notebook-only syntax and a SyntaxError
# in a plain .py file; calling the magic through the IPython shell object is
# the equivalent that is also valid Python.
try:
    get_ipython().run_line_magic("matplotlib", "inline")
except NameError:
    # No IPython shell (plain `python script.py`): keep matplotlib's default
    # backend and let plt.show() display the figures.
    pass

# Set global settings for plots and random state for reproducibility
sns.set(style="whitegrid")
RANDOM_STATE = 42


### 3. Initialize SparkSession
# All Spark work below goes through this session; "local[*]" runs Spark on the
# local machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TitanicRegressionCombined")
    .getOrCreate()
)
# Keep the console readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")


### 4. Load and Explore the Titanic Dataset
# Load the Titanic dataset from a CSV file and perform basic Exploratory Data Analysis (EDA).
TITANIC_CSV_PATH = "/content/titanic.csv"
try:
    df_pd = pd.read_csv(TITANIC_CSV_PATH)
except FileNotFoundError:
    print("FATAL: titanic.csv not found.")
    # Report the path that was actually tried; the file is read from an
    # absolute location, not from the script's directory.
    print(f"Please ensure the 'titanic.csv' file is available at {TITANIC_CSV_PATH}.")
    # Release Spark resources, then abort with a non-zero status so callers
    # (shell, CI, notebook runner) can tell the run failed.
    spark.stop()
    sys.exit(1)


print("--- Exploratory Data Analysis (EDA) ---")

# Peek at the raw records.
print("\nFirst 5 rows of the dataset:")
display(df_pd.head())

# Column dtypes and non-null counts; info() prints its report itself.
print("\nDataset Info:")
df_pd.info()

# Summary statistics as produced by describe().
print("\nDescriptive Statistics:")
display(df_pd.describe())

# Class balance of the target, as proportions rather than raw counts.
print("\nSurvival Distribution:")
survival_share = df_pd['Survived'].value_counts(normalize=True)
display(survival_share)

# Pairwise correlations, restricted to the numeric columns.
print("\nNumerical Feature Correlation Heatmap:")
numeric_cols = df_pd.select_dtypes(include=np.number).columns
correlation_matrix = df_pd[numeric_cols].corr()
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f")
plt.title("Correlation Matrix of Numerical Titanic Features")
plt.show()


### 5. Data Preparation for Machine Learning
# Both frameworks get the same groundwork: missing-value handling,
# categorical encoding and feature scaling.

#### 5.1 Scikit-learn Data Preparation

# --- Logistic Regression ---
# Remove the columns that are not used as predictors.
df_sklearn_logistic = df_pd.drop(columns=['PassengerId', 'Name', 'Ticket', 'Cabin'])

# 'Survived' is the target; every remaining column is a predictor.
y_logistic = df_sklearn_logistic['Survived']
X_logistic = df_sklearn_logistic.drop(columns='Survived')

# Column groups that receive different preprocessing.
categorical_features = ['Embarked', 'Sex', 'Pclass']
numerical_features = ['Age', 'SibSp', 'Parch', 'Fare']

# Numeric columns: median imputation, then standardisation.
numerical_transformer = SklearnPipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical columns: mode imputation, then one-hot encoding that ignores
# categories not seen during fitting.
categorical_transformer = SklearnPipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', SklearnOneHotEncoder(handle_unknown='ignore')),
])

# Route each column group to its transformer; unlisted columns pass through.
preprocessor = ColumnTransformer(
    [
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='passthrough',
)

# 80/20 hold-out split, seeded for reproducibility.
(X_train_logistic, X_test_logistic,
 y_train_logistic, y_test_logistic) = train_test_split(
    X_logistic,
    y_logistic,
    test_size=0.2,
    random_state=RANDOM_STATE,
)

# Fit the preprocessing on the training rows only, then reuse it on the test rows.
X_train_logistic_prepared = preprocessor.fit_transform(X_train_logistic)
X_test_logistic_prepared = preprocessor.transform(X_test_logistic)

print("Scikit-learn logistic regression training data shape (prepared):", X_train_logistic_prepared.shape)


# --- Linear Regression ---
# Same column pruning as above, and 'Survived' is removed as well because the
# regression target here is 'Fare'.
df_sklearn_linear = df_pd.drop(
    columns=['PassengerId', 'Name', 'Ticket', 'Cabin', 'Survived']
)

# 'Fare' is the value to predict; the rest are predictors.
y_linear = df_sklearn_linear['Fare']
X_linear = df_sklearn_linear.drop(columns='Fare')

# Numeric predictors ('Fare' is excluded since it is now the target).
numerical_features_linear = ['Age', 'SibSp', 'Parch']

# Reuses the numeric/categorical transformer definitions from the logistic setup.
preprocessor_linear = ColumnTransformer(
    [
        ('num', numerical_transformer, numerical_features_linear),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='passthrough',
)

# 80/20 hold-out split, seeded for reproducibility.
(X_train_linear, X_test_linear,
 y_train_linear, y_test_linear) = train_test_split(
    X_linear,
    y_linear,
    test_size=0.2,
    random_state=RANDOM_STATE,
)

# Fit on the training rows only, then apply to the test rows.
X_train_linear_prepared = preprocessor_linear.fit_transform(X_train_linear)
X_test_linear_prepared = preprocessor_linear.transform(X_test_linear)

print("Scikit-learn linear regression training data shape (prepared):", X_train_linear_prepared.shape)


#### 5.2 PySpark Data Preparation
# Impute missing values in pandas *before* converting to a Spark DataFrame.
# 'Embarked' is an object column; if it still holds float NaN next to strings
# when it reaches createDataFrame, Spark has to infer one type from mixed
# Python values.
# NOTE(review): on some PySpark versions this raises a type-merge error during
# schema inference; filling first sidesteps that. Confirm against the PySpark
# version in use.
most_frequent_embarked = df_pd['Embarked'].mode()[0]
df_spark_source = (
    # Drop columns that are not useful for prediction.
    df_pd.drop(['PassengerId', 'Name', 'Ticket', 'Cabin'], axis=1)
    # Same fill values as before: median age, most frequent port.
    .fillna({'Age': df_pd['Age'].median(), 'Embarked': most_frequent_embarked})
)
df_spark = spark.createDataFrame(df_spark_source)

# --- Logistic Regression ---
# Identify categorical and numerical features
categorical_cols = ['Sex', 'Embarked', 'Pclass']
numerical_cols = ['Age', 'SibSp', 'Parch', 'Fare']

# Pipeline stages, in execution order.
stages_logistic = []

# Categorical columns: index the category, then one-hot encode the index.
# handleInvalid="keep" gives categories unseen at fit time their own index
# instead of failing at transform time.
for col_name in categorical_cols:
    string_indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep")
    encoder = SparkOneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[col_name + "_vec"])
    stages_logistic += [string_indexer, encoder]

# Numeric columns: pack into one vector, then scale it.
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
scaler = SparkStandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")

# Final feature vector = encoded categoricals + scaled numerics.
assembler_inputs = [c + "_vec" for c in categorical_cols] + ["scaled_numerical_features"]
final_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages_logistic += [numerical_assembler, scaler, final_assembler]

# Rename 'Survived' to 'label' for PySpark ML
df_labeled_logistic = df_spark.withColumnRenamed('Survived', 'label')

# Split BEFORE fitting the preprocessing. Fitting the indexers and the scaler
# on the full dataset would let test-row statistics leak into training, and
# would be inconsistent with the scikit-learn path, which fits its
# preprocessor on the training rows only.
train_raw_logistic, test_raw_logistic = df_labeled_logistic.randomSplit([0.8, 0.2], seed=RANDOM_STATE)

# Fit the pipeline on the training rows, then apply the fitted model to both splits.
pipeline_logistic = Pipeline(stages=stages_logistic)
pipeline_model_logistic = pipeline_logistic.fit(train_raw_logistic)
train_data_logistic = pipeline_model_logistic.transform(train_raw_logistic)
test_data_logistic = pipeline_model_logistic.transform(test_raw_logistic)

# Kept so that code referring to the fully transformed frame still works.
df_transformed_logistic = pipeline_model_logistic.transform(df_labeled_logistic)

print("\nPySpark logistic regression training data count:", train_data_logistic.count())
train_data_logistic.select("features", "label").show(5, truncate=False)


# --- Linear Regression ---
# Numeric predictors; 'Fare' is excluded because it is the regression target.
numerical_cols_linear = ['Age', 'SibSp', 'Parch']

# Pipeline stages, in execution order.
stages_linear = []

# Categorical columns: index the category, then one-hot encode the index.
# handleInvalid="keep" gives categories unseen at fit time their own index
# instead of failing at transform time.
for col_name in categorical_cols:
    string_indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep")
    encoder = SparkOneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[col_name + "_vec"])
    stages_linear += [string_indexer, encoder]

# Numeric columns: pack into one vector, then scale it.
numerical_assembler_linear = VectorAssembler(inputCols=numerical_cols_linear, outputCol="numerical_features")
scaler_linear = SparkStandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")

# Final feature vector = encoded categoricals + scaled numerics.
assembler_inputs_linear = [c + "_vec" for c in categorical_cols] + ["scaled_numerical_features"]
final_assembler_linear = VectorAssembler(inputCols=assembler_inputs_linear, outputCol="features")
stages_linear += [numerical_assembler_linear, scaler_linear, final_assembler_linear]

# Rename 'Fare' to 'label' for PySpark ML
df_labeled_linear = df_spark.withColumnRenamed('Fare', 'label')

# Split BEFORE fitting the preprocessing. Fitting the indexers and the scaler
# on the full dataset would let test-row statistics leak into training, and
# would be inconsistent with the scikit-learn path, which fits its
# preprocessor on the training rows only.
train_raw_linear, test_raw_linear = df_labeled_linear.randomSplit([0.8, 0.2], seed=RANDOM_STATE)

# Fit the pipeline on the training rows, then apply the fitted model to both splits.
pipeline_linear = Pipeline(stages=stages_linear)
pipeline_model_linear = pipeline_linear.fit(train_raw_linear)
train_data_linear = pipeline_model_linear.transform(train_raw_linear)
test_data_linear = pipeline_model_linear.transform(test_raw_linear)

# Kept so that code referring to the fully transformed frame still works.
df_transformed_linear = pipeline_model_linear.transform(df_labeled_linear)

print("\nPySpark linear regression training data count:", train_data_linear.count())
train_data_linear.select("features", "label").show(5, truncate=False)


### 6. Model Training and Evaluation
# Fit and score the models in each framework.

#### 6.1 Scikit-learn: Model Training and Evaluation
# --- Logistic Regression ---
# fit() returns the estimator itself, so construction and training chain.
lr_sklearn = LogisticRegression(random_state=RANDOM_STATE).fit(
    X_train_logistic_prepared, y_train_logistic
)

# Predicted class for every held-out row.
y_pred_logistic_sklearn = lr_sklearn.predict(X_test_logistic_prepared)

# Score the predictions against the held-out labels.
accuracy_sklearn_logistic = accuracy_score(y_test_logistic, y_pred_logistic_sklearn)
report_sklearn_logistic = classification_report(y_test_logistic, y_pred_logistic_sklearn)
confusion_sklearn_logistic = confusion_matrix(y_test_logistic, y_pred_logistic_sklearn)

print("\n--- Scikit-learn Logistic Regression Evaluation ---")
print(f"Accuracy: {accuracy_sklearn_logistic:.4f}")
print("\nClassification Report:\n", report_sklearn_logistic)
print("\nConfusion Matrix:\n", confusion_sklearn_logistic)

# --- Linear Regression ---
# fit() returns the estimator itself, so construction and training chain.
lin_reg_sklearn = LinearRegression().fit(X_train_linear_prepared, y_train_linear)

# Predicted fare for every held-out row.
y_pred_linear_sklearn = lin_reg_sklearn.predict(X_test_linear_prepared)

# Error and goodness-of-fit on the held-out rows.
mse_sklearn = mean_squared_error(y_test_linear, y_pred_linear_sklearn)
r2_sklearn = r2_score(y_test_linear, y_pred_linear_sklearn)

print("\n--- Scikit-learn Linear Regression Evaluation ---")
for metric_label, metric_value in (
    ("Mean Squared Error", mse_sklearn),
    ("R-squared", r2_sklearn),
):
    print(f"{metric_label}: {metric_value:.4f}")


#### 6.2 PySpark: Model Training and Evaluation
# --- Binomial Logistic Regression ---
# Initialize and train the LogisticRegression model
lr_spark = SparkLogisticRegression(featuresCol='features', labelCol='label')
lr_model_spark = lr_spark.fit(train_data_logistic)

# Make predictions
predictions_logistic = lr_model_spark.transform(test_data_logistic)

# Evaluate the model
evaluator_accuracy = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy_spark_logistic = evaluator_accuracy.evaluate(predictions_logistic)

# The exercise compares models on precision, recall and F-measure, and the
# scikit-learn section reports them through classification_report, so report
# the same metrics for the Spark model (frequency-weighted class averages).
precision_spark_logistic = MulticlassClassificationEvaluator(
    metricName="weightedPrecision"
).evaluate(predictions_logistic)
recall_spark_logistic = MulticlassClassificationEvaluator(
    metricName="weightedRecall"
).evaluate(predictions_logistic)
f1_spark_logistic = MulticlassClassificationEvaluator(
    metricName="f1"
).evaluate(predictions_logistic)

print("\n--- PySpark Binomial Logistic Regression Evaluation ---")
print(f"Accuracy: {accuracy_spark_logistic:.4f}")
print(f"Weighted Precision: {precision_spark_logistic:.4f}")
print(f"Weighted Recall: {recall_spark_logistic:.4f}")
print(f"Weighted F-Measure: {f1_spark_logistic:.4f}")

# Show a sample of predictions
print("\nPySpark Logistic Regression Predictions Sample:")
predictions_logistic.select("prediction", "label", "features").show(5)


# --- Linear Regression ---
# Train on the prepared training split.
lin_reg_spark = SparkLinearRegression(featuresCol='features', labelCol='label')
lin_reg_model_spark = lin_reg_spark.fit(train_data_linear)

# Score the held-out split; adds a "prediction" column.
predictions_linear = lin_reg_model_spark.transform(test_data_linear)

# One evaluator per metric, both applied to the same predictions.
evaluator_r2 = RegressionEvaluator(metricName="r2")
evaluator_mse = RegressionEvaluator(metricName="mse")
r2_spark = evaluator_r2.evaluate(predictions_linear)
mse_spark = evaluator_mse.evaluate(predictions_linear)

print("\n--- PySpark Linear Regression Evaluation ---")
for metric_label, metric_value in (
    ("R-squared", r2_spark),
    ("Mean Squared Error", mse_spark),
):
    print(f"{metric_label}: {metric_value:.4f}")

# A few predictions next to their true labels.
print("\nPySpark Linear Regression Predictions Sample:")
predictions_linear.select("prediction", "label", "features").show(5)


### 7. Visualization of Results
# Project the preprocessed scikit-learn test features onto two principal
# components so the classes can be drawn in a plane.
pca = PCA(n_components=2, random_state=RANDOM_STATE)

# PCA needs a dense array; densify only when the preprocessor returned a
# sparse matrix.
if hasattr(X_test_logistic_prepared, "toarray"):
    X_test_prepared_dense = X_test_logistic_prepared.toarray()
else:
    X_test_prepared_dense = X_test_logistic_prepared
X_test_pca = pca.fit_transform(X_test_prepared_dense)

# One row per test sample: its 2-D coordinates, true class and predicted class.
df_pca = pd.DataFrame(X_test_pca, columns=['PCA1', 'PCA2'])
df_pca['True Class'] = y_test_logistic.values
df_pca['Logistic Regression Prediction'] = y_pred_logistic_sklearn

# Side-by-side panels sharing both axes: ground truth vs. model output.
fig, axes = plt.subplots(1, 2, figsize=(18, 8), sharex=True, sharey=True)
panels = (
    ('True Class', 'Ground Truth'),
    ('Logistic Regression Prediction', 'Scikit-learn Logistic Regression Predictions'),
)
for ax, (hue_column, panel_title) in zip(axes, panels):
    sns.scatterplot(
        ax=ax,
        data=df_pca,
        x='PCA1',
        y='PCA2',
        hue=hue_column,
        palette='viridis',
        s=100,
        alpha=0.8,
    )
    ax.set_title(panel_title)
plt.suptitle("Classification Visualization using PCA Components", fontsize=16)
plt.show()


### 8. Final Summary
# A summary of the final evaluation scores from both frameworks.
print("\n--- Summary of Classification Evaluation Scores ---")

# The exercise asks for a comparison on precision, recall and F-measure, so the
# table carries them next to accuracy. Both frameworks use frequency-weighted
# class averages. Note that the two models are scored on different hold-out
# samples (train_test_split vs. randomSplit), so small gaps are expected.
sklearn_weighted_scores = classification_report(
    y_test_logistic, y_pred_logistic_sklearn, output_dict=True
)['weighted avg']
spark_weighted_scores = {
    metric: MulticlassClassificationEvaluator(metricName=metric).evaluate(predictions_logistic)
    for metric in ('weightedPrecision', 'weightedRecall', 'f1')
}

summary_data_classification = {
    'Framework': ['Scikit-learn', 'PySpark'],
    'Model': ['Logistic Regression', 'Binomial Logistic Regression'],
    'Accuracy': [accuracy_sklearn_logistic, accuracy_spark_logistic],
    'Precision': [sklearn_weighted_scores['precision'], spark_weighted_scores['weightedPrecision']],
    'Recall': [sklearn_weighted_scores['recall'], spark_weighted_scores['weightedRecall']],
    'F-Measure': [sklearn_weighted_scores['f1-score'], spark_weighted_scores['f1']],
}
summary_df_classification = pd.DataFrame(summary_data_classification)
display(summary_df_classification)


print("\n--- Summary of Regression Evaluation Scores ---")
# One list entry per framework, in the same order in every column.
summary_data_regression = {
    'Framework': ['Scikit-learn', 'PySpark'],
    'Model': ['Linear Regression'] * 2,
    'Mean Squared Error': [mse_sklearn, mse_spark],
    'R-squared': [r2_sklearn, r2_spark],
}
summary_df_regression = pd.DataFrame(data=summary_data_regression)
display(summary_df_regression)


### 9. Stop SparkSession
# The analysis is finished; shut the session down, announcing the step
# before and after.
print("\nStopping SparkSession...")
spark.stop()
print("SparkSession stopped.")