Metadata-Version: 2.1
Name: hestia-earth-converters
Version: 0.0.2
Summary: HESTIA's set of file converters
Home-page: https://gitlab.com/hestia-earth/hestia-convert-base
Author: @ToffeeLabs
Author-email: community@hestia.earth
License: MIT
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3.9
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: Werkzeug
Requires-Dist: pydantic==2.*
Requires-Dist: pydantic-core==2.*
Requires-Dist: hestia-earth-schema>=33.6.0
Requires-Dist: hestia-earth-utils
Requires-Dist: python-dateutil
Requires-Dist: numpy
Requires-Dist: pandas
Requires-Dist: setuptools
Provides-Extra: simapro
Requires-Dist: hestia-earth-models>=0.74.8; extra == "simapro"
Requires-Dist: hestia-earth-utils; extra == "simapro"
Requires-Dist: bw-simapro-csv==0.4.2; extra == "simapro"
Requires-Dist: pydantic==2.*; extra == "simapro"
Requires-Dist: pydantic[email]; extra == "simapro"
Requires-Dist: requests; extra == "simapro"

# Hestia schema converter common base

This library lets you create a converter that can translate a Pydantic object from one schema to another.

It is used as a common base to create schema converters for life cycle inventory (LCI) software.

This repo includes code:
- to do LCI flow mappings: exchanging a term from one LCI nomenclature for an equivalent term in another nomenclature
- to describe how the fields of 2 LCI schemas, or of their nested schemas, relate to each other
- to convert one pydantic schema to another
- for a demo converter that converts from the HESTIA schema to the openLCA schema.

## Setup

1. Install the library and the converters you want to use:
```
pip install hestia-earth-converters
pip install "hestia-earth-converters[simapro]"
```
2. Convert a HESTIA ImpactAssessment to SimaPro format:
```
hestia-convert --output-folder samples --input-format HESTIA --output-format SimaPro --hestia-impact-id africanAubergineFruit-cote-divoire-2010-2025-20250427
```

The converted file will be stored in the `samples` directory.

## Flow mapping.

Given an input HESTIA "term" / "flow" such as:

```json
{
  "id": "GADM-COL",
  "name": "Colombia",
  "termType": "region",
  "type": "Term"
}
```
we can use the code:

```python
from RosettaFlow import FlowMap

term_map_obj = FlowMap(PATH_TO_MAPPING_FILES)
candidates = term_map_obj.map_flow({"id": "GADM-COL",
                                    "name": "Colombia",
                                    "termType": "region",
                                    "type": "Term"}, target_nomenclature="openLCA")
```
to get a list of known equivalent "openLCA" location flows:

```python
print(candidates)
```
```python
[CandidateFlow(MatchCondition='=', ConversionFactor=1.0, FlowName='Colombia', FlowUUID='ab6c0400-6660-3ef2-919d-512b21dce9ab', FlowContext='Locations', Unit='LOCATION', Mapper='hestia', Verifier='hestia', LastUpdated='2025-05-29')]
```
or

```python
import pprint

for c in candidates:
    pprint.pprint(dict(c))
```
```python
{'ConversionFactor': 1.0,
 'FlowContext': 'Locations',
 'FlowName': 'Colombia',
 'FlowUUID': 'ab6c0400-6660-3ef2-919d-512b21dce9ab',
 'LastUpdated': '2025-05-29',
 'Mapper': 'hestia',
 'MatchCondition': '=',
 'Unit': 'LOCATION',
 'Verifier': 'hestia'}
```


The mappings are stored in a standardised CSV file format defined by the GLAD project (see the [GLAD repository UNEP-Economy-Division](https://github.com/UNEP-Economy-Division/GLAD-ElementaryFlowResources)) and described in [FlowMapping.md](https://github.com/UNEP-Economy-Division/GLAD-ElementaryFlowResources/blob/master/Formats/FlowMapping.md).
This format is compatible with the [USEPA format](https://github.com/USEPA/fedelemflowlist) described in [USEPA FlowMapping.md](https://github.com/USEPA/fedelemflowlist/blob/master/format%20specs/FlowMapping.md).

Please use the template file [FlowMapping.csv](https://github.com/UNEP-Economy-Division/GLAD-ElementaryFlowResources/blob/master/Formats/FlowMapping.csv) when creating new mappings.
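
As a rough illustration of the file layout, the sketch below parses a tiny in-memory mapping with the standard `csv` module. It assumes the column names match the GLAD `FlowMapping.md` specification and shows only a subset of them, so check them against the template before relying on them. The `SourceFlowUUID` value and the helper `load_mapping_rows` are made up for this example and are not part of this library.

```python
import csv
import io

# A one-row mapping file in the GLAD FlowMapping layout (subset of columns).
# The source identifier is a placeholder; the target values reuse the example above.
SAMPLE_CSV = """SourceListName,SourceFlowUUID,SourceFlowName,MatchCondition,ConversionFactor,TargetListName,TargetFlowUUID,TargetFlowName
HESTIA,GADM-COL,Colombia,=,1.0,openLCA,ab6c0400-6660-3ef2-919d-512b21dce9ab,Colombia
"""


def load_mapping_rows(text: str) -> list:
    """Parse mapping rows and convert ConversionFactor to float (hypothetical helper)."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        # An empty factor is treated as 1.0 here; that default is an assumption.
        row["ConversionFactor"] = float(row["ConversionFactor"] or 1.0)
        rows.append(row)
    return rows


rows = load_mapping_rows(SAMPLE_CSV)
print(rows[0]["TargetFlowUUID"])
```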

The `FlowMap` class contains functions to search flow mappings and validate entries in csv files, as well as helper functions to
create new mappings and to select the "best" candidate for a given situation.

### Symmetry of flow maps.

By default, `term_map_obj.map_flow()` searches for reverse "=" and "~" mappings (right to left in the csv file) if it cannot find a mapping in "SourceUUID" (left to right in the csv file). This can be disabled with the `check_reverse=False` parameter. Any `ConversionFactor` returned from a reverse match is inverted: `1/original_conversion_factor`.
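
The reverse lookup can be pictured with a small stand-alone sketch. It is not the `FlowMap` implementation: the row layout, the flow identifiers and the function `find_factor` are assumptions chosen only to show the forward search, the fallback to reversible rows, and the inverted factor.

```python
from typing import Optional

# Hypothetical in-memory mapping rows: (source id, target id, match condition, factor).
ROWS = [
    ("kg-flow", "g-flow", "=", 1000.0),
]


def find_factor(flow_id: str, check_reverse: bool = True) -> Optional[tuple]:
    """Return (mapped id, conversion factor), or None when nothing matches."""
    # Forward search: left to right in the file.
    for source, target, condition, factor in ROWS:
        if source == flow_id:
            return target, factor
    if check_reverse:
        # Reverse search: only "=" and "~" rows are treated as reversible.
        for source, target, condition, factor in ROWS:
            if target == flow_id and condition in ("=", "~"):
                return source, 1 / factor
    return None


print(find_factor("kg-flow"))                      # ('g-flow', 1000.0)
print(find_factor("g-flow"))                       # ('kg-flow', 0.001)
print(find_factor("g-flow", check_reverse=False))  # None
```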

### TODO:
- Add support for correctly handling "a superset of", "a subset of", and "a proxy for", aka `>`, `<`, and `~`
- Add support for daisy-chaining mappings by recursively checking mappings to other nomenclatures when no direct mapping exists.
- Update `pick_best_match()`: add sorting by `LastUpdated` date, trusted/preferred `Mapper`, trusted `Verifier`, closest relevant `TargetFlowContext`, and prioritise `=` over `~`
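
One possible ordering for the last item is sketched below, under the assumption that candidates can be read as dicts with the `MatchCondition` and `LastUpdated` fields shown earlier. The function `rank_candidates` and the sample data are hypothetical, and the sketch covers only two of the listed criteria (`=` before `~`, then the most recent `LastUpdated`).

```python
from datetime import date

# Hypothetical candidates; the keys mirror the CandidateFlow fields shown earlier.
candidates = [
    {"FlowName": "A", "MatchCondition": "~", "LastUpdated": "2025-05-29"},
    {"FlowName": "B", "MatchCondition": "=", "LastUpdated": "2024-01-10"},
    {"FlowName": "C", "MatchCondition": "=", "LastUpdated": "2025-02-01"},
]

# Lower rank wins; unknown conditions sort last.
CONDITION_RANK = {"=": 0, "~": 1}


def rank_candidates(items: list) -> list:
    """Sort by match condition first, then by most recent LastUpdated."""
    def sort_key(item: dict) -> tuple:
        updated = date.fromisoformat(item["LastUpdated"])
        # Negate the ordinal so that newer dates sort first.
        return (CONDITION_RANK.get(item["MatchCondition"], 99), -updated.toordinal())
    return sorted(items, key=sort_key)


ranked = rank_candidates(candidates)
print([c["FlowName"] for c in ranked])  # ['C', 'B', 'A']
```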

# Pydantic object converter.

Originally based on [pymapme](https://github.com/funnydman/pymapme) by funnydman and heavily modified by the HESTIA team.

## Basics
Given 2 pydantic models, the `Converter` class can convert common fields from one to the other with no configuration:

```python
from pydantic import BaseModel


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None


class ModelB(BaseModel):
    some_field_two: str = None
    name: str = None


from Converter import Converter

converter_obj = Converter()

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Type of 'instance_of_model_b' is : {type(instance_of_model_b)}")
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
```
```
Type of 'instance_of_model_b' is : <class 'ModelB'>
Data in 'instance_of_model_b': {'some_field_two': None, 'name': 'bob'}
```

## Mapping fields between 2 schemas
If 2 schemas have fields with the same information, but different field names, you can map them using:
```python
converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_field_one"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
```
```
Data in 'instance_of_model_b': {'some_field_two': 'something', 'name': 'bob'}
```

Registered field maps between models are symmetrical, so converting back from `ModelB` to `ModelA` will use the reverse
mapping.
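
The idea behind the symmetry can be shown with a plain dictionary inversion. This is only a sketch of the concept, not the `Converter` internals: `invert_field_map` is a hypothetical helper, and it deliberately skips dotted paths and callables because those cannot be reversed mechanically.

```python
def invert_field_map(field_map: dict) -> dict:
    """Swap keys and values of a {destination_field: source_field} map."""
    inverted = {}
    for destination_field, source_field in field_map.items():
        # Only plain string-to-string entries can be reversed mechanically.
        if isinstance(source_field, str) and "." not in source_field:
            inverted[source_field] = destination_field
    return inverted


a_to_b = {"some_field_two": "some_field_one"}
b_to_a = invert_field_map(a_to_b)
print(b_to_a)  # {'some_field_one': 'some_field_two'}
```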

### Mapping deeper nested fields
If a model has a field that contains a nested schema, you can use a `.` to map to a nested field:

```python
class ModelC(BaseModel):
    field_in_c_one: int = None
    field_in_c_two: str = None


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None
    some_data: ModelC = None


converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_data.field_in_c_two"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something",
                                "name": "bob",
                                "some_data": {
                                    "field_in_c_one": 4,
                                    "field_in_c_two": "Some nested string"}
                                })
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
```
```
Data in 'instance_of_model_b': {'some_field_two': 'Some nested string', 'name': 'bob'}
```

Currently, mappings to nested fields are not symmetrical.
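
Reading a value through a dotted path can be sketched with `getattr`. The helper `resolve_path` is hypothetical and `SimpleNamespace` objects stand in for pydantic models, so this only illustrates the lookup and makes no claim about how the converter does it. Writing a value back through such a path would require building the intermediate object first, which is one plausible reason why the reverse direction is harder.

```python
from types import SimpleNamespace


def resolve_path(obj, dotted_path: str, default=None):
    """Follow a dotted attribute path, returning `default` if any step is missing."""
    current = obj
    for name in dotted_path.split("."):
        if current is None or not hasattr(current, name):
            return default
        current = getattr(current, name)
    return current


source = SimpleNamespace(
    name="bob",
    some_data=SimpleNamespace(field_in_c_one=4, field_in_c_two="Some nested string"),
)
print(resolve_path(source, "some_data.field_in_c_two"))  # Some nested string
print(resolve_path(source, "some_data.missing"))         # None
```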

TODO

### Using custom functions for each field

Sometimes the contents of one field must be transformed when moving to a new schema. To do this you can specify a custom function:

```python
from pydantic import BaseModel

class ModelA(BaseModel):
    length_in_km: int = None


class ModelB(BaseModel):
    length_in_m: float = None

def _convert_km_to_m(source_model:ModelA, **kwargs)-> float:
    return source_model.length_in_km * 1000

converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "length_in_m": _convert_km_to_m,
                                 })

instance_of_model_a = ModelA(**{"length_in_km": 2})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
```
```
Data in 'instance_of_model_b': {'length_in_m': 2000.0}
```

Note the use of `**kwargs` in `_convert_km_to_m()`. Several keyword arguments are made available to custom functions, such as:
- `field_name`: the name of the model field,
- `default`: the default object the converter saves when no data is found,
- `model_data`: a dict containing the model data extracted so far.
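
A custom function can name the arguments it needs and let `**kwargs` absorb the rest. The sketch below calls the function directly to simulate this. How the converter actually invokes it is an assumption, and `SimpleNamespace` stands in for a `ModelA` instance.

```python
from types import SimpleNamespace


def _convert_km_to_m(source_model, field_name=None, default=None, **kwargs):
    """Return the length in metres, or `default` when the source value is missing."""
    if source_model.length_in_km is None:
        return default
    return source_model.length_in_km * 1000.0


# Simulated calls; the keyword names follow the list above.
missing = _convert_km_to_m(SimpleNamespace(length_in_km=None),
                           field_name="length_in_m", default=0.0, model_data={})
present = _convert_km_to_m(SimpleNamespace(length_in_km=2),
                           field_name="length_in_m", default=0.0, model_data={})
print(missing, present)  # 0.0 2000.0
```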

To keep the mapping symmetrical, you also need to register the inverse function (here a `_convert_m_to_km` function that you define yourself) for the reverse direction:
```python
converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": _convert_m_to_km,
                                 })
```
or
```python
converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": lambda source_model, field_name, default, model_data: source_model.length_in_m / 1000,
                                 })
```

### Automatic re-use of defined mappings.

Once a mapping between 2 pydantic models has been defined, it will automatically be used if encountered when converting a different pydantic model that uses that model in a subfield:

```python
from pydantic import BaseModel, Field


class HestiaTerm(BaseModel):
    type: str = Field(default="Term")
    id: str = None


class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float


class OpenLcaFlow(BaseModel):
    id: str = None


class OpenLcaExchange(BaseModel):
    flow: OpenLcaFlow = Field(default=None)
    amount: float = Field(default=None)


def _convert_Hestia_Term_to_openLCA_flow_ref(source_model: HestiaTerm, **kwargs) -> OpenLcaFlow:
    candidate_mapped_flows = term_map_obj.map_flow(source_model.model_dump())
    best_candidate = candidate_mapped_flows[0]
    return OpenLcaFlow(id=best_candidate.FlowUUID)


converter_obj.register_model_map(source_model_type=HestiaTerm,
                                 destination_model_type=OpenLcaFlow,
                                 map_function=_convert_Hestia_Term_to_openLCA_flow_ref)

converter_obj.register_model_map(source_model_type=HestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": "value"
                                 })

instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4
})

instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                       destination_model=OpenLcaExchange)
print(f"Data in 'instance_of_openLca_exchange': {instance_of_openLca_exchange.model_dump()}")

```

```
Data in 'instance_of_openLca_exchange': {'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}
```

### Re-use of mappings in a list

Once a mapping is defined, it will also be used if a field is a list containing the destination subschema:

```python
from typing import List


class HestiaImpactAssessment(BaseModel):
    emissionsResourceUse: List[HestiaIndicator] = Field(None)


class OpenLcaProcess(BaseModel):
    exchanges: List[OpenLcaExchange] = Field(None)


converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                 destination_model_type=OpenLcaProcess,
                                 map_field_dict={
                                     "exchanges": "emissionsResourceUse",
                                 })

instance_of_hestia_impact_assessment = HestiaImpactAssessment(**{
    "emissionsResourceUse": [
        {
            "type": "Indicator",
            "term": {
                "type": "Term",
                "id": "nh3ToAirInputsProduction"
            },
            "value": 3.4
        }
    ]
})

instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                      destination_model=OpenLcaProcess)
print(f"Data in 'instance_of_openLca_process': {instance_of_openLca_process.model_dump()}")
```
```
Data in 'instance_of_openLca_process': {'exchanges': [{'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}]}
```

### Too generic schemas.

Some schemas allow storing information in ways that are so general that they require different parsing / conversion policies depending on the situation.
To avoid having to build custom functions made up of long if/elif/else statements, you can add new pydantic models to the original pydantic implementation that help you map each situation to a separate function:

```python

class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float
    some_field_that_affects_how_this_schema_should_be_converted: bool = False


class SpecialCaseHestiaIndicator(HestiaIndicator):
    pass

    class Config:
        revalidate_instances = "subclass-instances"


normal_instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                              destination_model=OpenLcaExchange)
print(f"Data in 'normal_instance_of_openLca_exchange': {normal_instance_of_openLca_exchange.model_dump()}")

other_instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4,
    "some_field_that_affects_how_this_schema_should_be_converted": True
})

if other_instance_of_hestia_indicator.some_field_that_affects_how_this_schema_should_be_converted == True:
    # This turns the HestiaIndicator instance into a SpecialCaseHestiaIndicator
    special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)

converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": _custom_function_values_in_scientific_notation
                                 })

# or using a custom function
converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_function=_custom_function_to_handle_special_case_hestia_indicators)

special_case_of_openLca_exchange = converter_obj.transmute(source_model_obj=special_instance,
                                                           destination_model=OpenLcaExchange)

print(f"Data in 'special_case_of_openLca_exchange': {special_case_of_openLca_exchange.model_dump()}")

```

The setting `revalidate_instances = "subclass-instances"` means that the line:
```python
special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)
```
will return a copy of `other_instance_of_hestia_indicator` of type `SpecialCaseHestiaIndicator`, which inherits all the fields of `HestiaIndicator`.
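
Why a separate type helps can be seen in a minimal dispatch sketch: once the special case has its own class, the choice of conversion function becomes a lookup on the type instead of an if/elif chain. Plain dataclasses stand in for pydantic models here, and `REGISTRY` and `convert` are hypothetical names, not the converter's API.

```python
from dataclasses import dataclass


@dataclass
class Indicator:
    value: float


class SpecialIndicator(Indicator):
    """Same fields as Indicator; the distinct type is only used for dispatch."""


# Hypothetical registry keyed on the exact source type.
REGISTRY = {
    Indicator: lambda obj: {"amount": obj.value},
    # The special case formats the value in scientific notation.
    SpecialIndicator: lambda obj: {"amount": f"{obj.value:.2e}"},
}


def convert(obj) -> dict:
    """Pick the conversion function registered for the object's exact type."""
    return REGISTRY[type(obj)](obj)


print(convert(Indicator(3.4)))         # {'amount': 3.4}
print(convert(SpecialIndicator(3.4)))  # {'amount': '3.40e+00'}
```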

### Many to one

TODO

### One to many

TODO

### Edge cases

If you need to edit multiple fields at once, or need to add data that depends on already processed fields, you can set a function to run at the end of a schema conversion using the `_always_run_` key:

```python
converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                 destination_model_type=OpenLcaProcess,
                                 map_field_dict={
                                     "exchanges": "emissionsResourceUse",
                                     "_always_run_": _convert_product_and_move_to_exchanges
                                 })
```

As an example, `_convert_product_and_move_to_exchanges` runs after the conversion and adds a new entry to the "exchanges" of the resulting OpenLcaProcess:

```python

class HestiaImpactAssessment(BaseModel):
    emissionsResourceUse: List[HestiaIndicator] = Field(None)
    product: HestiaIndicator = Field(None)


def _convert_product_and_move_to_exchanges(model_data: dict,
                                           source_model: HestiaImpactAssessment = None,
                                           destination_model_type: OpenLcaExchange = None,
                                           context:dict = None) -> dict:
    """
    This function takes a Hestia "product" from an impact assessment, converts it to a open LCA exchange and places it in the list of exchanges in a openLCA Process.
    """
    product_exchange = converter_obj.transmute(source_model_obj=source_model.product,
                                               destination_model=OpenLcaExchange)

    model_data['exchanges'].append(product_exchange)

    return model_data
```
`_convert_product_and_move_to_exchanges` is given a dict `model_data` containing the destination model data built so far, the source object `source_model`, the type of the destination model `destination_model_type`, as well as a `context` dict.
You can pass values into the `context` dict using:
```python
instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                      destination_model=OpenLcaProcess,
                                                      context={"Foo": "Bar"})
```

### Custom mapping implementations
Instead of defining a `map_field_dict`, it may be easier in some cases to implement your own function to handle the entire conversion between 2 schemas. This lets you build small custom code to deal with pairs of sub-schemas, while using the other converter features to handle the more general tasks.

```python
converter_obj.register_model_map(source_model_type=HestiaUnit,
                                 destination_model_type=OpenLcaUnit,
                                 map_function=_convert_hestia_unit_to_openLCA)

```

```python
def _convert_hestia_unit_to_openLCA(source_model: HestiaUnit,
                                    destination_model_type=OpenLcaUnit,
                                    context:dict = None
                                    ) -> OpenLcaUnit:
    # ... your code here
    return OpenLcaUnit(name="kg", id="20aadc24-a391-41cf-b340-3e4529f44bde")
```

## Todo:
- Add support for "alias" fields
- Test that all possible field annotations can be read, e.g. `Optional[List[Union[Unicorn, Magic, bool]]]`
- subclass openlca_schema package
- ci/cd automate generate hestia pydantic schema from official repo
- hestia to openlca converter
- Add sub-git

# Sample Pydantic schemas in this repository:

## HESTIA pydantic schema

This repo contains a pydantic implementation of the HESTIA schema. It is autogenerated using [datamodel-codegen](https://docs.pydantic.dev/latest/integrations/datamodel_code_generator/) from the official schema definition files (both YAML and JSON Schema) in https://gitlab.com/hestia-earth/hestia-schema/, which remains the only canonical source for the HESTIA schema. Minor changes were added to build the proof-of-concept HESTIA to openLCA converter.
### Todo:
- import validations from `hestia_earth.validation`
- add autogeneration script


## OpenLCA pydantic schema

This repo contains a pydantic implementation of the openLCA schema. In the background it uses the official canonical openLCA schema package `olca-schema`, reusing and subclassing the original classes when possible. Minor changes were added, such as making some fields more specific. For example:

`location` fields in olca classes now use the `Location` sub-schema
```python
    location: Optional[Location] = Field(None)
```
instead of the more generic `Ref` schema
```python
    location: Optional[Ref] = Field(None)
```
that was a parent of the `Location` class and too general.

# Sample converters.

## Hestia to OpenLCA sample converter

`src/Hestia_OpenLCA_Converter` contains a proof of concept schema converter that partially converts from the [HESTIA schema](https://gitlab.com/hestia-earth/hestia-schema/) to the [openLCA schema](http://greendelta.github.io/olca-schema)


