Metadata-Version: 2.1
Name: decent-dp
Version: 0.0.2
Summary: An implementation of decentralized data parallel based on PyTorch
Author-email: Zesen Wang <zesen@kth.se>
License: MIT License
        
        Copyright (c) 2024 Zesen Wang
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
        
Project-URL: Homepage, https://github.com/WangZesen/decent-dp
Project-URL: Issues, https://github.com/WangZesen/decent-dp/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: torch >=2.1.0
Requires-Dist: loguru >=0.7.0
Provides-Extra: test
Requires-Dist: tqdm >=4.66.0 ; extra == 'test'

# Decentralized Data Parallel

This package is a PyTorch extension that facilitates multi-GPU decentralized data parallel training.

## Install
```bash
pip install decent-dp
```

## How to use

### Basic Usage

First, a distributed environment must be available, which means the script should be launched with [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html).

Before wrapping the model, initialize the distributed environment:
```python
import torch.distributed as dist
dist.init_process_group()
```
Then wrap the model with `DecentralizedDataParallel`. Since the optimizer and the learning rate scheduler are fused in the backward pass, one needs to provide two functions:

- `optim_fn` (`Callable[[List[Tuple[Tensor, str]]], Optimizer]`): constructs the optimizer from a list of parameters paired with their names.
- `lr_scheduler_fn` (`Callable[[Optimizer], LRScheduler]`, optional): constructs the learning rate scheduler from the provided optimizer.

Examples of both functions can be found at `decent_dp.optim.optim_fn_adam` and `decent_dp.optim.lr_scheduler_fn_cosine_with_warmup`.


```python
from decent_dp import DecentralizedDataParallel as DDP
model = ...
model = DDP(model,
            optim_fn=...,
            lr_scheduler_fn=...,
            topology=...)
```

### Arguments of DecentralizedDataParallel

- `optim_fn` (`Callable[[List[Tuple[Tensor, str]]], Optimizer]`): the function constructs the optimizer based on the list of parameters with their names.
- `lr_scheduler_fn` (`Callable[[Optimizer], LRScheduler]`, optional, default to `None`): the function constructs the learning rate scheduler based on the provided optimizer.
- `topology` (`str`, optional, default to `complete`): the name of the communication topology. Provided: `complete` (fully connected graph), `ring` (one-peer ring graph), `one-peer-exp` (one-peer exponential graph), and `alternating-exp-ring` (alternating exponential ring). It can also be the name of a customized communication topology, as introduced in [this section](#customized-communication-topology).
- `scaler` (`GradScaler`, optional, default to `None`): gradient scaler for automatic mixed precision training.
- `param_as_bucket_view` (`bool`, optional, default to `True`): If set to `True`, the parameters in one bucket will be stored in a contiguous memory block.
- `sync_buffer_in_global_avg` (`bool`, optional, default to `False`): If set to `True`, it will also synchronize the buffers (like the moving average in batch normalization layers) when calling `.global_avg()`.
- `bucket_size_in_mb` (`int`, optional, default to `25`): the size of each bucket in MB. `25` is also the default value used by PyTorch's DDP.
- `profile_mode` (`bool`, optional, default to `False`): If set to `True`, it will record the per-iteration runtime and the non-overlapped communication time over the most recent 1000 iterations. To get the statistics, call `.get_time_stats()`.
- `local_world_size` (`int`, optional, default to `None`): The local world size is acquired from the environment variable by default. One can override the value to simulate different local world sizes. Note that it should be less than or equal to the global world size, and the global world size should be divisible by the local world size.


### Public Methods of DecentralizedDataParallel

- Delegation methods:
    - `train(self, mode: bool = True)`: same as `torch.nn.Module`'s [`train`](https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.train).
    - `eval(self)`: same as `torch.nn.Module`'s [`eval`](https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.eval).
    - `parameters(self, recurse: bool = True) -> Iterator[Parameter]`: same as `torch.nn.Module`'s [`parameters`](https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.parameters).
    - `named_parameters(self, prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) -> Iterator[Tuple[str, Parameter]]`: same as `torch.nn.Module`'s [`named_parameters`](https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.named_parameters).
- Others:
    - `get_time_stats(self) -> Dict[str, deque]`: If in profile mode, return the time statistics. The keys of the returned dictionary are `compute`, `non_overlap_comm`, and `iter`, standing for the time taken by computation, non-overlapped communication, and the whole iteration, respectively (`compute` + `non_overlap_comm` = `iter`).
    - `reset_time_stats(self)`: Clear the profile statistics.
    - `global_avg(self)`: Globally average the parameters. Also globally average the buffers if `sync_buffer_in_global_avg=True`. Typically, the function should be called at the end of each epoch and before the validation steps or saving the checkpoints.
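
The dictionary returned by `get_time_stats()` can be condensed into a few numbers. The sketch below relies only on the documented return type (`Dict[str, deque]`) and key names; the helper `summarize_time_stats` and the sample values are hypothetical, and no time unit is assumed.

```python
from collections import deque
from statistics import mean
from typing import Dict


def summarize_time_stats(stats: Dict[str, deque]) -> Dict[str, float]:
    """Hypothetical helper: average each series and report the share of
    iteration time spent in non-overlapped communication."""
    summary = {key: mean(values) for key, values in stats.items() if len(values) > 0}
    if summary.get("iter") and "non_overlap_comm" in summary:
        summary["comm_fraction"] = summary["non_overlap_comm"] / summary["iter"]
    return summary


# Stand-in data shaped like the documented return value (made-up numbers).
fake_stats = {
    "compute": deque([0.8, 1.0], maxlen=1000),
    "non_overlap_comm": deque([0.2, 0.2], maxlen=1000),
    "iter": deque([1.0, 1.2], maxlen=1000),
}
summary = summarize_time_stats(fake_stats)
```

In a real run, the input would come from `model.get_time_stats()` on a model wrapped with `profile_mode=True`.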

## Supported Schema

The decentralized algorithm schema should follow
$$x_{i}^{(t)}=d_i^{(t)}+\sum_{j\in\mathcal{N}(i)}W_{ij}x_j^{(t-1)}\\ d_i^{(t)}=G\left(F_i,x_i^{(t-1)}\right)$$
which means that the local update $d_i^{(t)}$ does not depend on the neighbors' models from the same iteration.
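
To make the schema concrete, here is a small pure-Python simulation. It reads the mixing term as a weighted average of the neighbors' models from the previous iteration, and it makes several assumptions that are not part of the package: scalar models, a quadratic local objective $F_i(x)=\frac{1}{2}(x-c_i)^2$, a plain gradient step as $G$, and uniform weights $W_{ij}=1/n$ over a neighborhood that includes worker $i$ itself.

```python
from typing import List


def decentralized_step(x_prev: List[float], targets: List[float],
                       W: List[List[float]], lr: float) -> List[float]:
    """One iteration of x_i(t) = d_i(t) + sum_j W[i][j] * x_j(t-1)."""
    n = len(x_prev)
    x_next = []
    for i in range(n):
        # Local update: depends only on worker i's own model from iteration t-1.
        # Assumed G: a gradient step on F_i(x) = 0.5 * (x - targets[i]) ** 2.
        d_i = -lr * (x_prev[i] - targets[i])
        # Mixing term: weighted average of the models from iteration t-1.
        mixed = sum(W[i][j] * x_prev[j] for j in range(n))
        x_next.append(d_i + mixed)
    return x_next


n = 4
W = [[1.0 / n] * n for _ in range(n)]  # assumed uniform mixing weights
targets = [1.0, 2.0, 3.0, 6.0]         # minimizers c_i of the local objectives
x = [0.0] * n
for _ in range(200):
    x = decentralized_step(x, targets, W, lr=0.1)

avg_model = sum(x) / n  # approaches the average of the targets
```

Because both terms use only values from iteration $t-1$, the local computation and the communication of a given iteration do not have to wait for each other in this toy setting.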

## Customized Communication Topology

Currently, the provided communication topologies are `complete`, `ring`, `one-peer-exp` and `alternating-exp-ring`.

One can introduce a customized topology by registering it under a new name.
```python
from typing import List

from decent_dp.topo import Topology, TopologyReg, Edge

@TopologyReg.register('custom-topology')
class CustomTopology(Topology):
    def _get_topo_edges(self) -> List[List[Edge]]:
        ...
```

One should override the `_get_topo_edges` method to provide the edges for every iteration in the loop. In the current version, a sanity check makes sure that (1) every worker is involved in every iteration, and (2) every worker is involved in only one edge in every iteration.
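
The two conditions can be illustrated without the library. In the sketch below, an edge is represented as a plain tuple of worker ranks, which is an assumption made for this example and not the package's `Edge` class; `check_schedule` and `alternating_pairs` are hypothetical helpers as well.

```python
from typing import List, Tuple

# Assumed stand-in for an edge: the ranks of the workers it connects.
EdgeRanks = Tuple[int, ...]


def check_schedule(schedule: List[List[EdgeRanks]], world_size: int) -> bool:
    """Return True if, in every iteration, each worker appears in exactly one edge."""
    for edges in schedule:
        seen = [rank for edge in edges for rank in edge]
        if sorted(seen) != list(range(world_size)):
            return False
    return True


def alternating_pairs(world_size: int) -> List[List[EdgeRanks]]:
    """Hypothetical two-iteration loop that pairs each worker with one neighbor."""
    if world_size % 2 != 0:
        raise ValueError("this sketch assumes an even number of workers")
    first = [(i, i + 1) for i in range(0, world_size, 2)]
    second = [(i, (i + 1) % world_size) for i in range(1, world_size, 2)]
    return [first, second]


schedule = alternating_pairs(4)
is_valid = check_schedule(schedule, world_size=4)
```

A schedule that leaves a worker out of an iteration, or places it in two edges of the same iteration, fails this check.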

The preset topologies, such as `decent_dp.topo.CompleteTopology` and `decent_dp.topo.RingTopology`, are good examples to follow.
