Source code for polsartools.sensors.biomass

import numpy as np
from osgeo import gdal
gdal.UseExceptions()
import os,tempfile,shutil
import xml.etree.ElementTree as ET
from netCDF4 import Dataset
from scipy.interpolate import LinearNDInterpolator
from polsartools.utils.utils import time_it
# from polsartools.utils.io_utils import write_T3, write_C3
from polsartools.preprocess.convert_S2 import convert_S


def resample_lut(filepath, newrows, newcols,bsc="sigma0"):
    nc_file = Dataset(filepath, mode="r")
    valid_options = ["sigma0", "gamma0"]
    if bsc =="sigma0": 
        lut = nc_file.groups["radiometry"].variables["sigmaNought"][:]
    elif bsc=="gamma0":
        lut = nc_file.groups["radiometry"].variables["gammaNought"][:]
    else:
        raise ValueError(f"Invalid bsc: '{bsc}'. Valid options are: {', '.join(valid_options)}")
    
    lut_array = np.array(lut)

    del lut
    nc_file.close()

    # Original shape
    rows0, cols0 = lut_array.shape

    # Build coordinate grid for original data
    yy, xx = np.mgrid[0:rows0, 0:cols0]
    coord = np.column_stack((xx.ravel(), yy.ravel()))
    values = lut_array.ravel()

    # Create interpolator
    interpfn = LinearNDInterpolator(coord, values)

    # New grid
    fullY, fullX = np.mgrid[0:newrows, 0:newcols]

    # Scale coordinates to original domain
    sigma_resampled = interpfn(fullX * (cols0 / newcols),
                               fullY * (rows0 / newrows))

    return sigma_resampled.astype(np.float32)

def write_rst(out_file, data,
              driver="GTiff", out_dtype=gdal.GDT_CFloat32,
              cog=False, ovr=[2,4,8,16], comp=False,
              geocode=False, ref_ds=None):

    # --- Choose driver and extension ---
    if driver == "ENVI":
        drv = gdal.GetDriverByName(driver)
        dataset = drv.Create(out_file, data.shape[1], data.shape[0], 1, out_dtype)
    else:
        drv = gdal.GetDriverByName("GTiff")
        options = ["BIGTIFF=IF_SAFER"]
        if comp:
            options += ["COMPRESS=LZW"]
        if cog:
            options += ["TILED=YES", "BLOCKXSIZE=512", "BLOCKYSIZE=512"]
        dataset = drv.Create(out_file, data.shape[1], data.shape[0], 1, out_dtype, options)

    # --- Write data ---
    band = dataset.GetRasterBand(1)
    band.WriteArray(data)
    band.SetNoDataValue(0)  # nodata set to 0

    dataset.FlushCache()

    # --- Build overviews if requested ---
    if cog and driver == "GTiff":
        dataset.BuildOverviews("NEAREST", ovr)

    # --- Copy georeferencing and metadata if ref_ds provided ---
    if ref_ds is not None:
        dataset.SetGeoTransform(ref_ds.GetGeoTransform())
        dataset.SetProjection(ref_ds.GetProjection())
        dataset.SetMetadata(ref_ds.GetMetadata())

        gcps = ref_ds.GetGCPs()
        if gcps and len(gcps) > 0:
            dataset.SetGCPs(gcps, ref_ds.GetGCPProjection())

    dataset = None  # close dataset

    # --- Optional geocoding step ---
    if geocode and ref_ds is not None:
        gcps = ref_ds.GetGCPs()
        if gcps and len(gcps) > 0:
            gdal.Warp(out_file, out_file,
                      dstSRS=ref_ds.GetGCPProjection(),
                      dstNodata=0)



