Metadata-Version: 2.4
Name: sparkformers
Version: 0.0.2
Summary: Distributed deep learning for Hugging Face Transformers on Spark
Author-email: Daniel Cahall <danielenricocahall@gmail.com>
License: MIT
License-File: LICENSE
Requires-Python: <3.13,>=3.9
Requires-Dist: pyspark<=4.0.0
Requires-Dist: torch>=2.7.1
Requires-Dist: transformers<5.0.0
Description-Content-Type: text/markdown

# Overview
![Sparkformers logo](https://raw.githubusercontent.com/danielenricocahall/sparkformers/main/logo.png)
Welcome to Sparkformers, where we offer distributed training of [Transformers](https://github.com/huggingface/transformers) models on [Spark](https://spark.apache.org/)!

# Motivation / Purpose
Sparkformers is derived from [Elephas](https://github.com/danielenricocahall/elephas). However, with [Hugging Face removing support for TensorFlow](https://www.linkedin.com/posts/leonidboytsov_wow-the-huggingface-library-is-dropping-activity-7339003651773915137-mmrV#:~:text=I%20have%20bittersweet%20news%20to,even%20if%20outside%20of%20PyTorch.), I decided to spin some of the logic off into its own separate project and rework the paradigm to support the [Torch](https://pytorch.org/) backend! The purpose of this project is to serve as an experimental backend for distributed training that may be more ergonomic for developers than other solutions such as [Ray](https://docs.ray.io/en/latest/train/train.html).

# Examples
Note that all examples are also available in the [examples directory](https://github.com/danielenricocahall/sparkformers/tree/main/examples).

## Autoregressive (Causal) Language Model Training and Inference
```python
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
)
import torch
batch_size = 20
epochs = 10

dataset = load_dataset("ag_news")
x = dataset["train"]["text"]

x_train, x_test = train_test_split(x, test_size=0.2)

model_name = "sshleifer/tiny-gpt2"

model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
tokenizer_kwargs = {
    "max_length": 15,
    "padding": True,
    "truncation": True,
    "padding_side": "left",
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForCausalLM,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs
)

# perform distributed training
sparkformer_model.train(x_train, epochs=epochs, batch_size=batch_size)

# perform distributed generation
generations = sparkformer_model.generate(
    x_test, max_new_tokens=10, num_return_sequences=1
)
# decode the generated texts
generated_texts = [
    tokenizer.decode(output, skip_special_tokens=True) for output in generations
]

for i, text in enumerate(generated_texts):
    print(f"Original text {i}: {x_test[i]}")
    print(f"Generated text {i}: {text}")
```
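The example sets `padding_side` to `"left"` because decoder-only models such as GPT-2 generate by appending new tokens after the last position of each row; with right padding, that last position would be a pad token for every shorter input. The sketch below is a hypothetical, pure-Python illustration of left padding (the `left_pad` helper is not part of Sparkformers or `transformers`). It builds the padded IDs and the matching attention mask, with padding placed before the real tokens as the tokenizer does under this setting.

```python
from typing import List, Tuple


def left_pad(
    sequences: List[List[int]], pad_id: int
) -> Tuple[List[List[int]], List[List[int]]]:
    """Left-pad token ID sequences to a common length.

    Returns the padded IDs and an attention mask (1 = real token, 0 = padding).
    """
    width = max(len(seq) for seq in sequences)
    input_ids, attention_mask = [], []
    for seq in sequences:
        n_pad = width - len(seq)
        # Padding goes first, so the final position of every row is a real token.
        input_ids.append([pad_id] * n_pad + seq)
        attention_mask.append([0] * n_pad + [1] * len(seq))
    return input_ids, attention_mask


# Hypothetical token IDs; pad_id stands in for tokenizer.eos_token_id.
ids, mask = left_pad([[11, 12, 13], [21]], pad_id=0)
print(ids)   # [[11, 12, 13], [0, 0, 21]]
print(mask)  # [[1, 1, 1], [0, 0, 1]]
```

Because every row ends in a real token, generation continues directly from the end of each prompt regardless of its original length.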

## Sequence Classification
```python
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
)
import numpy as np
import torch
batch_size = 20
epochs = 10

newsgroups = fetch_20newsgroups(subset="train")
x = newsgroups.data
y = newsgroups.target

encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)

x_train, x_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.5)

model_name = "albert-base-v2"

model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=len(np.unique(y_encoded))
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer_kwargs = {"padding": True, "truncation": True}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForSequenceClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

# perform distributed training
sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

# perform distributed prediction
predictions = sparkformer_model.predict(x_test)

# review the predicted labels
print([int(np.argmax(pred)) for pred in predictions])
```
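The final `print` yields class indices only. Assuming `predict` returns one row of class scores per input (which is what the `np.argmax` call above implies), a small helper can turn those rows into readable labels. `fetch_20newsgroups` exposes the category names as `newsgroups.target_names`, and because `newsgroups.target` already holds contiguous integers starting at 0, the `LabelEncoder` step leaves the indices unchanged, so index `i` corresponds to `target_names[i]`. The `to_label_names` helper and the toy scores below are hypothetical illustrations, not Sparkformers API.

```python
from typing import List, Sequence


def to_label_names(
    predictions: Sequence[Sequence[float]], class_names: Sequence[str]
) -> List[str]:
    """Map each row of class scores to the name of its highest-scoring class."""
    names = []
    for scores in predictions:
        # Index of the first maximum score, matching np.argmax on a 1-D row.
        best = max(range(len(scores)), key=lambda i: scores[i])
        names.append(class_names[best])
    return names


# Toy scores for 2 inputs over 3 hypothetical classes.
toy_predictions = [[0.1, 2.3, -0.5], [1.7, 0.2, 0.9]]
print(to_label_names(toy_predictions, ["alt.atheism", "comp.graphics", "sci.space"]))
# ['comp.graphics', 'alt.atheism']
```

With the example above, the assumed call would be `to_label_names(predictions, newsgroups.target_names)`.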

## Token Classification (NER)
```python
from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
)
from datasets import load_dataset
import numpy as np
import torch
batch_size = 5
epochs = 2
model_name = "hf-internal-testing/tiny-bert-for-token-classification"

model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )
    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

dataset = load_dataset("conll2003", split="train[:5%]", trust_remote_code=True)
dataset = dataset.map(tokenize_and_align_labels, batched=True)

x = dataset["tokens"]
y = dataset["labels"]

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

tokenizer_kwargs = {
    "padding": True,
    "truncation": True,
    "is_split_into_words": True,
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForTokenClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

inputs = tokenizer(x_test, **tokenizer_kwargs, return_tensors="pt")
distributed_preds = sparkformer_model(**inputs)
# per-token label IDs (argmax over the label dimension) for each test example
print([np.argmax(pred, axis=-1).tolist() for pred in np.squeeze(distributed_preds)])

```
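The `tokenize_and_align_labels` function is needed because a subword tokenizer can split one word into several tokens, while `conll2003` provides one `ner_tags` entry per word. It keeps each word's label on its first sub-token only and writes `-100` for special tokens (`word_idx is None`) and for any later sub-tokens of the same word. `-100` is the default `ignore_index` of `torch.nn.CrossEntropyLoss`, so, provided the labels reach the loss unchanged, those positions do not contribute to it. The sketch below reproduces that loop without a tokenizer, using a hand-written, hypothetical `word_ids` list in place of `tokenized_inputs.word_ids(...)`.

```python
from typing import List, Optional

IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def align_labels(word_ids: List[Optional[int]], word_labels: List[int]) -> List[int]:
    """Expand word-level labels to token level, labelling only each word's first sub-token."""
    label_ids = []
    previous_word_idx = None
    for word_idx in word_ids:
        if word_idx is None:
            # Special tokens such as [CLS] and [SEP] map to no word.
            label_ids.append(IGNORE_INDEX)
        elif word_idx != previous_word_idx:
            # The first sub-token of a new word carries that word's label.
            label_ids.append(word_labels[word_idx])
        else:
            # Continuation sub-tokens of the same word are ignored.
            label_ids.append(IGNORE_INDEX)
        previous_word_idx = word_idx
    return label_ids


# Hypothetical: [CLS], word 0, word 1 split into two sub-tokens, word 2, [SEP]
word_ids = [None, 0, 1, 1, 2, None]
print(align_labels(word_ids, [3, 0, 7]))  # [-100, 3, 0, -100, 7, -100]
```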

# TODO
- [ ] Validate that both GPU and CPU are supported (Elephas supports both; we just need to confirm the Torch API is being used correctly)
- [ ] Add support for distributed training of other model types (e.g., image classification, object detection, etc.)
- [ ] Add support for distributed training of custom models
- [ ] Consider simplifying the API further (e.g., a builder pattern, or accepting the model name string and moving the loader logic inside the `SparkFormer` class)
- [ ] Support TensorFlow/Keras models for completeness (potentially taking a similar approach to `transformers`, where each class is prefixed with `TF`; this would essentially mean porting the old logic from Elephas)

> 💡 Interested in contributing? Check out the [Local Development & Contributions Guide](https://github.com/danielenricocahall/sparkformers/blob/main/CONTRIBUTING.md).