# DL.ipynb
!pip install torchvision
!pip install torchmetrics
# Markdown:
# regression
import torch as th
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_regression
from sklearn.metrics import r2_score
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

# Генерация синтетических данных
X, y, coef = make_regression(
    n_features=4, n_informative=4, coef=True, bias=0.5, random_state=42
)
X = th.FloatTensor(X)
y = th.FloatTensor(y).view(-1, 1)  # Преобразование y в двумерный тензор

# Разделение на обучающую и тестовую выборки
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Датасеты и загрузчики
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset))

# Модель регрессии
class RegressionModel(nn.Module):
    def __init__(self, n_inputs: int, n_hidden: int):
        super().__init__()
        self.fc1 = nn.Linear(in_features=n_inputs, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=1)
        self.relu = nn.ReLU()

    def forward(self, X: th.Tensor) -> th.Tensor:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# Инициализация модели, функции потерь и оптимизатора
model = RegressionModel(n_inputs=4, n_hidden=10)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Обучение и оценка модели
epochs = 1000
train_losses = []
test_r2_scores = []

for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        # Прямой проход
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)

        # Обратное распространение
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    train_losses.append(epoch_loss / len(train_loader))

    # Оценка на тестовом наборе
    model.eval()
    with th.no_grad():
        for X_test_batch, y_test_batch in test_loader:
            y_test_pred = model(X_test_batch)
            r2 = r2_score(y_test_batch.numpy(), y_test_pred.numpy())
            test_r2_scores.append(r2)

    # Печать метрик каждые 100 эпох
    if (epoch + 1) % 100 == 0:
        print(
            f"Эпоха {epoch + 1}/{epochs}, Ошибка на обучении: {train_losses[-1]:.4f}, R2 на тесте: {r2:.4f}"
        )

# Построение графиков

# График 1: Ошибка на обучении
plt.figure(figsize=(8, 5))
plt.plot(range(epochs), train_losses, label="Ошибка (MSE)", color='blue')
plt.xlabel("Эпоха")
plt.ylabel("Ошибка (MSE)")
plt.title("График ошибки на обучении")
plt.grid(True)
plt.legend()
plt.show()

# График 2: Метрика R2 на тестировании
plt.figure(figsize=(8, 5))
plt.plot(range(epochs), test_r2_scores, label="Качество на тестировании (R2)", color='green')
plt.xlabel("Эпоха")
plt.ylabel("Метрика (R2)")
plt.title("График качества на тестировании")
plt.grid(True)
plt.legend()
plt.show()

# Markdown:
# classification
import pandas as pd

import torch as th
import torch.nn as nn
import torch.optim as optim
import torchmetrics as M
from torch.utils.data import TensorDataset, DataLoader, random_split

from sklearn.metrics import classification_report, confusion_matrix

import matplotlib.pyplot as plt
# Markdown:
2. Набор данных: classification/bank.esv. Используя библиотеку PyTorch, решите задачу
классификации (столбец deposit). Разделите набор данных на обучающее и тестовое
множество. Выполните предобработку данных (корректно обработайте случаи
категориальных и нечисловых столбцов, при наличии). Отобразите график значений
функции потерь на обучающем множестве по эпохам. Отобразите confusion matrix и
classification report, рассчитанные на основе тестового множества. Сравните несколько
различных оптимизаторов и графически продемонстрируйте, как выбор оптимизатора
влияет на процесс обучения и результаты на тестовом множестве. (20 баллов)
data = pd.read_csv('bank.csv')
data.dtypes
categorical_cols = ['job', 'marital', 'education', 'default', 'housing',
                    'loan', 'contact', 'month', 'poutcome', 'deposit']

data_processed = pd.get_dummies(data, columns = categorical_cols, drop_first = True)
data_processed.head()
sum(y), len(y)
X = data_processed.drop(columns = ['deposit_yes']).to_numpy().astype(float)
X = th.tensor(X, dtype = th.float32)

y = data_processed['deposit_yes'].to_numpy().astype(float)
y = th.tensor(y, dtype = th.long)

th.manual_seed(42)
train, test = random_split(TensorDataset(X, y), [0.8, 0.2])
loader = DataLoader(train, batch_size = 64)

model = nn.Sequential(
    nn.Linear(in_features = 42, out_features = 64),
    nn.ReLU(),
    nn.Linear(in_features = 64, out_features = 32),
    nn.ReLU(),
    nn.Linear(in_features = 32, out_features = 16),
    nn.ReLU(),
    nn.Linear(in_features = 16, out_features = 2),
)
criterion = nn.CrossEntropyLoss(weight = th.tensor([0.3, 0.7]))
optimizer = optim.AdamW(model.parameters(), lr = 0.01)

epoch_losses = []
for epoch in range(50):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        # forward pass
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        epoch_loss += loss
        # backprop
        loss.backward()
        # gradient descend
        optimizer.step()
        optimizer.zero_grad()
    epoch_loss = epoch_loss / len(loader)
    epoch_losses.append(epoch_loss.item())
    if (epoch) % 5 == 0:
        print(epoch, epoch_loss.item())