def save_l1b(ref_ds, array, band_name, out_dir, common_metadata,
             geocode=False, driver="GTiff", out_dtype=gdal.GDT_Float32,
             cog=False, ovr=[2,4,8,16], comp=False):

    # --- Choose extension based on driver ---
    if driver == "ENVI":
        ext = ".bin"
    else:
        ext = ".tif"

    out_path = os.path.join(out_dir, f"{band_name}{ext}")

    # --- Create dataset with options ---
    if driver == "ENVI":
        drv = gdal.GetDriverByName(driver)
        out_ds = drv.Create(out_path,
                            ref_ds.RasterXSize,
                            ref_ds.RasterYSize,
                            1,
                            out_dtype)
    else:
        drv = gdal.GetDriverByName("GTiff")
        options = ["BIGTIFF=IF_SAFER"]
        if comp:
            options += ["COMPRESS=LZW"]
        if cog:
            options += ["TILED=YES", "BLOCKXSIZE=512", "BLOCKYSIZE=512"]
        out_ds = drv.Create(out_path,
                            ref_ds.RasterXSize,
                            ref_ds.RasterYSize,
                            1,
                            out_dtype,
                            options)

    # --- Copy georeferencing ---
    out_ds.SetGeoTransform(ref_ds.GetGeoTransform())
    out_ds.SetProjection(ref_ds.GetProjection())

    # --- Copy GCPs if present ---
    gcps = ref_ds.GetGCPs()
    if gcps and len(gcps) > 0:
        out_ds.SetGCPs(gcps, ref_ds.GetGCPProjection())

    # --- Add dataset-level metadata ---
    out_ds.SetMetadata(common_metadata)

    # --- Add band-specific metadata ---
    band = out_ds.GetRasterBand(1)
    band.WriteArray(array)
    band.SetDescription(band_name)
    band.SetMetadata({"POLARIMETRIC_INTERP": band_name})

    # --- Set nodata to 0 ---
    band.SetNoDataValue(0)

    out_ds.FlushCache()

    # --- Build overviews if requested ---
    if cog and driver == "GTiff":
        out_ds.BuildOverviews("NEAREST", ovr)

    out_ds = None  # close dataset

    # --- Optional geocoding step ---
    if geocode and gcps and len(gcps) > 0:
        gdal.Warp(out_path, out_path,
                  dstSRS=ref_ds.GetGCPProjection(),
                  dstNodata=0)




