# t-1 10_multiprocessing_Таратин_Артём.ipynb
# Markdown:
# Параллельные вычисления
# Markdown:
Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html
# Markdown:
## Задачи для совместного разбора
# Markdown:
1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`.
from collections import Counter, defaultdict
import multiprocessing as mp
import time
def count(file):
    with open(file, 'r', encoding='cp1251') as f:
        data = ''.join(f.readlines()).lower()
    counts = Counter(data)
    return counts
start = time.perf_counter()

file1 = './10_multiprocessing_data/Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt'
file2 = './10_multiprocessing_data/Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt'
c1 = count(file1)
c2 = count(file2)

finish = time.perf_counter()
print(f'Выполнено за {round(finish - start, 2)} сек.')
c1.most_common()[:5], c2.most_common()[:5]
c2['к']
# Markdown:
2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс.
%%file read_book.py

from collections import Counter

def count(file, output):
    with open(file, 'r', encoding='cp1251') as f:
        data = ''.join(f.readlines()).lower()
    counts = Counter(data)
    output.put(counts)
import read_book

start = time.perf_counter()

output = mp.Queue()

processes = [mp.Process(target=read_book.count, args=([file1, file2][i], output)) for i in range(2)]

for p in processes:
    p.start()

for p in processes:
    p.join()

results = [output.get() for p in processes]

finish = time.perf_counter()

print(results[0].most_common(5))
print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
## Лабораторная работа 10
# Markdown:
1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```
import csv
import os

recipes_full_path = './../../datasets/recipes_full.csv'
!head -n 5 {recipes_full_path}
rows = 0
with open(recipes_full_path, newline='') as csvfile:
    spamreader = csv.reader(csvfile, delimiter=',')
    for row in spamreader:
        if row[1].isnumeric():
            rows += 1
rows
folds = 8
file_idx = 0
count_rows = 0
active_file = None
csv_writer = None
rows_in_file = rows // folds
extra_rows = rows % folds
files = [f'./10_multiprocessing_data/id_tag_nsteps_{i}.csv' for i in range(1, folds+1)]
files
with open(recipes_full_path, 'r', encoding='utf-8', newline='') as file:
    csv_reader = csv.reader(file, delimiter=',')
    headers = next(csv_reader)

    for line in csv_reader:
        if active_file is None or count_rows >= rows_in_file + (1 if file_idx <= extra_rows else 0):
            if active_file:
                active_file.close()

            file_idx += 1
            count_rows = 0
            active_file = open(f'./10_multiprocessing_data/id_tag_nsteps_{file_idx}.csv', 'w', newline='', encoding='utf-8')
            csv_writer = csv.writer(active_file, delimiter=';')
            csv_writer.writerow(['id', 'tags', 'steps'])

        csv_writer.writerow([line[1], line[5], line[6]])
        count_rows += 1

    if active_file:
        active_file.close()
# Markdown:
2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.
def find_avg_steps(file_path):
    step_totals = defaultdict(lambda: {'steps': 0, 'count': 0})

    with open(file_path, 'r', encoding='utf-8') as f:
        csv_data = csv.reader(f, delimiter=';')
        headers = next(csv_data)

        for record in csv_data:
            steps = int(record[2])
            tags = record[1].strip('[]').replace("'", "").split(', ')

            for tag in tags:
                step_totals[tag]['steps'] += steps
                step_totals[tag]['count'] += 1

    avg_steps_per_tag = {tag: value['steps'] / value['count'] for tag, value in step_totals.items()}
    return avg_steps_per_tag
avg_steps = find_avg_steps(files[0])
avg_steps['mexican']
# Markdown:
3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы иметь возможность получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.
def process_file(file_path):
    steps_data = defaultdict(lambda: {'steps': 0, 'count': 0})
    with open(file_path, 'r', encoding='utf-8') as f:
        csv_data = csv.DictReader(f, delimiter=';')
        for record in csv_data:
            steps = int(record['steps'])
            tags = record['tags'].strip('[]').replace("'", "").split(', ')
            for tag in tags:
                steps_data[tag]['steps'] += steps
                steps_data[tag]['count'] += 1
    tag_steps = {tag: data['steps'] for tag, data in steps_data.items()}
    tag_count = {tag: data['count'] for tag, data in steps_data.items()}
    return tag_steps, tag_count

def calculate_avg_steps_all_files(file_list):
    total_steps, total_counts = defaultdict(int), defaultdict(int)
    for file_name in file_list:
        file_steps, file_counts = process_file(file_name)
        for tag, steps in file_steps.items():
            total_steps[tag] += steps
            total_counts[tag] += file_counts[tag]
    avg_steps = {tag: total_steps[tag] / total_counts[tag] for tag in total_steps}
    return avg_steps
t = %timeit -o -q result = calculate_avg_steps_all_files(files)
print(f'{t.average:.4f} сек в среднем')
# Markdown:
4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.
%%file process_file.py

import csv
from collections import defaultdict

def process_file(file_path):
    steps_data = defaultdict(lambda: {'steps': 0, 'count': 0})

    with open(file_path, 'r', encoding='utf-8') as f:
        csv_data = csv.DictReader(f, delimiter=';')

        for record in csv_data:
            steps = int(record['steps'])
            tags = record['tags'].strip('[]').replace("'", "").split(', ')
            for tag in tags:
                steps_data[tag]['steps'] += steps
                steps_data[tag]['count'] += 1

    tag_steps = {tag: data['steps'] for tag, data in steps_data.items()}
    tag_count = {tag: data['count'] for tag, data in steps_data.items()}

    return tag_steps, tag_count
import process_file

def calculate_avg_steps_all_files_mp(file_list):
    with mp.Pool() as pool:
        results = pool.map(process_file.process_file, file_list)

    total_steps, total_counts = defaultdict(int), defaultdict(int)
    for file_steps, file_counts in results:
        for tag, steps in file_steps.items():
            total_steps[tag] += steps
            total_counts[tag] += file_counts[tag]

    avg_steps = {tag: total_steps[tag] / total_counts[tag] for tag in total_steps}
    return avg_steps
t = %timeit -o -q result = calculate_avg_steps_all_files_mp(files)
print(f'{t.average:.4f} сек в среднем')
# Markdown:
5. (*) Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы.
...