plt.title('CrossEntropyLoss')
plt.plot(epoch_losses)
plt.grid(True)
plt.show()
y_pred_test = model(test[:][0]).argmax(dim=1)
y_test = test[:][1]
confusion_matrix(y_test, y_pred_test)
print(classification_report(y_test, y_pred_test))
loader = DataLoader(train, batch_size = 64)

model_1 = nn.Sequential(
    nn.Linear(in_features = 42, out_features = 16),
    nn.ReLU(),
    nn.Linear(in_features = 16, out_features = 8),
    nn.ReLU(),
    nn.Linear(in_features = 8, out_features = 2),
)
criterion_1 = nn.CrossEntropyLoss()
optimizer_1 = optim.AdamW(model_1.parameters(), lr = 0.01)

model_2 = nn.Sequential(
    nn.Linear(in_features = 42, out_features = 16),
    nn.ReLU(),
    nn.Linear(in_features = 16, out_features = 8),
    nn.ReLU(),
    nn.Linear(in_features = 8, out_features = 2),
)
criterion_2 = nn.CrossEntropyLoss()
optimizer_2 = optim.SGD(model_2.parameters(), lr = 0.01)

model_3 = nn.Sequential(
    nn.Linear(in_features = 42, out_features = 16),
    nn.ReLU(),
    nn.Linear(in_features = 16, out_features = 8),
    nn.ReLU(),
    nn.Linear(in_features = 8, out_features = 2),
)
criterion_3 = nn.CrossEntropyLoss()
optimizer_3 = optim.RMSprop(model_3.parameters(), lr = 0.01)

epoch_losses_1 = []
epoch_losses_2 = []
epoch_losses_3 = []

ep = 10

for epoch in range(ep):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        y_pred = model_1(X_batch)
        loss = criterion_1(y_pred, y_batch)
        epoch_loss += loss
        loss.backward()
        optimizer_1.step()
        optimizer_1.zero_grad()
    epoch_loss = epoch_loss / len(loader)
    epoch_losses_1.append(epoch_loss.item())

for epoch in range(ep):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        y_pred = model_2(X_batch)
        loss = criterion_2(y_pred, y_batch)
        epoch_loss += loss
        loss.backward()
        optimizer_2.step()
        optimizer_2.zero_grad()
    epoch_loss = epoch_loss / len(loader)
    epoch_losses_2.append(epoch_loss.item())

for epoch in range(ep):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        y_pred = model_3(X_batch)
        loss = criterion_3(y_pred, y_batch)
        epoch_loss += loss
        loss.backward()
        optimizer_3.step()
        optimizer_3.zero_grad()
    epoch_loss = epoch_loss / len(loader)
    epoch_losses_3.append(epoch_loss.item())

plt.title('CrossEntropyLoss')
plt.plot(epoch_losses_1, label = 'AdamW')
plt.plot(epoch_losses_2, label = 'SGD')
plt.plot(epoch_losses_3, label = 'RMSprop')
plt.legend()
plt.grid(True)
plt.show()
# Markdown:
# images
import zipfile
import torch as th
import torchvision
import torchvision.transforms.v2 as T
import torch.nn as nn
import torch.optim as optim
import torchmetrics as M
from torch.utils.data import DataLoader, random_split

import matplotlib.pyplot as plt
# Markdown:
3. Набор данных: images/sign_language.zip. Реализовав сверточную нейронную сеть при
помощи библиотеки PyTorch, решите задачу классификации изображений. Разделите
набор данных на обучающее и тестовое множество. Выполните предобработку данных
(приведите изображения к одному размеру, нормализуйте и преобразуйте изображения в
тензоры). Графически отобразите, как качество на тестовом множестве (micro F1) зависит
от количества сверточных блоков (свертка, активация, пулинг). (20 баллов)
with zipfile.ZipFile('sign_language.zip', 'r') as archive:
    archive.extractall()
transform = T.Compose([
    T.Resize((32, 32)),
    T.ToTensor()
])

dataset = torchvision.datasets.ImageFolder(root = 'sign_language', transform = transform)

dataset_loader = DataLoader(dataset, batch_size = 8)
n = len(dataset) * 32 * 32

mu = th.zeros((3,), dtype=th.float)
sig = th.zeros((3,), dtype=th.float)

for batch, _ in dataset_loader:
    for data in batch:
        mu += data.sum(dim = 1).sum(dim = 1)
        sig += (data**2).sum(dim = 1).sum(dim = 1)

mu = mu / n
sig = (sig - mu**2) / n

mu, sig
transform_new = T.Compose([
    T.Resize((32, 32)),
    T.ToTensor(),
    T.Normalize(mean = mu, std = sig)
])

dataset = torchvision.datasets.ImageFolder(root = 'sign_language', transform = transform_new)