[docs] @time_it def import_biomass_l1a(in_dir,mat='T3', azlks=8,rglks=2,fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False, bsc='sigma0', out_dir = None, recip=False, geocode=False ): """ Extract polarimetric S2/T4/C4/T3/C3 matrix data from BIOMASS L1A SLC data. Example Usage: -------------- To process imagery and generate a T3 matrix: .. code-block:: python import_biomass_l1a("/path/to/data", mat="T3") To process imagery and generate a C3 matrix: .. code-block:: python import_biomass_l1a("/path/to/data", mat="C3", azlks=10, rglks=3) Parameters: ----------- in_dir : str Path to the folder containing the BIOMASS L1A files. mat : str, optional (default='T3') Type of matrix to extract. Valid options: 'S2', 'C4, 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV','T2HV' azlks : int, optional (default=8) The number of azimuth looks to apply during the C/T processing. rglks : int, optional (default=2) The number of range looks to apply during the C/Tprocessing. fmt : {'tif', 'bin'}, optional (default='tif') Output format: - 'tif': GeoTIFF - 'bin': Raw binary format cog : bool, optional (default=False) If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews. ovr : list of int, optional (default=[2, 4, 8, 16]) Overview levels for COG generation. Ignored if cog=False. comp : bool, optional (default=False) If True, applies LZW compression to GeoTIFF outputs. bsc : str, optional (default='sigma0') The type of radar cross-section to use for scaling. Available options: - 'sigma0' : Uses `sigmaNought` LUT for scaling. - 'gamma0' : Uses `gammaNought` LUT for scaling. out_dir : str or None, optional (default=None) Directory to save output files. If None, a folder named after the matrix type will be created in the same location as the input file. recip : bool, optional (default=False) If True, scattering matrix reciprocal symmetry is applied, i.e, S_HV = S_VH. """ cf_dB = 39.68531599998061 calfac_linear = np.sqrt(10 ** ((cf_dB) / 10)) valid_full_pol = ['S2', 'C4', 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV', 'T2HV'] # valid_dual_pol = ['Sxy', 'C2', 'T2'] valid_matrices = valid_full_pol if mat not in valid_matrices: raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n" f" Full-pol: {sorted(valid_full_pol)}\n") temp_dir = None ext = 'bin' if fmt == 'bin' else 'tif' driver = 'ENVI' if fmt == 'bin' else "GTiff" # Final output directory if out_dir is None: final_out_dir = os.path.join(in_dir, mat) else: final_out_dir = os.path.join(out_dir, mat) os.makedirs(final_out_dir, exist_ok=True) # Intermediate output directory if mat in ['S2', 'Sxy']: base_out_dir = final_out_dir else: temp_dir = tempfile.mkdtemp(prefix='temp_S2_') base_out_dir = temp_dir mat_tag = 'S2' biomassPath = in_dir.lower() biomassPath = biomassPath[-80:] biomassPath = biomassPath[:70] biomassDataAbsPath = os.path.join(in_dir,"measurement",biomassPath + "_i_abs.tiff") biomassDataPhasePath = os.path.join(in_dir,"measurement",biomassPath + "_i_phase.tiff") lut_path = os.path.join(in_dir,"annotation",biomassPath + "_lut.nc") # try: dataAbsSet = gdal.Open(biomassDataAbsPath, gdal.GA_ReadOnly) dataPhaseSet = gdal.Open(biomassDataPhasePath, gdal.GA_ReadOnly) if dataAbsSet is None or dataPhaseSet is None: raise ValueError("Could not open input GeoTIFFs.") bands = dataAbsSet.RasterCount print("Extracting single-look elements...") s12_data = None s21_data = None firstAbs = dataAbsSet.GetRasterBand(1).ReadAsArray() rows, cols = firstAbs.shape lut_g0 = resample_lut(lut_path, rows, cols, bsc) del firstAbs for idx in range(1, bands + 1): bandAbs = dataAbsSet.GetRasterBand(idx).ReadAsArray() bandPhase = dataPhaseSet.GetRasterBand(idx).ReadAsArray() bandReal = bandAbs * np.cos(bandPhase) #* calfac_linear bandImag = bandAbs * np.sin(bandPhase) #* calfac_linear # lut_g0 = resample_lut(lut_path, bandReal.shape[0], bandReal.shape[1],bsc) bandComplex = (bandReal + 1j*bandImag)*lut_g0 del bandReal, bandImag if driver == "GTiff": ext = ".tif" else: ext = ".bin" if idx == 1: outFile = os.path.join(base_out_dir, f"s11{ext}") write_rst(outFile, bandComplex, driver=driver, out_dtype=gdal.GDT_CFloat32, cog=cog, ovr=ovr, comp=comp,geocode=geocode, ref_ds=dataAbsSet) elif idx == 2: if recip: s12_data = bandComplex # hold temporarily else: outFile = os.path.join(base_out_dir, f"s12{ext}") write_rst(outFile, bandComplex, driver=driver, out_dtype=gdal.GDT_CFloat32, cog=cog, ovr=ovr, comp=comp,geocode=geocode, ref_ds=dataAbsSet) elif idx == 3: if recip: s21_data = bandComplex # hold temporarily else: outFile = os.path.join(base_out_dir, f"s21{ext}") write_rst(outFile, bandComplex, driver=driver, out_dtype=gdal.GDT_CFloat32, cog=cog, ovr=ovr, comp=comp,geocode=geocode, ref_ds=dataAbsSet) elif idx == 4: outFile = os.path.join(base_out_dir, f"s22{ext}") write_rst(outFile, bandComplex, driver=driver, out_dtype=gdal.GDT_CFloat32, cog=cog, ovr=ovr, comp=comp,geocode=geocode, ref_ds=dataAbsSet) del lut_g0 # Handle reciprocity only once, after both s12 and s21 are read if recip and s12_data is not None and s21_data is not None: avg_s12_s21 = (s12_data + s21_data) / 2.0 for key in ["s12", "s21"]: outFile = os.path.join(base_out_dir, f"{key}{ext}") write_rst(outFile, avg_s12_s21, driver=driver, out_dtype=gdal.GDT_CFloat32, cog=cog, ovr=ovr, comp=comp,geocode=geocode, ref_ds=dataAbsSet) # except Exception as e: # print(f"An error occurred: {e}") # Matrix conversion if needed if mat not in ['S2', 'Sxy']: print("Extracting multi-look elements...") convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1, fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp) # Clean up temp directory if temp_dir: try: shutil.rmtree(temp_dir) except Exception as e: print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")
[docs] @time_it def import_biomass_l1b(in_dir, # azlks=1,rglks=1, fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False, bsc='sigma0', out_dir = None, geocode=True, # recip=False, ): """ Extracts the backscatter intensity elements from a BIOMASS L2B '_DGM__' folder and saves them into respective tif/binar files. Example Usage: -------------- The following code will extract the intensity elements from a BIOMASS L2B '_DGM__' folder .. code-block:: python import_biomass_l1b("/path/to/data") Parameters: ----------- in_dir : str Path to the folder containing the BIOMASS L1B files. fmt : {'tif', 'bin'}, optional (default='tif') Output format: - 'tif': GeoTIFF - 'bin': Raw binary format cog : bool, optional (default=False) If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews. ovr : list of int, optional (default=[2, 4, 8, 16]) Overview levels for COG generation. Ignored if cog=False. comp : bool, optional (default=False) If True, applies LZW compression to GeoTIFF outputs. bsc : str, optional (default='sigma0') The type of radar cross-section to use for scaling. Available options: - 'sigma0' : Uses `sigmaNought` LUT for scaling. - 'gamma0' : Uses `gammaNought` LUT for scaling. out_dir : str or None, optional (default=None) Directory to save output files. If None, a folder named after the matrix type will be created in the same location as the input file. geocode : bool, default=True If True, performs a coarse geocoding using GCPs in the file. """ mat='I4' cf_dB = 39.68531599998061 calfac_linear = np.sqrt(10 ** ((cf_dB) / 10)) temp_dir = None ext = 'bin' if fmt == 'bin' else 'tif' driver = 'ENVI' if fmt == 'bin' else "GTiff" # Final output directory if out_dir is None: final_out_dir = os.path.join(in_dir, mat) else: final_out_dir = os.path.join(out_dir, mat) os.makedirs(final_out_dir, exist_ok=True) # Intermediate output directory if mat in ['I4']: base_out_dir = final_out_dir else: temp_dir = tempfile.mkdtemp(prefix='temp_S2_') base_out_dir = temp_dir biomassPath = in_dir.lower() biomassPath = biomassPath[-80:] biomassPath = biomassPath[:70] biomassDataAbsPath = os.path.join(in_dir, "measurement", biomassPath + "_i_abs.tiff") lut_path = os.path.join(in_dir,"annotation",biomassPath + "_lut.nc") # Open dataset ds = gdal.Open(biomassDataAbsPath, gdal.GA_ReadOnly) # Extract dataset-level metadata common_metadata = ds.GetMetadata() if ds is None: raise("Could not open:", biomassDataAbsPath) # bsc = "sigma0" HH_shape = (ds.RasterYSize, ds.RasterXSize) lut_g0 = resample_lut(lut_path, HH_shape[0], HH_shape[1], bsc) HH = ds.GetRasterBand(1).ReadAsArray().astype("float32") HH = HH * lut_g0 save_l1b(ds, HH, "HH", base_out_dir, common_metadata, driver=driver,cog=cog,ovr = ovr,comp=comp, geocode=geocode) del HH HV = ds.GetRasterBand(2).ReadAsArray().astype("float32") HV = HV * lut_g0 save_l1b(ds, HV, "HV", base_out_dir, common_metadata, driver=driver,cog=cog,ovr = ovr,comp=comp, geocode=geocode) del HV VH = ds.GetRasterBand(3).ReadAsArray().astype("float32") VH = VH * lut_g0 save_l1b(ds, VH, "VH", base_out_dir, common_metadata, driver=driver,cog=cog,ovr = ovr,comp=comp, geocode=geocode) del VH VV = ds.GetRasterBand(4).ReadAsArray().astype("float32") VV = VV * lut_g0 save_l1b(ds, VV, "VV", base_out_dir, common_metadata, driver=driver,cog=cog,ovr = ovr,comp=comp, geocode=geocode) del VV