# t-2 11_dask_array_Таратин_Артём.ipynb
# Markdown:
# Dask Array
# Markdown:
Материалы:
* Макрушин С.В. Лекция 11: Dask
* https://docs.dask.org/en/latest/array.html
* JESSE C. DANIEL. Data Science with Python and Dask. 
# Markdown:
## Задачи для совместного разбора
import matplotlib.pyplot as plt
import dask.array as da
import h5py
import numpy as np
# Markdown:
1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.
da_array = da.random.normal(0, 1, size=(1000, 300_000), chunks=(1000 // 10, 300_000 // 7))
print(
    f'Среднее: {da_array.mean().compute():.4f}',
    f'Стандартное отклонение: {da_array.std().compute():.4f}',
    f'Минимум: {da_array.min().compute():.4f}',
    f'Максимум: {da_array.max().compute():.4f}',
    sep='\n',
)
# Markdown:
2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`
np_array = np.random.randn(1000, 300_000)
np_array.shape
print('NumPy:')
%timeit -r1 -n1 np_result = np.sum(np.square(np_array))

print('Dask:')
%timeit -r1 -n1 da_result = da.sum(da.square(da_array)).compute()
# Markdown:
3. Визуализируйте граф вычислений для задачи 12.
dask_graph = da_array**2
dask_graph.visualize()
# Markdown:
## Лабораторная работа 11
# Markdown:
1. Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.
data_f  = h5py.File('./11_dask_array_data/minutes_n_ingredients_full.hdf5', 'r')
list(data_f.keys())
data_set = data_f['/recipe']
data_set.shape
x_da = da.from_array(data_set, chunks=(100_000, 3))
x_da
print('Тип данных:', x_da.dtype)
print('Размер чанка:', x_da.chunksize)
print('Размер в МБ:', x_da.nbytes // 2**20)
# Markdown:
2. Вычислите среднее значение по каждому столбцу, кроме первого.
proc = x_da[:, 1:].mean(axis=0)
proc.visualize()
mean_values = proc.compute()
mean_values
# Markdown:
3. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего.
chunk_sizes = np.array([(i, j) for i in np.arange(100000, 2231637+100000, 100000) for j in np.arange(1, 4)])
chunk_sizes[:5]
time_compute = []
for chunk_size in chunk_sizes:
    x_da_chunk_size = da.from_array(data_set, chunks=chunk_size)
    t = %timeit -o -q mean_values = x_da_chunk_size[:, 1:].mean(axis=0).compute()
    time_compute.append(t.average)
    print(chunk_size, t)
best_size_ix = np.argmin(time_compute)
print(f'Наиболее эффективный размер состовляет {chunk_sizes[best_size_ix]}')
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
plt.title('Время расчёта в зависимости от размера чанка')

ax.scatter(chunk_sizes[:, 0]//1000, chunk_sizes[:, 1], time_compute, color='blue')
ax.scatter(chunk_sizes[best_size_ix, 0]//1000, chunk_sizes[best_size_ix, 1], time_compute[best_size_ix], color='red')

ax.set_xlabel('Размер чанка [0] в тыс.')
ax.set_ylabel('Размер чанка [1]')
ax.set_zlabel('Время ms')
ax.set_yticks([1, 2, 3])
ax.grid(True)
plt.tight_layout()
plt.show()
# Markdown:
4. Выберите рецепты, время выполнения которых меньше медианного значения
med = da.percentile(x_da[:, 1], 50)
below_med = x_da[x_da[:, 1] < med]
below_med.compute()
# Markdown:
5. Посчитайте количество каждого из возможных значений кол-ва ингредиентов
counts = da.bincount(x_da[:, 2]).compute()
value_counts = {i: count.item() for i, count in enumerate(counts) if i != 0}
value_counts
# Markdown:
6. Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.
x_da[:, 1].max().compute().item()
percentile_75 = da.percentile(x_da[:, 1], 75)
x_da_limited = da.where(x_da[:, 1] > percentile_75, percentile_75, x_da[:, 1])
x_da_limited.max().compute().item()
percentile_75.compute().item()
# Markdown:
7. Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.
preferences = da.array([30, 5]).compute()
preferences
distances = da.sum(da.abs(x_da[:, 1:3] - preferences), axis=1)
ix = da.argmin(distances).compute()
recipe_preferences = x_da[ix].compute()
recipe_preferences, ix
# Markdown:
8. Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

    Блочный алгоритм вычислений состоит из двух частей:
    1. Загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом
    2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

    Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент
def block_mean(file_path, dataset_name, column_index, blocksize):
    with h5py.File(file_path, 'r') as f:
        dataset = f[dataset_name]
        total_size = dataset.shape[0]
        
        total_sum = 0
        total_count = 0
        
        for start in range(0, total_size, blocksize):
            end = min(start + blocksize, total_size)
            block = dataset[start:end, column_index]
            
            total_sum += np.sum(block)
            total_count += len(block)
        
        average_mean = total_sum / total_count
        return average_mean
file_path = './11_dask_array_data/minutes_n_ingredients_full.hdf5'
dataset_name = '/recipe'
column_index = 1
blocksize = 100_000
block_res = block_mean(file_path, dataset_name, column_index, blocksize)
defaut_res = x_da[:, column_index].mean().compute()
print(f'{block_res  = :.4f}\n{defaut_res = :.4f}')


# t-3 12_sqlite_Таратин_Артём.ipynb
# Markdown:
# SQLite
# Markdown:
Материалы:
* [Основы работы с SQLite](https://proglib.io/p/samouchitel-po-python-dlya-nachinayushchih-chast-22-osnovy-raboty-s-sqlite-2023-06-15)
* [DB Browser for SQLite](https://cknoll.github.io/sqlitebrowser/)
* [13 Free Open-source SQLite Database clients and managers](https://medevel.com/13-sqlite-database-clients-managers/)
* [SQLiteStudio - Свободный менеджер SQLite баз данных](https://континентсвободы.рф/sqlitestudio-svobodnyj-menedzher-sqlite-baz-dannyh/)
* [Пример 1](https://colab.research.google.com/drive/1x1jbnbzTFwD3CcmgDHSKH7hVpCN6xstd)
* [Пример 2](https://colab.research.google.com/drive/1VgLgA9ZuTu4AIdDf1n9Mj-biPb16PcGV)
# Markdown:
## Лабораторная работа 12
# Markdown:
Исходные данные согласно варианту:
1) Excel-sample-data-for-pivot-tables.xlsx
1) excel-spreadsheet-examples-for-students.xlsx
1) sample-csv-file-for-testing.csv
1) **Sample-excel-file-with-employee-data.xlsx**
1) Sample-sales-data-excel.xlsx
1) sample-xlsx-file-for-testing.xlsx
import pandas as pd
import sqlite3
# Чтение данных из Excel файла
df = pd.read_excel('./12_sqlite_data/Sample-excel-file-with-employee-data.xlsx')
# Вывод первых 3 строк
df.head(3)
# Создание соединения с БД SQLite
conn = sqlite3.connect('./12_sqlite_data/data.db')
cur = conn.cursor()

# Удаление таблиц
cur.execute('DROP TABLE IF EXISTS employee')
cur.execute('DROP TABLE IF EXISTS job_categories')

# Создание таблицы
cur.execute('''
CREATE TABLE IF NOT EXISTS employee (
    id INTEGER PRIMARY KEY,
    Jobcat INTEGER,
    Salary INTEGER,
    Prev_Exp INTEGER
)
''')
conn.commit()

# Вывод всех существующих таблиц в базе данных
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
cur.fetchall()
# Markdown:
Создать в БД SQLite таблицу с несколькими колонками (3-5 колонок) из XLSX-таблицы, заполнить их соответствующими данными:
# Markdown:
* задавая в программы несколько INSERT (не менее 10) с конкретными значениями
# Разделение строки на отдельные запросы
insert_query = '''
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (72, 1, 54000, 11);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (312, 1, 25650, 64);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (206, 2, 33750, 284);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (142, 1, 28500, 34);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (187, 3, 58750, 13);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (12, 1, 28350, 26);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (341, 3, 59400, 272);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (134, 3, 41550, 285);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (460, 1, 22500, 24);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (280, 1, 24750, 5);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (451, 1, 28500, 20);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (205, 3, 66750, 258);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (85, 1, 25950, 42);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (5, 1, 45000, 138);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (55, 1, 27000, 120);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (203, 1, 29100, 50);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (311, 1, 22500, 63);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (453, 1, 24450, 338);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (350, 1, 37650, 132);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (240, 3, 54375, 81);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (300, 1, 26550, 105);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (232, 3, 55500, 62);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (48, 2, 30750, 240);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (245, 1, 26700, 18);
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (50, 3, 60000, 59);
'''.split('\n')

# Выполнение каждого запроса по отдельности
for query in insert_query:
    cur.execute(query)
# Сохранение изменений
conn.commit()

cur.execute('SELECT * FROM employee')
cur.fetchall()[:5]
# Удаление и создание таблицы
cur.execute('DROP TABLE IF EXISTS employee')
cur.execute('''
CREATE TABLE IF NOT EXISTS employee (
    id INTEGER,
    Jobcat INTEGER,
    Salary INTEGER,
    Prev_Exp INTEGER
)
''')
conn.commit()
# Markdown:
* загружая в цикле непосредственно из XLSX-таблицы (+3 балла)
# Вставка данных
insert_query = '''
INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (?, ?, ?, ?)
'''
data_to_insert = df[['ID', 'Jobcat', 'Salary', 'Prev Exp']].values.tolist()
cur.executemany(insert_query, data_to_insert)

# Сохранение изменений и закрытие соединения
conn.commit()
# Markdown:
Построить запросы SELECT:
# Markdown:
1) Для выборки записей по некоторому критерию одного (или 2-х) значений SELECT a, b FROM t1 WHERE c>100 AND d=4
query = '''
SELECT id, Salary 
FROM employee 
WHERE Salary > 50000 AND Jobcat = 1
'''
cur.execute(query)
cur.fetchall()
# Markdown:
2) Сгруппировать данные по одному из параметров, подсчитав сумму значений по другому параметру SELECT a, SUM(b) FROM t1 GROUP BY c
query_grouped = '''
SELECT Jobcat, SUM(Salary) 
FROM employee 
GROUP BY Jobcat
'''
cur.execute(query_grouped)
cur.fetchall()
# Markdown:
3) Нормализовать БД и построить запрос по нескольким таблицам
# Удаление старой таблицы, если существует
cur.execute('DROP TABLE IF EXISTS employee')
cur.execute('DROP TABLE IF EXISTS job_categories')

# Создание таблицы категорий работы
cur.execute('''
CREATE TABLE IF NOT EXISTS job_categories (
    Jobcat INTEGER PRIMARY KEY,
    CategoryName TEXT
)
''')

# Создание таблицы сотрудников
cur.execute('''
CREATE TABLE IF NOT EXISTS employee (
    id INTEGER PRIMARY KEY,
    Jobcat INTEGER,
    Salary INTEGER,
    Prev_Exp INTEGER,
    FOREIGN KEY (Jobcat) REFERENCES job_categories (Jobcat)
)
''')

# Сохранение изменений
conn.commit()
# Вставка данных в таблицу job_categories
job_categories = [
    (1, 'Engineer'),
    (2, 'Manager'),
    (3, 'Technician'),
]

cur.executemany(
    'INSERT INTO job_categories (Jobcat, CategoryName) VALUES (?, ?)',
    job_categories
)

# Вставка данных в таблицу employee
employee = [
    (72, 1, 54000, 11),
    (312, 1, 25650, 64),
    (206, 2, 33750, 284),
    (142, 1, 28500, 34),
    (187, 3, 58750, 13),
    (12, 1, 28350, 26),
    (341, 3, 59400, 272),
    (134, 3, 41550, 285),
    (460, 1, 22500, 24),
    (280, 1, 24750, 5),
]

cur.executemany(
    'INSERT INTO employee (id, Jobcat, Salary, Prev_Exp) VALUES (?, ?, ?, ?)', 
    employee
)

# Сохранение изменений
conn.commit()
query_join = '''
SELECT e.id, e.Salary, jc.CategoryName
FROM employee e
JOIN job_categories jc ON e.Jobcat = jc.Jobcat
'''
cur.execute(query_join)
cur.fetchall()
conn.close()


# t-4 13_dask_dataframe_new.ipynb
# Markdown:
# Dask Data Frame
# Markdown:
Dask Data Frames — это аналог Pandas DataFrames, который позволяет работать с таблицами данных, которые не помещаются в памяти. Dask DataFrames поддерживает множество операций, таких как фильтрация, сортировка, группировка и агрегация данных, а также объединение таблиц.
# Markdown:
### Постановка задачи

Цель задания – Обработать два CSV-файла, объединить их на основе ключевого поля K1, и выполнить дальнейшую обработку данных с использованием библиотеки Dask
# Markdown:
### Тематика датасета: Управление проектами

### Описание колонок:

Уникальный идентификатор проекта – K1 (Project_ID)

#### Файл 1:
* Project_Name: Название проекта
* Start_Date: Дата начала проекта
* Budget: Бюджет проекта в долларах

#### Файл 2:
* Project_Manager: Имя менеджера проекта
* End_Date: Дата завершения проекта
* Status: Статус проекта (например, 'В процессе', 'Завершен')
# Markdown:
1\) Считая, что поле K1 имеет одинаковый набор значений (возможно в разном порядке), объединить данные в один DaskDataFrames, загрузив предварительно каждый csv в свой DataFrame c помощью dd.read_csv (+7)
import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
# Чтение данных
df1 = dd.read_csv('./13_dask_dataframe_data/projects_part1.csv')
df2 = dd.read_csv('./13_dask_dataframe_data/projects_part2.csv')
# Объединение DataFrames по ключевому полю 'Project_ID'
merged_df = dd.merge(df1, df2, on='Project_ID', how='inner').compute()

# Вывод результата
merged_df
# Markdown:
2\) наборы значений в файлах не совпадают. При объединении в строках, для которых полу K1 имеется только в одном файле, аттрибуты сотавлять пустыми (или значениями по умолчанию) (+3)
# Объединение DataFrames по ключевому полю 'Project_ID' с типом соединения 'outer'
merged_df = dd.merge(df1, df2, on='Project_ID', how='outer').compute()

# Вывод результата
merged_df
# Markdown:
3\) Обработать итоговый DataFrame, используя не менее 2-х разных функций, выбранных из 4 (5) по варианту (цель обработки выбрать самостоятельно). Список ф-й в док-те Dask_DataFrame_variant (+5)
# Объединение DataFrames по ключевому полю 'Project_ID' с типом соединения 'outer'
merged_df = dd.merge(df1, df2, on='Project_ID', how='outer')
# Обработка итогового DataFrame

# Вычисление общего бюджета (заполняем отсутствующие значения Budget нулями)
total_budget = merged_df['Budget'].fillna(0).sum().compute()
print(f"Общий бюджет всех проектов: {total_budget:.0f}")
# Группировка проектов по статусу и вычисление среднего бюджета
# Заполняем отсутствующие значения в 'Status' как 'Неизвестен' и 'Budget' как 0
merged_df['Status'] = merged_df['Status'].fillna('Неизвестен')
merged_df['Budget'] = merged_df['Budget'].fillna(0)

average_budget_by_status = merged_df.groupby('Status')['Budget'].mean().compute()
print("Средний бюджет по статусу проектов:")
print(average_budget_by_status)
# Markdown:
4\) Оценить производительность работы при разных разбиениях DataFrame (+5)
# Параметры генерации данных
num_rows = 100000
project_ids = [f"P{i:0>5}" for i in range(1, num_rows + 1)]

# Генерация данных для projects_part1_large.csv
df_part1 = pd.DataFrame({
    'Project_ID': project_ids,
    'Project_Name': np.random.choice(['Website Redesign', 'Mobile App Development', 'Marketing Campaign'], num_rows),
    'Start_Date': pd.to_datetime(np.random.randint(1609459200, 1704067200, num_rows), unit='s'),
    'Budget': np.random.uniform(5000, 50000, num_rows).round(2)
})

# Генерация данных для projects_part2_large.csv
additional_project_ids = [f"P{i:0>5}" for i in range(num_rows//2, num_rows + 1)]  # Половина совпадений
df_part2 = pd.DataFrame({
    'Project_ID': additional_project_ids,
    'Project_Manager': np.random.choice(['Иван Иванов', 'Мария Петрова', 'Сергей Смирнов'], len(additional_project_ids)),
    'End_Date': pd.to_datetime(np.random.randint(1612137600, 1704067200, len(additional_project_ids)), unit='s'),
    'Status': np.random.choice(['В процессе', 'Завершен', 'Запланирован', 'Не начат'], len(additional_project_ids))
})

df_part1.to_csv('./13_dask_dataframe_data/projects_part1_large.csv', index=False)
df_part2.to_csv('./13_dask_dataframe_data/projects_part2_large.csv', index=False)
# Функция для оценки времени выполнения
def evaluate_performance(partition_size):
    # Загрузка CSV файлов с указанным количеством разбиений
    df1 = dd.read_csv('./13_dask_dataframe_data/projects_part1_large.csv', blocksize=partition_size)
    df2 = dd.read_csv('./13_dask_dataframe_data/projects_part2_large.csv', blocksize=partition_size)

    # Объединение DataFrames по ключевому полю 'Project_ID' с типом соединения 'outer'
    merged_df = dd.merge(df1, df2, on='Project_ID', how='outer')

    # Обработка итогового DataFrame
    # Вычисление общего бюджета
    total_budget = merged_df['Budget'].fillna(0).sum().compute()

    # Группировка проектов по статусу и вычисление среднего бюджета
    merged_df['Status'] = merged_df['Status'].fillna('Неизвестен')
    merged_df['Budget'] = merged_df['Budget'].fillna(0)
    average_budget_by_status = merged_df.groupby('Status')['Budget'].mean().compute()

    return total_budget, average_budget_by_status
# Оценка при разных размерах блоков
partition_sizes = ['0.5MB', '0.64MB', '1.28MB', '2.56MB', '3MB', '5MB', '7.5MB', '10MB']
for size in partition_sizes:
    print(f"Разбиение: {size}")
    %timeit -n 1 -r 10 evaluate_performance(size)
    print('')


# t-5 13_dask_bag_Таратин_Артём_new.ipynb
# Markdown:
# Обработке данных в стиле Map-Reduce
# Markdown:
1) преобразовать в Dask.Bag     DataFrame из t-4
import dask.dataframe as dd
# Чтение данных
df1 = dd.read_csv('./13_dask_dataframe_data/projects_part1.csv')
df2 = dd.read_csv('./13_dask_dataframe_data/projects_part2.csv')
# Преобразование в Dask.Bag
bag1 = df1.to_bag()
bag2 = df2.to_bag()
# Проверка типа
type(bag1), type(bag2)
# Markdown:
2) Выполнить операции map и filter (действия по выбору)
# Вывод первых 3 строк
df1.head(3)
# Извлечение имени проекта
def extract_project_name(record):
    return record[1]
# Фильтрация по бюджету
def filter_by_budget(record, min_budget=10):
    return record[3] >= min_budget
# Применение map и filter к bag1
filtered_bag1 = bag1.filter(filter_by_budget)
mapped_bag1 = filtered_bag1.map(extract_project_name)
# Выполнение вычислений
mapped_bag1.compute()


# t-6 13_mean_Таратин_Артём_new.ipynb
# Markdown:
# SMA, CMA, EMA – Вариант 2
# Markdown:
Цель работы:
* Провести вычисление различных типов скользящих средних (SMA, CMA, EMA) на собственных данных и сравнить их результаты. Исследовать скорость вычислений при помощи Dask DataFrame и Pandas, провести сравнение времени выполнения операций. Построить графики для визуализации вычисленных значений на одних осях для наглядного сравнения подходов.

Теория:
* Скользящая средняя (SMA) - это один из методов сглаживания временного ряда, который позволяет лучше увидеть тренды данных за определенный период. Каждое значение вычисляется как среднее арифметическое предыдущих значений на интервале окна.

* Кумулятивная скользящая средняя (CMA) - это вариант скользящей средней, где среднее значение пересчитывается с каждым новым элементом, постепенно увеличивая интервал окна до всей выборки.

* Экспоненциальная скользящая средняя (EMA) - это вид скользящей средней, где больший вес придается более недавним данным. Это делает её более чувствительной к последним изменениям во временном ряду.
# Markdown:
Выполнить примеры из [[6]](https://www.geeksforgeeks.org/how-to-calculate-moving-average-in-a-pandas-dataframe/), взяв для обработки свои данные
# Импорт библиотек
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.dataframe as dd
plt.style.use('default')
%matplotlib inline
# Читаем данные о NVIDIA Corporation
data = pd.read_csv('./13_dask_dataframe_data/NVDA_240701_241007.csv', delimiter=';') \
    .drop(columns=['<TICKER>', '<PER>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>']) \
    .rename(columns={'<DATE>': 'date', '<CLOSE>': 'close'})
data.head(3)
# Выводим размерность df
data.shape
# Markdown:
1. Рассчитать простую скользящую среднюю (SMA)
# Вычисление простой скользящей средней (SMA) для столбца 'close' с окном 7
data['SMA'] = data['close'].rolling(5, center=True).mean()
data
# Markdown:
2. Вариат 2 - экспоненциальную скользящую среднюю (EMA)
data['EMA'] = data['close'].ewm(com=0.4).mean()
data
# Markdown:
3. Вычисление скользящего среднего временного ряда с помощью Dask DataFrame
# Читаем данные о NVIDIA Corporation
df = dd.read_csv('./13_dask_dataframe_data/NVDA_240701_241007.csv', delimiter=';') \
    .drop(columns=['<TICKER>', '<PER>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>']) \
    .rename(columns={'<DATE>': 'date', '<CLOSE>': 'close'})
df.head(3)
# Подсчет SMA для столбца 'close' с окном 5
df['SMA'] = df['close'].rolling(window = 5).mean()
df.head()
# Markdown:
4. Сравнить время выполнения в 1) и 3)
# Выходит что Dask работает медленнее, чем Pandas
print('Pandas:')
%timeit data['SMA'] = data['close'].rolling(5).mean()
print('Dask:')
%timeit df['SMA'] = df['close'].rolling(5).mean()
# Markdown:
5. На одних осях построить графики данных из 1) и 2)
plt.figure(figsize=(12, 6))
plt.title('График скользящей средней NVIDIA Corporation', fontsize=16)

plt.plot(data['date'], data['close'], label='Close')
plt.plot(data['date'], data['SMA'], label='SMA')
plt.plot(data['date'], data['EMA'], label='EMA')

plt.xlabel('Дата', fontsize=14)
plt.ylabel('Значение', fontsize=14)

plt.xticks(rotation=60)
plt.grid(True)
plt.legend()

plt.tight_layout()
plt.show()


# t-7 13_dask_dataframe_practice_Таратин_Артём_new.ipynb
import pandas as pd
import dask.dataframe as dd
# Markdown:
1\. Первый набор данных (df_pandas1):
* id: Уникальный идентификатор каждой строки. Содержит значения от 1 до 5.
* name: Имя каждого человека. Столбец содержит строковые значения: 'Alice', 'Bob', 'Charlie', 'David', 'Eva'.
* value1: Числовой столбец, содержащий значения от 10 до 50 с шагом 10.

2\. Второй набор данных (df_pandas2):
* id: Уникальный идентификатор для каждой строки. Значения варьируются от 3 до 7.
* category: Категория, к которой относится каждый элемент. Столбец содержит строковые значения: 'X', 'Y', 'Z', 'W', 'V'.
* value2: Числовой столбец, содержащий значения от 300 до 700 с шагом 100.
# Чтение данных
# ertyteruy
df_pandas1 = pd.read_csv('./../../datasets/df_pandas1.csv')
df_pandas2 = pd.read_csv('./../../datasets/df_pandas2.csv')

# Преобразование в Dask DataFrame
df_dask1 = dd.from_pandas(df_pandas1, npartitions=2).set_index('id')
df_dask2 = dd.from_pandas(df_pandas2, npartitions=2).set_index('id')
# Markdown:
#### 25. DataFrame.divisions<br>Tuple of npartitions + 1 values, in ascending order, marking the lower/upper bounds of each partition's index.
# Markdown:
Описание: divisions — это кортеж из npartitions + 1 значений, отсортированных по возрастанию, обозначающих границы индексов каждой партиции.
# Выводим divisions для df_dask1
print("Divisions df_dask1:", df_dask1.divisions)
# Markdown:
#### 50. DataFrame.join(other[, on, how, lsuffix, ...])<br>Join columns of another DataFrame.
# Markdown:
Описание: Метод join объединяет столбцы из другого DataFrame по индексам или по заданному столбцу.
# Выполним join двух Dask DataFrame по индексу 'id'
joined_df = df_dask1.join(df_dask2, how='inner')
print("Joined DataFrame:")
print(joined_df.compute())
# Markdown:
#### 55. DataFrame.map_partitions(func, *args[, ...])<br>Apply a Python function to each partition
# Markdown:
Описание: Метод map_partitions применяет пользовательскую функцию к каждой партиции DataFrame.
# Определим функцию для увеличения значений в 'value1' на 10
def increase_value(df):
    df['value1'] = df['value1'] + 10
    return df

# Применим функцию к каждой партиции
modified_df = df_dask1.map_partitions(increase_value)
print("Modified DataFrame after map_partitions:")
print(modified_df.compute())
# Markdown:
#### 85. DataFrame.rename([index, columns])<br>Rename columns or index labels.
# Markdown:
Описание: Метод rename позволяет переименовать столбцы или индексы DataFrame.
# Переименуем столбцы 'value1' и 'name'
renamed_df = df_dask1.rename(columns={'value1': 'score', 'name': 'fullname'})
print("Renamed DataFrame:")
print(renamed_df.compute())
# Markdown:
#### 113. DataFrame.to_dask_array([lengths, meta, ...])<br>Convert a dask DataFrame to a dask array.
# Markdown:
Описание: Метод to_dask_array преобразует Dask DataFrame в Dask Array, что полезно для численных вычислений.
# Преобразуем df_dask1 в Dask Array
dask_array = df_dask1.to_dask_array(lengths=True)
print("Dask Array:")
print(dask_array.compute())


# t-9 Таратин Аль-Натор.ipynb
# Markdown:
# Импорт библиотек и настройка окружения
Импорт необходимых Python-библиотек для анализа данных и машинного обучения, настройка Dask для распределенных вычислений
# Импорт необходимых библиотек
import pandas as pd
import dask.dataframe as dd
import seaborn as sns
import matplotlib.pyplot as plt
from dask.distributed import Client
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import Ridge
from sklearn.metrics import classification_report, roc_auc_score

# Запуск локального кластера Dask
client = Client()
# Markdown:
# Загрузка и подготовка данных о компаниях
Загрузка данных из Excel-файла с помощью pandas и преобразование в Dask DataFrame для эффективной обработки
# Загрузка данных из Excel-файла с помощью pandas и преобразование в Dask DataFrame для эффективной обработки

# Чтение данных из Excel с помощью pandas и преобразование в Dask DataFrame
companies_data_pd = pd.read_excel('Mendeley_data.xlsx')
companies_data = dd.from_pandas(companies_data_pd, npartitions=4)

# Просмотр первых строк
companies_data.head()
# Markdown:
# Предобработка названий компаний
Стандартизация названий компаний с использованием словаря соответствий
# Предобработка названий компаний

# Словарь для переименования компаний
companies_names = {
    "АВТОВАЗ, АО": "АВТОВАЗ",
    "АЛРОСА (ПАО), АК": "АЛРОСА",
    "АЭРОФЛОТ, ПАО": "АЭРОФЛОТ",
    "БАШНЕФТЬ, ПАО АНК": "БАШНЕФТЬ",
    "ВЕРТОЛЕТЫ РОССИИ, АО": "ВЕРТОЛЕТЫ",
    "ГАЗПРОМ, ПАО": "ГАЗПРОМ",
    "ГАЗПРОМ НЕФТЬ, ПАО": "ГАЗПРОМНЕФТЬ",
    "ГМК НОРИЛЬСКИЙ НИКЕЛЬ, ПАО": "НОРНИКЕЛЬ",
    "ЛУКОЙЛ, ПАО": "ЛУКОЙЛ",
    "ММК, ПАО": "ММК",
    "МТС, ПАО": "МТС",
    "НОВАТЭК, ПАО": "НОВАТЭК",
    "РОСНЕФТЬ, ПАО": "РОСНЕФТЬ",
    "РОСТЕЛЕКОМ, ПАО": "РОСТЕЛЕКОМ",
    "РУСАЛ, АО": "РУСАЛ",
    "РУСГИДРО, ПАО": "РУСГИДРО",
    "СИБУР ХОЛДИНГ, ПАО": "СИБУР",
    "СЕВЕРСТАЛЬ, ПАО": "СЕВЕРСТАЛЬ",
    "СУРГУТНЕФТЕГАЗ, ПАО": "СУРГУТНЕФТЕГАЗ",
    "ТАТНЕФТЬ ИМ. В.Д. ШАШИНА, ПАО": "ТАТНЕФТЬ",
    "ТРАНСНЕФТЬ, ПАО": "ТРАНСНЕФТЬ",
    "ФОСАГРО, ПАО": "ФОСАГРО",
    "ЮНИПРО, ПАО": "ЮНИПРО"
}

# Применение словаря для переименования компаний
companies_data['CompanyName'] = companies_data['Name1'].replace(companies_names)

# Просмотр первых строк после переименования
companies_data.head()
# Markdown:
# Расчет показателей кредитоспособности
Вычисление показателя Score и преобразование его в бинарную переменную Creditworthiness
# Удаление строк с пропусками
companies_data = companies_data.dropna()

# Расчёт показателя "Score" для кредитоспособности
companies_data['Score'] = (companies_data['CFO'] / companies_data['LongDebt']) + (companies_data['Equity'] / companies_data['TotalAssets'])

# Преобразование показателя "Score" в бинарную переменную "Creditworthiness"
companies_data['Creditworthiness'] = (companies_data['Score'] > 1).astype(int)

# Подсчёт распределения кредитоспособности
credit_counts = companies_data['Creditworthiness'].value_counts().compute()
print(credit_counts)
# Markdown:
# Загрузка и фильтрация новостных данных
Загрузка новостного датасета и фильтрация по релевантным темам
# Загрузка новостного датасета и фильтрация по релевантным темам

# Чтение данных с новостями без парсинга даты
try:
    news_data = dd.read_csv(
        'lenta-ru-news.csv',
        parse_dates=False,          # Отключение автоматического парсинга дат
        on_bad_lines='skip',        # Пропуск некорректных строк
        engine='python',            # Использование Python-движка парсинга
        encoding='utf-8',           # Кодировка UTF-8
        assume_missing=True         # Предположить, что отсутствующие значения возможны
    )
except Exception as e:
    print(f"Ошибка при чтении CSV: {e}")

# Просмотр первых строк
news_data.head()
# Преобразование столбца 'date' в datetime с обработкой ошибок
news_data['date'] = dd.to_datetime(news_data['date'], errors='coerce')

# Фильтрация тем
news_data = news_data[news_data['topic'].isin(['Экономика', 'Бизнес'])]

# Проверка после фильтрации
news_data.head()
# Markdown:
# Сбор новостей по компаниям
Реализация функции для сбора и агрегации новостей по компаниям с группировкой по годам
# Сбор новостей по компаниям

# Функция для сбора новостей по компаниям
def collect_news_by_company(news_ddf, company_names):
    # Преобразуем даты в год
    news_ddf = news_ddf.assign(year=news_ddf['date'].dt.year)

    # Функция для обработки внутри партиций
    def filter_and_group(part, companies):
        result_rows = []
        for company in companies:
            # Фильтрация строк с упоминанием компании
            filtered = part[part['text'].str.contains(company, case=False, na=False, regex=False)]
            if not filtered.empty:
                # Группировка по году и объединение текстов
                grouped = filtered.groupby('year')['text'].apply(lambda x: ' '.join(x)).reset_index()
                for _, row in grouped.iterrows():
                    result_rows.append({'CompanyName': company, 'Year': row['year'], 'news_text': row['text']})
        if result_rows:
            return pd.DataFrame(result_rows)
        else:
            return pd.DataFrame(columns=['CompanyName', 'Year', 'news_text'])

    # Применяем map_partitions с функцией filter_and_group
    results = news_ddf.map_partitions(
        filter_and_group, 
        company_names, 
        meta={'CompanyName': 'object', 'Year': 'int64', 'news_text': 'object'}
    )

    return results

# Получение уникальных названий компаний и приведение типа к 'object'
translated_company_names = companies_data['CompanyName'].unique().compute().astype(str)

# Сбор новостей по компаниям
news_summary = collect_news_by_company(news_data, translated_company_names)

# Просмотр первых строк собранных новостей
news_summary.head()
# Markdown:
# Объединение данных и подготовка к моделированию
Объединение данных о компаниях с новостными данными и подготовка для машинного обучения
# Объединение данных о компаниях с новостными данными и подготовка для машинного обучения

# Приведение типов данных в 'CompanyName' для обеих DataFrame к одному типу
companies_data_for_merge = companies_data[['Year', 'CompanyName', 'Creditworthiness']].dropna()
companies_data_for_merge['Year'] = companies_data_for_merge['Year'].astype(int)
companies_data_for_merge['CompanyName'] = companies_data_for_merge['CompanyName'].astype(str)  # Приведение к 'str'

# Объединение данных
merged_data = dd.merge(
    news_summary,
    companies_data_for_merge,
    on=['Year', 'CompanyName'],
    how='left'
)

# Удаление строк с пропусками после объединения
merged_data = merged_data.dropna()

# Вычисление окончательного датасета
final_data = merged_data.compute()

# Просмотр объединённых данных
final_data.head()
# Markdown:
# Визуализация распределения данных по годам
Создание визуализации распределения записей по годам с использованием seaborn и matplotlib
# Подсчёт количества значений по годам и сортировка
year_counts = final_data['Year'].value_counts().sort_index()

# Визуализация с использованием seaborn и matplotlib
plt.figure(figsize=(10, 6))
sns.barplot(x=year_counts.index, y=year_counts.values)
plt.xlabel('Year')
plt.ylabel('Count')
plt.title('Number of Records per Year')
plt.xticks(rotation=45)
plt.show()
# Markdown:
# Обучение и оценка модели
Обучение модели Ridge на TF-IDF признаках и оценка её эффективности
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, roc_auc_score
import faiss

# Сортировка данных по дате в порядке возрастания
final_data = final_data.sort_values('Year').reset_index(drop=True)

# Разделение на признаки и целевую переменную
X = final_data['news_text']
y = final_data['Creditworthiness']

# Определим год для разделения (предсказываем 2018 год)
split_year = 2018

# Разделение данных по годам
train_mask = final_data['Year'] < split_year
test_mask = final_data['Year'] >= split_year

X_train = X[train_mask]
y_train = y[train_mask]
X_test = X[test_mask]
y_test = y[test_mask]

# Инициализация TF-IDF Vectorizer
tfidf = TfidfVectorizer(max_features=5000)

# Преобразование текста в числовые признаки
X_train_tfidf = tfidf.fit_transform(X_train).toarray()
X_test_tfidf = tfidf.transform(X_test).toarray()

# Преобразование целевой переменной в numpy массив
y_train = y_train.to_numpy()
y_test = y_test.to_numpy()

# Создание индекса FAISS
dimension = X_train_tfidf.shape[1]
index = faiss.IndexFlatL2(dimension)  # Используем L2 расстояние (евклидово)

# Добавление обучающих данных в индекс
index.add(X_train_tfidf.astype('float32'))

# Функция для поиска ближайших соседей и получения предсказаний
def knn_predict(X_query, index, y_train, k=5):
    distances, indices = index.search(X_query.astype('float32'), k)
    # Получение меток соседей
    neighbor_labels = y_train[indices]
    # Агрегация (например, среднее для задачи регрессии или мода для классификации)
    predictions = np.mean(neighbor_labels, axis=1)
    return predictions

# Предсказание на тестовом наборе
k = 5
y_pred = knn_predict(X_test_tfidf, index, y_train, k=k)

# Превращение предсказаний в классы (0 или 1) с порогом 0.5
y_pred_classes = (y_pred > 0.5).astype(int)

# Оценка модели
print("Classification Report:")
print(classification_report(y_test, y_pred_classes))

# Расчёт ROC-AUC
roc_auc = roc_auc_score(y_test, y_pred)
print(f"ROC-AUC: {roc_auc:.2f}")


# t-11 Таратин Артём ПМ22-1.ipynb
# Markdown:
# Таратин Артём ПМ22-1
# Markdown:
### Задание:
Работа с FAISS
На большом объеме больших векторов оценить время нахождения наименьшего расстояния для k других векторов
* алгоритм простого перебора
* алгоритм с помощью FAISS
# Markdown:
### Вывод:
FAISS значительно превосходит метод полного перебора для поиска ближайших соседей в высокоразмерных пространствах. Хотя метод полного перебора прост в реализации, он становится вычислительно неэффективным по мере увеличения объема данных. FAISS предлагает более эффективное и масштабируемое решение, что делает его подходящим для крупномасштабных приложений.
import numpy as np
import time
import faiss
from scipy.spatial import distance
# Фиксируем генератор случайных чисел для повторяемости результатов
np.random.seed(42)

# Параметры
dimension = 512                # Размерность векторов
num_database_vectors = 2000    # Количество векторов в базе данных
num_query_vectors = 1000       # Количество векторов для поиска
k_neighbors = 5                # Количество ближайших соседей для поиска
# Генерация случайных векторов для базы данных
database_vectors = np.random.random((num_database_vectors, dimension)).astype('float32')

# Генерация случайных векторов для запросов
query_vectors = np.random.random((num_query_vectors, dimension)).astype('float32')

print(f"Размерность базы данных: {database_vectors.shape}")
print(f"Размерность запросов: {query_vectors.shape}")
def brute_force_knn(database, queries, k):
    # Вычисляем евклидово расстояние между запросами и векторами базы
    distances = np.sqrt(((queries[:, np.newaxis, :] - database[np.newaxis, :, :]) ** 2).sum(axis=2))

    # Находим индексы k минимальных расстояний
    nearest_indices = np.argpartition(distances, k, axis=1)[:, :k]

    return nearest_indices
start_time = time.time()
brute_force_results = brute_force_knn(database_vectors, query_vectors, k_neighbors)
brute_force_time = time.time() - start_time

print(f"Время выполнения полного перебора: {brute_force_time:.4f} секунд")
# Добавляем векторы базы в индекс
faiss_index = faiss.IndexFlatL2(dimension)
faiss_index.add(database_vectors)
start_time = time.time()
faiss_distances, faiss_indices = faiss_index.search(query_vectors, k_neighbors)
faiss_time = time.time() - start_time

print(f"Время выполнения поиска с помощью FAISS: {faiss_time:.4f} секунд")
print(
    f"Время выполнения полного перебора: {brute_force_time:.4f} секунд\n" \
    f"Время выполнения поиска с помощью FAISS: {faiss_time:.4f} секунд\n" \
    f"Ускорение с помощью FAISS: в {brute_force_time / faiss_time:.2f} раза"
)


# t-12 Таратин Артём ПМ22-1.ipynb
# Markdown:
# Таратин Артём ПМ22-1
### Задание
**t-12  Распараллеливание выполнения алгоритмов с помощью Dask**  
Разработать пример программы на распараллеливание, усложнив алгоритмы функций. Сравнить время выполнения программы без распараллеливания и с распараллеливанием. Добиться максимального коэффициент сокращения времени выполнения (не менее 10 раз)

### Результат
Параллельное выполнение задачи сильно повышает производительность.

[Вычисление числа Пи методом Монте-Карло](https://habr.com/ru/articles/128454/)
# Импортируем необходимые библиотеки
import dask
from dask import delayed
from dask.distributed import Client
import random
import time
# Инициализируем Dask-клиент для распараллеливания
client = Client(n_workers=32, threads_per_worker=4)
# Последовательная функция для оценки числа π
def monte_carlo_pi(nsamples):
    inside = 0
    for _ in range(nsamples):
        time.sleep(0.005)
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        if x * x + y * y <= 1:
            inside += 1
    return (inside / nsamples) * 4

# Параллельная версия функции
@delayed
def monte_carlo_pi_chunk(nsamples):
    inside = 0
    for _ in range(nsamples):
        time.sleep(0.005)
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        if x * x + y * y <= 1:
            inside += 1
    return inside

# Общее количество точек для симуляции
total_samples = 1000
# --- Последовательное выполнение ---
start_time_seq = time.time()
pi_estimate_seq = monte_carlo_pi(total_samples)
end_time_seq = time.time()
print(f"Последовательное вычисление π заняло: {end_time_seq - start_time_seq:.2f} сек")
print(f"Оценка π (последовательно): {pi_estimate_seq}")
# --- Параллельное выполнение ---
# Разбиваем задачи на куски для параллельной обработки
nchunks = 100  # Количество параллельных задач
samples_per_chunk = total_samples // nchunks

# Создаем список отложенных задач
tasks = [monte_carlo_pi_chunk(samples_per_chunk) for _ in range(nchunks)]

# Запускаем вычисления и суммируем результаты
start_time_par = time.time()
inside_total = dask.delayed(sum)(tasks).compute()
pi_estimate_par = (inside_total / total_samples) * 4
end_time_par = time.time()

print(f"Параллельное вычисление π заняло: {end_time_par - start_time_par:.2f} сек")
print(f"Оценка π (параллельно): {pi_estimate_par}")
# --- Оценка ускорения ---
acceleration = (end_time_seq - start_time_seq) / (end_time_par - start_time_par)
print(f"Ускорение: в {acceleration:.2f} раза")
dask.visualize(*tasks)


# t-13 Таратин Артём ПМ22-1.ipynb
# Markdown:
# Таратин Артём ПМ22-1
# Markdown:
### Задание
Парсинг web-страниц и создание БД по данным из выбранной категории

Распараллеливание алгоритма обработки страниц помощью Dask: Выбрать одну из категорий в Википедии с достаточно большим числом страниц (500-1000 и более). Распарсить страницы по объектам, выбрав по каждому несколько атрибутов (не менее 3-х). Построить XML-файл и по нему создать БД SQlite3 (см. t-3).
import requests
from bs4 import BeautifulSoup
from dask import delayed, compute
from dask.distributed import Client
import xml.etree.ElementTree as ET
import sqlite3
import pandas as pd
def get_category_pages(category_name):
    S = requests.Session()

    URL = "https://ru.wikipedia.org/w/api.php"

    PARAMS = {
        "action": "query",
        "list": "categorymembers",
        "cmtitle": f"Категория:{category_name}",
        "cmlimit": "max",
        "format": "json"
    }

    pages = []
    while True:
        response = S.get(url=URL, params=PARAMS)
        data = response.json()
        pages.extend([f"https://ru.wikipedia.org/wiki/{item['title'].replace(' ', '_')}" for item in data['query']['categorymembers']])
        if 'continue' in data:
            PARAMS['cmcontinue'] = data['continue']['cmcontinue']
        else:
            break

        if len(pages) >= 500:
            break

    return pages
category_name = 'Математики_по_алфавиту'
pages = get_category_pages(category_name)
print(f"Количество страниц: {len(pages)}")
def parse_page(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; Bot/1.0; +http://example.com/bot)"
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Извлечение имени
        name = soup.find('h1', {'id': 'firstHeading'}).text.strip()

        # Извлечение даты рождения
        birth_date = 'Неизвестно'
        bday = soup.find('span', {'class': 'bday'})
        if bday:
            birth_date = bday.text.strip()

        # Извлечение рода деятельности
        occupation = 'Неизвестно'
        infobox = soup.find('table', {'class': 'infobox'})
        if infobox:
            row = infobox.find(string='Род деятельности')
            if row:
                occupation = row.find_next('td').text.strip()

        return {'name': name, 'birth_date': birth_date, 'occupation': occupation}
    except Exception as e:
        print(f"Ошибка при обработке URL {url}: {e}")
        return {'name': 'Неизвестно', 'birth_date': 'Неизвестно', 'occupation': 'Неизвестно'}
import time
from tqdm import tqdm

# однопоточный вариант

start_time = time.time()
results = [parse_page(url) for url in tqdm(pages)]
end_time = time.time()
print(f"Потребовалось: {end_time - start_time} секунд")
# параллельное выполнение

start_time = time.time()
delayed_results = [delayed(parse_page)(url) for url in pages]
results = compute(*delayed_results)
end_time = time.time()
print(f"Потребовалось: {end_time - start_time} секунд")
root = ET.Element('persons', {'category': category_name})

for person in results:
    psn = ET.SubElement(root, 'psn', {
        'name': person['name'],
        'birth_date': person['birth_date'],
        'occupation': person['occupation']
    })

tree = ET.ElementTree(root)
tree.write('persons.xml', encoding='utf-8', xml_declaration=True)
# Парсинг XML-файла
tree = ET.parse('persons.xml')
root = tree.getroot()

data = []
for psn in root.findall('psn'):
    name = psn.get('name')
    birth_date = psn.get('birth_date')
    occupation = psn.get('occupation')
    data.append((name, birth_date, occupation))

# Создание базы данных
conn = sqlite3.connect('persons.db')
cursor = conn.cursor()
cursor.execute('''
    CREATE TABLE IF NOT EXISTS persons (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        birth_date TEXT,
        occupation TEXT
    )
''')

# Вставка данных
cursor.executemany('INSERT INTO persons (name, birth_date, occupation) VALUES (?, ?, ?)', data)
conn.commit()
conn.close()
conn = sqlite3.connect('persons.db')
df = pd.read_sql_query('SELECT * FROM persons', conn)
conn.close()
df.sample(5)


# t-14 Таратин Артём ПМ22-1.ipynb
# Markdown:
# Таратин Артём ПМ22-1
## Задание
**t-14 Привести XML-файл по категориям людей (t-13) в стандартный формат**

Стандарт для категории Родившиеся…MM DD
```
<persons day='03-19' сomm='содан 6.11  Л.Ч.'>
<psn y="1641" h="Абду-ль-Гани_ан-Наблуси" p="Персона/Ислам;мистик, писатель, поэт, Qur'anic exegete"/>
<psn y="1433" h="Абдуллах_ибн_Ибрахим"/>
```
https://drive.google.com/drive/folders/1DOchuAMj2cK33M4Wv0Nbzgb6-PqXwtI8   Название pyyMMDD.xml

Для других категорий
```
<p n="Адамс,_Джон_Фрэнк" b="1930-11-05" a="астроном, тополог, препод.ун-та" c="Великобритания"/>
```

Кодировка Windows-1251
Имена - по алфавиту (как в wiki-ссылке)
Свести число неопределенных дат рождения к минимуму
Убрать пустые атрибуты, в значении “a” убрать основную профессию
Использовать библиотеку работы с XML для чиения и записи
# Markdown:
## Результат
# Markdown:
# Markdown:
## Код
import xml.etree.cElementTree as ET
# Функция чтения XML-файла
def load_xml(source_filename):
    xml_tree = ET.parse(source_filename)
    xml_root = xml_tree.getroot()
    return xml_root
# Функция обработки и записи данных в новый XML-файл
def transform_and_save_xml(original_file, new_file):
    original_root = load_xml(original_file)
    if original_root is None:
        return

    # Создаём новый корневой элемент
    new_root = ET.Element("root")
    group_elem = ET.SubElement(new_root, "persons", attrib={"category": original_root.attrib.get('category', 'Unknown')})

    # Сортируем элементы по атрибуту 'name'
    sorted_people = sorted(original_root.findall('psn'), key=lambda elem: elem.attrib.get('name', 'Unknown'))

    # Формируем новую структуру
    for person in sorted_people:
        person_name = person.attrib.get('name', 'Unknown')
        person_birth = person.attrib.get('birth_date')
        person_occup = person.attrib.get('occupation')

        # Создаём элемент для каждого человека
        person_elem = ET.SubElement(group_elem, "psn", n=person_name)
        if person_birth != "Неизвестно":
            person_elem.set("b", person_birth)
        if person_occup != "Неизвестно":
            person_elem.set("a", person_occup)

    # Сохранение результата в новый файл
    final_tree = ET.ElementTree(new_root)
    final_tree.write(new_file, encoding="windows-1251", xml_declaration=True)
    print(f"Результат сохранён в '{new_file}'")
# Запуск
input_xml = 'persons.xml'
output_xml = 'norm_persons.xml'
transform_and_save_xml(input_xml, output_xml)
# Проверка результата
check_tree = ET.parse(output_xml)
check_root = check_tree.getroot()

# Отображение содержимого результата
if check_root is not None:
    raw_xml_output = ET.tostring(check_root, encoding='unicode', method='xml')
    nice_output = "\n".join(raw_xml_output.split("><"))
    print(nice_output)


# t-15 Таратин Артём ПМ22-1.ipynb
# Markdown:
# Таратин Артём ПМ22-1

## Задача
Имеется набор текстов. Необходимо для каждого слова, встречающегося в наборе текстов, посчитать, сколько раз встречается слово в наборе.

В качестве текстов взять литературные произведения из папки Материалы..../Texts (чем больше, тем лучше).

Предварительно тексты “очистить” и нормализовать слова.

## Результат
При помощи hadoop был подсчитано количество вхождений каждого слова в текстах из папки Texts. Результат записан в файл в папке output.

Скриншот части данных из выходного файла:

# Markdown:
## Код
%%writefile mapper.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
import re
from nltk.corpus import stopwords
import nltk

# Загрузить стоп-слова
nltk.download('stopwords')
russian_stopwords = set(stopwords.words('russian'))

for line in sys.stdin:
    # Удалить ненужные символы и привести к нижнему регистру
    line = line.strip().lower()
    line = re.sub(r"[^a-zа-я0-9\s]+", "", line)

    # Разделить строку на слова
    words = line.split()

    # Вывести "слово\t1", исключая стоп-слова
    for word in words:
        if word not in russian_stopwords:
            print(f"{word}\t1")
%%writefile reducer.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split("\t")
    count = int(count)

    if current_word == word:
        # Если слово совпадает с текущим, увеличиваем счётчик
        current_count += count
    else:
        # Если встречаем новое слово или первое слово в цикле
        #     то выводим предыдущее слово и его сумму
        if current_word:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count

if current_word:
    print(f"{current_word}\t{current_count}")
# Создадим директорию если её нет
!hadoop fs -mkdir -p /Users/$(whoami)/input

# Загрузим в неё файлы
!hadoop fs -put -f ./texts/* /Users/$(whoami)/input/

# Удалим директорию output, если она есть
!rm -rf output
# Запустим hadoop
!hadoop jar /opt/homebrew/Cellar/hadoop/3.4.1/libexec/share/hadoop/tools/lib/hadoop-streaming-3.4.1.jar \
    -input \
        /Users/$(whoami)/input/input_texts.txt /Users/$(whoami)/input/AKareninf.txt \
        /Users/$(whoami)/input/leskov.txt /Users/$(whoami)/input/Lev_Tolstoj.txt \
        /Users/$(whoami)/input/PUSHKINS.txt /Users/$(whoami)/input/stouts.txt \
    -output ./output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
import pandas as pd

# Считаем результат
data = pd.read_csv("output/part-00000", sep="\t", header=None)
data.columns = ['Слово', 'Частота']
data.sort_values(by='Частота', ascending=False, inplace=True)
data.head(10)
import matplotlib.pyplot as plt
import seaborn as sns

# Столбчатая диаграмма для топ 25 слов
top_10 = data.head(25)
plt.figure(figsize=(10,6))
sns.barplot(x='Частота', y='Слово', data=top_10)
plt.title('Топ 10 самых частотных слов')
plt.show()
from wordcloud import WordCloud

# Облако слов
word_freq = dict(zip(data['Слово'], data['Частота']))
wordcloud = WordCloud(width=800, height=400).generate_from_frequencies(word_freq)

plt.figure(figsize=(15, 7.5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('Облако слов частотности')
plt.show()