print(f'Картинок в датасете: {len(dataset)}')
print(f'Количество классов: {len(dataset.classes)}')
print(f'Размер картинки: {dataset[0][0].shape}')
train, test = random_split(dataset, [0.8, 0.2])
class CNN_1block(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv_block1 = nn.Sequential(
            nn.Conv2d(
                in_channels=3,
                out_channels=6,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.classifier = nn.Linear(1350, 10)

    def forward(self, X):
        out = self.conv_block1(X)
        out = out.flatten(start_dim=1)
        out = self.classifier(out)
        return out
loader = DataLoader(train, batch_size = 8)
loader_test = DataLoader(test, batch_size = 8)

model = CNN_1block()
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr = 0.01)
epoch_losses = []

f1 = M.F1Score(task = 'multiclass', num_classes = 10)
f1_test = []

for epoch in range(5):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        # forward pass
        y_pred = model(X_batch)
        loss = criterion(y_pred.squeeze(0), th.tensor(y_batch))
        epoch_loss += loss
        # backprop
        loss.backward()
        # gradient descend
        optimizer.step()
        optimizer.zero_grad()
    epoch_loss = epoch_loss / len(train)
    epoch_losses.append(epoch_loss.item())

    test_pred = []
    test_true = []

    for batch in loader_test:
        for i in range(len(batch)):
            X = batch[0][i].unsqueeze(0)
            y = batch[1][i]
            test_true.append(y)
            test_pred.append(model(X).argmax())

    f1_test.append(f1(th.tensor(test_pred), th.tensor(test_true)))
    print(epoch, epoch_loss.item())
plt.title('CrossEntropyLoss')
plt.plot(epoch_losses)
plt.grid(True)
plt.show()
plt.title('F1_Score')
plt.plot(f1_test)
plt.grid(True)
plt.show()
class CNN_2blocks(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv_block1 = nn.Sequential(
            nn.Conv2d(
                in_channels=3,
                out_channels=6,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.conv_block2 = nn.Sequential(
            nn.Conv2d(
                in_channels=6,
                out_channels=12,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.classifier = nn.Linear(432, 10)

    def forward(self, X):
        out = self.conv_block1(X)
        out = self.conv_block2(out)
        out = out.flatten(start_dim=1)
        out = self.classifier(out)
        return out
class CNN_3blocks(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv_block1 = nn.Sequential(
            nn.Conv2d(
                in_channels=3,
                out_channels=6,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.conv_block2 = nn.Sequential(
            nn.Conv2d(
                in_channels=6,
                out_channels=12,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.conv_block3 = nn.Sequential(
            nn.Conv2d(
                in_channels=12,
                out_channels=24,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.classifier = nn.Linear(96, 10)

    def forward(self, X):
        out = self.conv_block1(X)
        out = self.conv_block2(out)
        out = self.conv_block3(out)
        out = out.flatten(start_dim=1)
        out = self.classifier(out)
        return out
loader = DataLoader(train, batch_size = 8)
loader_test = DataLoader(test, batch_size = 8)

model = CNN_2blocks()
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr = 0.01)
epoch_losses = []

f1 = M.F1Score(task = 'multiclass', num_classes = 10)
f1_test_2 = []

for epoch in range(5):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        # forward pass
        y_pred = model(X_batch)
        loss = criterion(y_pred.squeeze(0), th.tensor(y_batch))
        epoch_loss += loss
        # backprop
        loss.backward()
        # gradient descend
        optimizer.step()
        optimizer.zero_grad()
    epoch_loss = epoch_loss / len(train)
    epoch_losses.append(epoch_loss.item())

    test_pred = []
    test_true = []

    for batch in loader_test:
        for i in range(len(batch)):
            X = batch[0][i].unsqueeze(0)
            y = batch[1][i]
            test_true.append(y)
            test_pred.append(model(X).argmax())

    f1_test_2.append(f1(th.tensor(test_pred), th.tensor(test_true)))
    print(epoch, epoch_loss.item())
loader = DataLoader(train, batch_size = 8)
loader_test = DataLoader(test, batch_size = 8)

model = CNN_3blocks()
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr = 0.01)
epoch_losses = []

f1 = M.F1Score(task = 'multiclass', num_classes = 10)
f1_test_3 = []

for epoch in range(5):
    epoch_loss = 0
    for X_batch, y_batch in loader:
        # forward pass
        y_pred = model(X_batch)
        loss = criterion(y_pred.squeeze(0), th.tensor(y_batch))
        epoch_loss += loss
        # backprop
        loss.backward()
        # gradient descend
        optimizer.step()
        optimizer.zero_grad()
    epoch_loss = epoch_loss / len(train)
    epoch_losses.append(epoch_loss.item())

    test_pred = []
    test_true = []

    for batch in loader_test:
        for i in range(len(batch)):
            X = batch[0][i].unsqueeze(0)
            y = batch[1][i]
            test_true.append(y)
            test_pred.append(model(X).argmax())

    f1_test_3.append(f1(th.tensor(test_pred), th.tensor(test_true)))
    print(epoch, epoch_loss.item())
plt.title('F1_Score')
plt.plot(f1_test, label = '1 block')
plt.plot(f1_test_2, label = '2 blocks')
plt.plot(f1_test_3, label = '3 blocks')
plt.legend()
plt.grid(True)
plt.show()

