Metadata-Version: 2.4
Name: mongo-aggro
Version: 0.1.0
Summary: MongoDB Aggregation Pipeline Builder with Pydantic
License-Expression: MIT
License-File: LICENSE
Keywords: mongodb,aggregation,pipeline,pydantic,database
Author: Hamed Ghenaat
Requires-Python: >=3.12
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Provides-Extra: dev
Provides-Extra: docs
Provides-Extra: test
Requires-Dist: black (>=24.10.0) ; extra == "dev"
Requires-Dist: isort (>=5.13.2) ; extra == "dev"
Requires-Dist: mkdocs (>=1.6.0) ; extra == "docs"
Requires-Dist: mkdocs-material (>=9.5.0) ; extra == "docs"
Requires-Dist: mkdocstrings (>=0.27.0) ; extra == "docs"
Requires-Dist: mkdocstrings-python (>=1.12.0) ; extra == "docs"
Requires-Dist: mypy (>=1.13.0) ; extra == "dev"
Requires-Dist: pre-commit (>=4.0.0) ; extra == "dev"
Requires-Dist: pydantic (>=2.10.0)
Requires-Dist: pytest (>=8.0.0) ; extra == "test"
Requires-Dist: pytest-cov (>=6.0.0) ; extra == "test"
Requires-Dist: pytest-mock (>=3.14.0) ; extra == "test"
Requires-Dist: pyupgrade (>=3.19.0) ; extra == "dev"
Requires-Dist: ruff (>=0.8.0) ; extra == "dev"
Project-URL: Documentation, https://hamedghenaat.github.io/mongo-aggro/
Project-URL: Homepage, https://github.com/hamedghenaat/mongo-aggro
Project-URL: Repository, https://github.com/hamedghenaat/mongo-aggro
Description-Content-Type: text/markdown

# Mongo Aggro - MongoDB Aggregation Pipeline Builder

A Python package for building MongoDB aggregation pipelines with strong type checking using Pydantic.

## Features

- **Type-safe pipeline building** - Pydantic models validate stage arguments at runtime
- **Direct MongoDB integration** - Pass `Pipeline` directly to `collection.aggregate()` without calling any methods
- **Comprehensive stage support** - All major MongoDB aggregation stages supported
- **Nested pipelines** - Stages like `$lookup`, `$facet`, and `$unionWith` support nested pipelines
- **Query operators** - Built-in support for logical operators (`$and`, `$or`, `$not`, `$nor`) and comparison operators
- **Accumulator classes** - Type-safe accumulator builders for `$group` stage

## Installation

```bash
poetry add mongo-aggro
```

## Quick Start

```python
from mongo_aggro import Pipeline, Match, Unwind, Group, Sort, Limit

# Create a pipeline
pipeline = Pipeline()
pipeline.add_stage(Match(query={"status": "active"}))
pipeline.add_stage(Unwind(path="items"))
pipeline.add_stage(Group(id="$category", accumulators={"count": {"$sum": 1}}))
pipeline.add_stage(Sort(fields={"count": -1}))
pipeline.add_stage(Limit(count=10))

# Pass directly to MongoDB - no need to call any methods
results = collection.aggregate(pipeline)
```

Or initialize with stages in the constructor:

```python
pipeline = Pipeline([
    Match(query={"status": "active"}),
    Unwind(path="items"),
    Group(id="$category", accumulators={"count": {"$sum": 1}}),
    Sort(fields={"count": -1}),
    Limit(count=10)
])
```
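
For reference, the pipeline above corresponds roughly to the raw aggregation pipeline below. This is a sketch, not captured package output: the `$match`, `$unwind`, and `$group` shapes follow the outputs shown elsewhere in this README, while the `$sort` and `$limit` shapes are assumed from MongoDB's own stage syntax.

```python
# Plain-dict form of the quick-start pipeline (assumed serialization).
raw_pipeline = [
    {"$match": {"status": "active"}},
    {"$unwind": "$items"},
    {"$group": {"_id": "$category", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
    {"$limit": 10},
]

# Each stage is a single-key document whose key names the stage operator.
stage_names = [next(iter(stage)) for stage in raw_pipeline]
print(stage_names)
```

Comparing the two forms shows what the stage classes replace: hand-written operator strings and nested dictionaries.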

## Supported Stages

### Document Filtering & Transformation
- **Match** - Filter documents (`$match`)
- **Project** - Shape documents (`$project`)
- **AddFields / Set** - Add new fields (`$addFields`, `$set`)
- **Unset** - Remove fields (`$unset`)
- **ReplaceRoot / ReplaceWith** - Replace document root (`$replaceRoot`, `$replaceWith`)
- **Redact** - Restrict document content (`$redact`)

### Grouping & Aggregation
- **Group** - Group and aggregate (`$group`)
- **Bucket** - Categorize into buckets (`$bucket`)
- **BucketAuto** - Auto-categorize into buckets (`$bucketAuto`)
- **SortByCount** - Group, count, and sort (`$sortByCount`)
- **Count** - Count documents (`$count`)

### Array Operations
- **Unwind** - Deconstruct arrays (`$unwind`)

### Sorting & Pagination
- **Sort** - Sort documents (`$sort`)
- **Limit** - Limit results (`$limit`)
- **Skip** - Skip documents (`$skip`)
- **Sample** - Random sampling (`$sample`)

### Joins & Lookups
- **Lookup** - Left outer join (`$lookup`)
- **GraphLookup** - Recursive search (`$graphLookup`)

### Multiple Pipelines
- **Facet** - Multiple pipelines in single stage (`$facet`)
- **UnionWith** - Union with another collection (`$unionWith`)

### Output
- **Out** - Write to collection (`$out`)
- **Merge** - Merge into collection (`$merge`)

### Geospatial
- **GeoNear** - Geospatial queries (`$geoNear`)

### Window Functions & Analytics
- **SetWindowFields** - Window calculations (`$setWindowFields`)
- **Densify** - Fill gaps in data (`$densify`)
- **Fill** - Fill null/missing values (`$fill`)

### Utility
- **Documents** - Return literal documents (`$documents`)

## Accumulators

Type-safe accumulator classes for the `$group` stage:

```python
from mongo_aggro import Sum, Avg, Min, Max, First, Last, Push, AddToSet, Count_
from mongo_aggro import merge_accumulators

# Each accumulator returns a dictionary
Sum(name="totalQuantity", field="quantity").model_dump()
# Output: {"totalQuantity": {"$sum": "$quantity"}}

Avg(name="avgPrice", field="price").model_dump()
# Output: {"avgPrice": {"$avg": "$price"}}

# Use value=1 for counting
Sum(name="count", value=1).model_dump()
# Output: {"count": {"$sum": 1}}

# Push with expression
Push(name="orderDetails", expression={"item": "$item", "qty": "$quantity"}).model_dump()
# Output: {"orderDetails": {"$push": {"item": "$item", "qty": "$quantity"}}}
```

### Merging Accumulators

Use `merge_accumulators` to combine multiple accumulators:

```python
from mongo_aggro import Group, Sum, Avg, Max, Min, merge_accumulators

# Merge multiple accumulators for Group stage
group = Group(
    id="$category",
    accumulators=merge_accumulators(
        Sum(name="totalSales", field="amount"),
        Avg(name="avgPrice", field="price"),
        Max(name="maxPrice", field="price"),
        Min(name="minPrice", field="price"),
        Sum(name="orderCount", value=1)
    )
)
# Output: {"$group": {
#     "_id": "$category",
#     "totalSales": {"$sum": "$amount"},
#     "avgPrice": {"$avg": "$price"},
#     "maxPrice": {"$max": "$price"},
#     "minPrice": {"$min": "$price"},
#     "orderCount": {"$sum": 1}
# }}
```
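
Conceptually, merging accumulators means combining several single-field dictionaries into one mapping that sits next to `_id` in the `$group` document. The sketch below shows that idea with plain dictionaries. `merge_dicts` is a hypothetical stand-in, not the package's `merge_accumulators`, and rejecting duplicate names is an assumption of this sketch.

```python
from typing import Any


def merge_dicts(*parts: dict[str, Any]) -> dict[str, Any]:
    """Combine single-field accumulator dicts into one mapping (hypothetical helper)."""
    merged: dict[str, Any] = {}
    for part in parts:
        for name, spec in part.items():
            # Assumption: a repeated output field name is treated as an error.
            if name in merged:
                raise ValueError(f"duplicate accumulator name: {name}")
            merged[name] = spec
    return merged


accumulators = merge_dicts(
    {"totalSales": {"$sum": "$amount"}},
    {"avgPrice": {"$avg": "$price"}},
    {"orderCount": {"$sum": 1}},
)

# The merged fields are placed alongside "_id" inside the $group document.
group_stage = {"$group": {"_id": "$category", **accumulators}}
```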

### Available Accumulators

| Accumulator | Description |
|-------------|-------------|
| `Sum` | Sum values or count with `value=1` |
| `Avg` | Calculate average |
| `Min` | Get minimum value |
| `Max` | Get maximum value |
| `First` | First value in group |
| `Last` | Last value in group |
| `Push` | Create array of values |
| `AddToSet` | Create array of unique values |
| `StdDevPop` | Population standard deviation |
| `StdDevSamp` | Sample standard deviation |
| `Count_` | Count documents (MongoDB 5.0+) |
| `MergeObjects` | Merge documents |
| `TopN` | Top N elements (MongoDB 5.2+) |
| `BottomN` | Bottom N elements (MongoDB 5.2+) |
| `FirstN` | First N elements (MongoDB 5.2+) |
| `LastN` | Last N elements (MongoDB 5.2+) |
| `MaxN` | N maximum values (MongoDB 5.2+) |
| `MinN` | N minimum values (MongoDB 5.2+) |

## Examples

### Complex Match with Logical Operators

```python
from mongo_aggro import Match, And, Or

# Using $and and $or directly in query dict
match = Match(query={
    "$and": [
        {"status": "active"},
        {"$or": [
            {"type": "premium"},
            {"balance": {"$gt": 1000}}
        ]}
    ]
})

# Or use operator classes for building conditions
and_cond = And(conditions=[
    {"status": "active"},
    {"age": {"$gte": 18}}
])
print(and_cond.model_dump())  # {"$and": [{"status": "active"}, {"age": {"$gte": 18}}]}

# Complex nested conditions
or_cond = Or(conditions=[
    {"region": "US"},
    And(conditions=[{"region": "EU"}, {"premium": True}]).model_dump()
])
```

### Combining Stages with Operators

```python
from mongo_aggro import (
    Pipeline, Match, Group, Sort, Limit, Project,
    And, Or, Expr, In, Gt, Regex,
    Sum, Avg, Max, merge_accumulators
)

# Build a complex analytics pipeline
pipeline = Pipeline()

# Stage 1: Match with complex conditions
pipeline.add_stage(Match(query={
    "$and": [
        {"status": {"$in": ["completed", "shipped"]}},
        {"orderDate": {"$gte": "2024-01-01"}},
        {"$or": [
            {"totalAmount": {"$gt": 100}},
            {"priority": "high"}
        ]}
    ]
}))

# Stage 2: Group with multiple accumulators
pipeline.add_stage(Group(
    id={"region": "$region", "category": "$category"},
    accumulators=merge_accumulators(
        Sum(name="totalRevenue", field="totalAmount"),
        Avg(name="avgOrderValue", field="totalAmount"),
        Max(name="largestOrder", field="totalAmount"),
        Sum(name="orderCount", value=1)
    )
))

# Stage 3: Match groups with significant revenue
pipeline.add_stage(Match(query={
    "totalRevenue": {"$gt": 10000}
}))

# Stage 4: Sort by revenue
pipeline.add_stage(Sort(fields={"totalRevenue": -1}))

# Stage 5: Limit results
pipeline.add_stage(Limit(count=20))

# Stage 6: Project final shape
pipeline.add_stage(Project(fields={
    "_id": 0,
    "region": "$_id.region",
    "category": "$_id.category",
    "totalRevenue": 1,
    "avgOrderValue": {"$round": ["$avgOrderValue", 2]},
    "orderCount": 1
}))
```

### Lookup with Nested Pipeline

```python
from mongo_aggro import Pipeline, Match, Lookup

lookup = Lookup(
    from_collection="orders",
    let={"customerId": "$_id"},
    pipeline=Pipeline([
        Match(query={"$expr": {"$eq": ["$customerId", "$$customerId"]}}),
        Match(query={"status": "completed"})
    ]),
    as_field="completedOrders"
)
```
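
For orientation, the raw `$lookup` document this stage is expected to produce looks roughly like the sketch below. The mapping of `from_collection` to `from` and `as_field` to `as` is an assumption based on MongoDB's `$lookup` syntax, not verified package output.

```python
# Assumed raw form of the Lookup stage above.
lookup_stage = {
    "$lookup": {
        "from": "orders",
        # "let" exposes fields of the outer document as variables.
        "let": {"customerId": "$_id"},
        "pipeline": [
            # "$customerId" is a field of "orders"; "$$customerId" is the let variable.
            {"$match": {"$expr": {"$eq": ["$customerId", "$$customerId"]}}},
            {"$match": {"status": "completed"}},
        ],
        "as": "completedOrders",
    }
}

nested_stages = lookup_stage["$lookup"]["pipeline"]
```

The single `$` prefix refers to a field in the joined collection, while the double `$$` prefix refers to a variable declared in `let`.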

### Facet with Multiple Pipelines

```python
from mongo_aggro import (
    Pipeline, Facet, Group, Sort, Limit,
    Sum, Avg, merge_accumulators
)

facet = Facet(pipelines={
    "byCategory": Pipeline([
        Group(
            id="$category",
            accumulators=merge_accumulators(
                Sum(name="count", value=1),
                Sum(name="total", field="amount")
            )
        ),
        Sort(fields={"count": -1})
    ]),
    "byRegion": Pipeline([
        Group(
            id="$region",
            accumulators=merge_accumulators(
                Sum(name="count", value=1),
                Avg(name="avgAmount", field="amount")
            )
        ),
        Sort(fields={"avgAmount": -1})
    ]),
    "topProducts": Pipeline([
        Sort(fields={"sales": -1}),
        Limit(count=10)
    ])
})
```

### Using Query Operators with Match

```python
from mongo_aggro import Pipeline, Match, Regex, In, Gt, Exists, ElemMatch

# Pipeline that the stages below are appended to
pipeline = Pipeline()

# Text search with regex
pipeline.add_stage(Match(query={
    "name": Regex(pattern="^John", options="i").model_dump()
}))

# Check field existence
pipeline.add_stage(Match(query={
    "email": Exists(exists=True).model_dump(),
    "deletedAt": Exists(exists=False).model_dump()
}))

# Array element matching
pipeline.add_stage(Match(query={
    "items": ElemMatch(conditions={
        "quantity": {"$gt": 5},
        "price": {"$lt": 100}
    }).model_dump()
}))

# Combining multiple operator types
pipeline.add_stage(Match(query={
    "$and": [
        {"status": In(values=["active", "pending"]).model_dump()},
        {"score": Gt(value=80).model_dump()},
        {"tags": {"$exists": True}}
    ]
}))
```

### Unwind with Options

```python
from mongo_aggro import Unwind

# Simple unwind
unwind = Unwind(path="items")
# Output: {"$unwind": "$items"}

# With options
unwind = Unwind(
    path="items",
    include_array_index="itemIndex",
    preserve_null_and_empty=True
)
# Output: {"$unwind": {"path": "$items", "includeArrayIndex": "itemIndex", "preserveNullAndEmptyArrays": True}}
```
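
To make the two options concrete, here is a simplified pure-Python model of what `$unwind` does to a list of documents. It is a sketch only: `unwind` is a hypothetical helper, it handles a top-level field that is a list, missing, or `None`, and it does not reproduce the exact shape MongoDB gives to preserved documents.

```python
from typing import Any, Iterable, Iterator, Optional


def unwind(
    docs: Iterable[dict[str, Any]],
    field: str,
    index_field: Optional[str] = None,
    preserve_empty: bool = False,
) -> Iterator[dict[str, Any]]:
    """Simplified model of $unwind; scalar field values are not modeled."""
    for doc in docs:
        values = doc.get(field)
        if isinstance(values, list) and values:
            # One output document per array element.
            for position, element in enumerate(values):
                out = {**doc, field: element}
                if index_field is not None:
                    out[index_field] = position
                yield out
        elif preserve_empty:
            # Keep documents whose field is missing, None, or an empty list.
            yield dict(doc)


orders = [
    {"_id": 1, "items": ["pen", "ink"]},
    {"_id": 2, "items": []},
    {"_id": 3},
]

default_rows = list(unwind(orders, "items"))
kept_rows = list(unwind(orders, "items", index_field="itemIndex", preserve_empty=True))
```

Without the preserve option, only the two rows from order 1 survive. With it, orders 2 and 3 are kept as well.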

### Group with Accumulators

```python
from mongo_aggro import Group, Sum, Avg, Max, Push, merge_accumulators

group = Group(
    id="$category",
    accumulators=merge_accumulators(
        Sum(name="totalQuantity", field="quantity"),
        Avg(name="avgPrice", field="price"),
        Max(name="maxPrice", field="price"),
        Push(name="items", field="name")
    )
)
```

### Complete E-commerce Analytics Example

```python
from mongo_aggro import (
    Pipeline, Match, Unwind, Group, Sort, Limit, Project, Lookup,
    Sum, Avg, Max, First, Push, merge_accumulators
)

# Analyze orders with customer details
pipeline = Pipeline([
    # Filter recent orders
    Match(query={
        "orderDate": {"$gte": "2024-01-01"},
        "status": {"$ne": "cancelled"}
    }),

    # Join with customers
    Lookup(
        from_collection="customers",
        local_field="customerId",
        foreign_field="_id",
        as_field="customer"
    ),

    # Unwind customer (single-element array)
    Unwind(path="customer"),

    # Unwind order items
    Unwind(path="items"),

    # Group by product and customer region
    Group(
        id={
            "product": "$items.productId",
            "region": "$customer.region"
        },
        accumulators=merge_accumulators(
            Sum(name="totalQuantity", field="items.quantity"),
            Sum(name="totalRevenue", field="items.subtotal"),
            Avg(name="avgQuantity", field="items.quantity"),
            Sum(name="orderCount", value=1),
            First(name="productName", field="items.name")
        )
    ),

    # Filter significant sales
    Match(query={"totalRevenue": {"$gt": 1000}}),

    # Sort by revenue
    Sort(fields={"totalRevenue": -1}),

    # Top 50 results
    Limit(count=50),

    # Final projection
    Project(fields={
        "_id": 0,
        "product": "$productName",
        "region": "$_id.region",
        "totalQuantity": 1,
        "totalRevenue": {"$round": ["$totalRevenue", 2]},
        "avgQuantity": {"$round": ["$avgQuantity", 1]},
        "orderCount": 1
    })
])

# Execute
results = db.orders.aggregate(pipeline)
```

## Query Operators

The package includes query operators for building complex conditions:

```python
from mongo_aggro import And, Or, Not, Nor, Expr, Eq, Gt, In, Regex

# Logical operators
And(conditions=[{"a": 1}, {"b": 2}]).model_dump()  # {"$and": [...]}
Or(conditions=[{"a": 1}, {"a": 2}]).model_dump()   # {"$or": [...]}
Not(condition={"$regex": "^test"}).model_dump()    # {"$not": {...}}
Nor(conditions=[{"a": 1}, {"b": 2}]).model_dump()  # {"$nor": [...]}

# Comparison operators
Eq(value=5).model_dump()           # {"$eq": 5}
Gt(value=10).model_dump()          # {"$gt": 10}
In(values=[1, 2, 3]).model_dump()  # {"$in": [1, 2, 3]}
Regex(pattern="^test", options="i").model_dump()  # {"$regex": "^test", "$options": "i"}
```

## Method Chaining

The `add_stage` method returns the pipeline, enabling method chaining:

```python
from mongo_aggro import Pipeline, Match, Sort, Limit

pipeline = (
    Pipeline()
    .add_stage(Match(query={"active": True}))
    .add_stage(Sort(fields={"createdAt": -1}))
    .add_stage(Limit(count=100))
)
```

## How It Works

The `Pipeline` class implements `__iter__`, which yields each stage's dictionary representation. Because `aggregate()` iterates over its pipeline argument, no conversion is needed:

```python
# This works because MongoDB iterates over the pipeline
collection.aggregate(pipeline)

# Equivalent to:
collection.aggregate([
    {"$match": {"status": "active"}},
    {"$unwind": "$items"},
    # ...
])
```
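
The mechanism can be illustrated with a minimal, standard-library-only sketch. `MiniStage` and `MiniPipeline` are hypothetical names used for illustration and are not the package's classes; the real implementation builds on Pydantic models and may differ in detail.

```python
from typing import Any, Iterable, Iterator, Optional


class MiniStage:
    """Hypothetical stage: an operator name plus its body."""

    def __init__(self, operator: str, body: Any) -> None:
        self.operator = operator
        self.body = body

    def to_dict(self) -> dict[str, Any]:
        return {self.operator: self.body}


class MiniPipeline:
    """Hypothetical pipeline that yields stage dictionaries when iterated."""

    def __init__(self, stages: Optional[Iterable[MiniStage]] = None) -> None:
        self._stages = list(stages or [])

    def add_stage(self, stage: MiniStage) -> "MiniPipeline":
        self._stages.append(stage)
        return self  # returning self is what enables method chaining

    def __iter__(self) -> Iterator[dict[str, Any]]:
        for stage in self._stages:
            yield stage.to_dict()


pipeline = (
    MiniPipeline()
    .add_stage(MiniStage("$match", {"status": "active"}))
    .add_stage(MiniStage("$unwind", "$items"))
)

# Any consumer that iterates over the object sees plain stage dictionaries.
as_list = list(pipeline)
```

Because `__iter__` builds a fresh generator on each call, the same pipeline object can be iterated more than once.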

