Source code for polsartools.sensors.isro_asar

import numpy as np
from osgeo import gdal
gdal.UseExceptions()
import os,glob,tables
import xml.etree.ElementTree as ET
from polsartools.utils.utils import time_it
# from polsartools.utils.io_utils import write_T3, write_C3,write_C4,write_s2_bin
from polsartools.utils.geo_utils import geocode_grid, intp_grid, update_vrt, write_latlon
from polsartools.utils.proc_utils import process_chunks_parallel
from polsartools.utils.utils import conv2d,time_it

from polsartools.utils.h5_utils import h5_polsar, get_ml_chunk
from netCDF4 import Dataset
from polsartools.sensors.nisar import nisar_fp,nisar_dp

def load_asar_meta(filepath):
    data_dict = {}

    with open(filepath, 'r') as file:
        for line in file:
            line = line.strip()
            if line:  # Ensure line is not empty
                key, value = line.split("=", 1)  # Split only on first "="
                data_dict[key.strip()] = value.strip()
    return data_dict

@time_it
def convert_l1_tif_dB(infolder,  backscatter=1, complex_flag=0, fmt="tif", 
                   cog_flag=False, cog_overviews = [2, 4, 8, 16], 
                   write_flag=True, max_workers=None):
    
    l_band_files = list(sorted(glob.glob(os.path.join(infolder, "ASAR_L_JOINT_*LEVEL1_*.tif"))))
    s_band_files = list(sorted(glob.glob(os.path.join(infolder, "ASAR_S_JOINT_*LEVEL1_*.tif"))))
    inc_file = glob.glob(os.path.join(infolder, "ASAR_*LEVEL1_INC_MAP*.tif"))
    window_size=3
    # print(len(l_band_files))
    # print(len(s_band_files))
    # print(len(inc_file))
    
    nb_file = glob.glob(os.path.join(infolder, "ASAR_*LEVEL1_BAND_META*.txt"))
    
    if len(nb_file) >0:
        print("Found meta data file applying noise bias.")
        meta_dict = load_asar_meta(nb_file[0])
        l_hh_nb = meta_dict['Image_Noise_Bias_L_HH']
        l_hv_nb = meta_dict['Image_Noise_Bias_L_HV']
        l_vh_nb = meta_dict['Image_Noise_Bias_L_VH']
        l_vv_nb = meta_dict['Image_Noise_Bias_L_VV']

        s_hh_nb = meta_dict['Image_Noise_Bias_S_HH']
        s_hv_nb = meta_dict['Image_Noise_Bias_S_HV']
        s_vh_nb = meta_dict['Image_Noise_Bias_S_VH']
        s_vv_nb = meta_dict['Image_Noise_Bias_S_VV']
    else:
        print("No meta data file found. Applying noise bias of 0.")
        l_hh_nb = 0
        l_hv_nb = 0
        l_vh_nb = 0
        l_vv_nb = 0

        s_hh_nb = 0
        s_hv_nb = 0
        s_vh_nb = 0
        s_vv_nb = 0

    if backscatter==2:
        out_fix = 'g0'
    elif backscatter==3:
        out_fix = 'b0'
    else:
        out_fix = 's0' 
    
    def output_names_dB(band_files, prefix):
        out_paths = []
        for file in band_files:
            pol = file.split("_LEVEL1_")[1].split(".tif")[0]  # Extract polarization (e.g., HH, HV, VH, VV)
            out_paths.append(os.path.join(infolder, f"{prefix}_{pol}_{out_fix}_dB.{fmt}"))
        return out_paths
    def output_names_complex(band_files, prefix):
        out_paths = []
        for file in band_files:
            pol = file.split("_LEVEL1_")[1].split(".tif")[0]  # Extract polarization (e.g., HH, HV, VH, VV)           
            real_path = os.path.join(infolder, f"{prefix}_{pol}_{out_fix}_real.{fmt}")
            imag_path = os.path.join(infolder, f"{prefix}_{pol}_{out_fix}_imag.{fmt}")
        
            out_paths.extend([real_path, imag_path]) 
        return out_paths
    
    output_l_filepaths=[]
    output_s_filepaths=[]
    
    if l_band_files:
        if complex_flag:
            output_l_filepaths.extend(output_names_complex(l_band_files, "L"))
        else:
            output_l_filepaths.extend(output_names_dB(l_band_files, "L"))

    if s_band_files:
        if complex_flag:
            output_s_filepaths.extend(output_names_complex(s_band_files, "S"))
        else:
            output_s_filepaths.extend(output_names_dB(s_band_files, "S"))
    

    def copy_gcps_to_outputs(input_filepaths, output_filepaths):
        """ Copies GCPs from one input file to all output files. """
        input_filepath = input_filepaths[0]
        # Open the input file
        input_dataset = gdal.Open(input_filepath, gdal.GA_ReadOnly)
        if input_dataset is None:
            raise FileNotFoundError(f"Cannot open {input_filepath}")

        # Extract GCPs
        gcps = input_dataset.GetGCPs()
        gcp_projection = input_dataset.GetGCPProjection()

        if not gcps:
            # print("No GCPs found in the input file.")
            return

        # Apply GCPs to each output file
        for output_filepath in output_filepaths:
            output_dataset = gdal.Open(output_filepath, gdal.GA_Update)
            if output_dataset is None:
                raise FileNotFoundError(f"Cannot open {output_filepath}")

            output_dataset.SetGCPs(gcps, gcp_projection)
            output_dataset.GetRasterBand(1).SetNoDataValue(np.nan)
            output_dataset.FlushCache()  
            output_dataset = None 

        # Close the input file
        input_dataset = None

    if len(l_band_files)==4:
        print("Processing L-band data...")
        input_filepaths = l_band_files + inc_file
        # print(input_filepaths)
        process_chunks_parallel(input_filepaths, output_l_filepaths, window_size, 
                                write_flag, process_chunk_gtif,
                                *[complex_flag, backscatter, l_hh_nb, l_hv_nb, l_vh_nb, l_vv_nb],
                                bands_to_read=[2,2,2,2,1] , block_size=(512, 512), 
                                max_workers=max_workers, num_outputs=len(output_l_filepaths),
                                cog_flag=cog_flag, cog_overviews=cog_overviews,
                                post_proc=copy_gcps_to_outputs
                                )

    if len(s_band_files)==4:
        print("Processing S-band data...")
        input_filepaths = s_band_files + inc_file
        # print(input_filepaths)
        process_chunks_parallel(input_filepaths, output_s_filepaths, window_size, 
                                write_flag, process_chunk_gtif,
                                *[complex_flag, backscatter, s_hh_nb, s_hv_nb, s_vh_nb, s_vv_nb],
                                bands_to_read=[2,2,2,2,1] , block_size=(512, 512), 
                                max_workers=max_workers, num_outputs=len(output_s_filepaths),
                                cog_flag=cog_flag, cog_overviews=cog_overviews,
                                post_proc=copy_gcps_to_outputs
                                )
    
    

def process_chunk_gtif(chunks, window_size, *args):
    # kernel = np.ones((window_size,window_size),np.float32)/(window_size*window_size)
    complex_flag=int(args[-6])
    backscatter=int(args[-5])
    hh_nb=float(args[-4])
    hv_nb=float(args[-3])
    vh_nb=float(args[-2])
    vv_nb=float(args[-1])
    
    cc_dB = 42.0
    cc_linear = np.sqrt(10**(cc_dB/10.0))

    if backscatter==2 and complex_flag==1:

        hh = np.array(chunks[0])+1j*np.array(chunks[1])*np.tan(np.deg2rad(np.array(chunks[8])))/cc_linear
        hv = np.array(chunks[2])+1j*np.array(chunks[3])*np.tan(np.deg2rad(np.array(chunks[8])))/cc_linear
        vh = np.array(chunks[4])+1j*np.array(chunks[5])*np.tan(np.deg2rad(np.array(chunks[8])))/cc_linear
        vv = np.array(chunks[6])+1j*np.array(chunks[7])*np.tan(np.deg2rad(np.array(chunks[8])))/cc_linear
    elif backscatter==2 and complex_flag==0:
        hh = 10*np.log10(np.abs(np.array(chunks[0])+1j*np.array(chunks[1]))**2-hh_nb)+10*np.log10(np.tan(np.deg2rad(np.array(chunks[8]))))-cc_dB
        hv = 10*np.log10(np.abs(np.array(chunks[2])+1j*np.array(chunks[3]))**2-hv_nb)+10*np.log10(np.tan(np.deg2rad(np.array(chunks[8]))))-cc_dB
        vh = 10*np.log10(np.abs(np.array(chunks[4])+1j*np.array(chunks[5]))**2-vh_nb)+10*np.log10(np.tan(np.deg2rad(np.array(chunks[8]))))-cc_dB
        vv = 10*np.log10(np.abs(np.array(chunks[6])+1j*np.array(chunks[7]))**2-vv_nb)+10*np.log10(np.tan(np.deg2rad(np.array(chunks[8]))))-cc_dB
        
    elif backscatter==3 and complex_flag==0:
        hh = 10*np.log10(np.abs(np.array(chunks[0])+1j*np.array(chunks[1]))**2-hh_nb)-cc_dB
        hv = 10*np.log10(np.abs(np.array(chunks[2])+1j*np.array(chunks[3]))**2-hv_nb)-cc_dB
        vh = 10*np.log10(np.abs(np.array(chunks[4])+1j*np.array(chunks[5]))**2-vh_nb)-cc_dB
        vv = 10*np.log10(np.abs(np.array(chunks[6])+1j*np.array(chunks[7]))**2-vv_nb)-cc_dB
    elif backscatter==3 and complex_flag==1:
        hh = np.array(chunks[0])+1j*np.array(chunks[1])/cc_linear
        hv = np.array(chunks[2])+1j*np.array(chunks[3])/cc_linear
        vh = np.array(chunks[4])+1j*np.array(chunks[5])/cc_linear
        vv = np.array(chunks[6])+1j*np.array(chunks[7])/cc_linear
    elif complex_flag==0:
        hh = 10*np.log10(np.abs(np.array(chunks[0])+1j*np.array(chunks[1]))**2-hh_nb)+10*np.log10(np.sin(np.deg2rad(np.array(chunks[8]))))-cc_dB
        hv = 10*np.log10(np.abs(np.array(chunks[2])+1j*np.array(chunks[3]))**2-hv_nb)+10*np.log10(np.sin(np.deg2rad(np.array(chunks[8]))))-cc_dB
        vh = 10*np.log10(np.abs(np.array(chunks[4])+1j*np.array(chunks[5]))**2-vh_nb)+10*np.log10(np.sin(np.deg2rad(np.array(chunks[8]))))-cc_dB
        vv = 10*np.log10(np.abs(np.array(chunks[6])+1j*np.array(chunks[7]))**2-vv_nb)+10*np.log10(np.sin(np.deg2rad(np.array(chunks[8]))))-cc_dB
    else:
        hh = np.array(chunks[0])+1j*np.array(chunks[1])*np.sin(np.deg2rad(np.array(chunks[8])))/cc_linear
        hv = np.array(chunks[2])+1j*np.array(chunks[3])*np.sin(np.deg2rad(np.array(chunks[8])))/cc_linear
        vh = np.array(chunks[4])+1j*np.array(chunks[5])*np.sin(np.deg2rad(np.array(chunks[8])))/cc_linear
        vv = np.array(chunks[6])+1j*np.array(chunks[7])*np.sin(np.deg2rad(np.array(chunks[8])))/cc_linear

    hh[hh==0]=np.nan
    hh[hh==np.inf]=np.nan
    hh[hh==-np.inf]=np.nan
    
    hv[hv==0]=np.nan
    hv[hv==np.inf]=np.nan
    hv[hv==-np.inf]=np.nan
    
    vh[vh==0]=np.nan
    vh[vh==np.inf]=np.nan
    vh[vh==-np.inf]=np.nan
    
    vv[vv==0]=np.nan
    vv[vv==np.inf]=np.nan
    vv[vv==-np.inf]=np.nan
    
    if complex_flag:
        return np.real(hh),np.imag(hh),np.real(hv),np.imag(hv),np.real(vh),np.imag(vh),np.real(vv),np.imag(vv)
    else:
        return hh,hv,vh,vv

def get_rslc_path(h5, freq_band):
    for product_type in ['RSLC', 'SLC']:
        base_path = f'/science/{freq_band}SAR/{product_type}/swaths/frequencyA'
        if h5.__contains__(base_path):
            return base_path
        else:
            print(f"Base path not found: {base_path}")
    return None

def rslc_meta(inFile):
    band_table = [
        ('/science/LSAR', 'L'),
        ('/science/SSAR', 'S')
    ]

    # Step 1: Identify frequency band and root path
    try:
        with tables.open_file(inFile, mode="r") as h5:
            for path, band in band_table:
                if path in h5:
                    freq_band = band
                    freq_path = path
                    break
            else:
                print("Neither LSAR nor SSAR data found in the file.")
                return None

            # Step 2: Read polarization list
            listOfPolarizations = None
            for base in ['RSLC', 'SLC']:
                pol_path = f'{freq_path}/{base}/swaths/frequencyA/listOfPolarizations'
                try:
                    listOfPolarizations = np.array(h5.get_node(pol_path).read()).astype(str)
                except tables.NoSuchNodeError:
                    # print(f"Polarization node missing: {pol_path}")
                    continue
                
            # pol_path = f'{freq_path}/RSLC/swaths/frequencyA/listOfPolarizations'
            # try:
            #     listOfPolarizations = np.array(h5.get_node(pol_path).read()).astype(str)
            # except tables.NoSuchNodeError:
            #     print(f"Polarization node missing: {pol_path}")
            #     listOfPolarizations = None

    except Exception:
        raise RuntimeError("Invalid .h5 file !!")

    return freq_band,listOfPolarizations

[docs] @time_it def import_isro_asar( inFile, mat='T3', azlks=5,rglks=5, fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False, out_dir=None,recip=False, max_workers=None, calibration_constant = 42 ): """ Extracts PolSAR matrix elements from a ISRO ASAR RSLC HDF5 file and saves them as slant range raster files. This function supports both dual-polarimetric and full-polarimetric RSLC data. It performs multi-looking using specified azimuth and range looks, and outputs matrix elements in either GeoTIFF or binary format. Optional support for Cloud Optimized GeoTIFFs (COGs), and TIFF compression. Examples -------- >>> import_isro_asar("path_to_file.h5", azlks=30, rglks=15) Extracts matrix elements with 5x5 multi-looking and saves them in the default output folder. >>> import_isro_asar("path_to_file.h5", mat='T3', fmt='tif', cog=True, comp=True) Extracts T3 matrix elements and saves them as compressed Cloud Optimized GeoTIFFs. Parameters ---------- inFile : str Path to the NISAR RSLC HDF5 file containing dual-pol or full-pol SAR data. mat : str, optional (default='T3') Type of matrix to extract. Valid options for Full-pol: 'S2', 'C4, 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV','T2HV'and Dual-pol: 'Sxy','C2'. azlks : int, optional (default=5) Number of azimuth looks for multi-looking. rglks : int, optional (default=5) Number of range looks for multi-looking. fmt : {'tif', 'bin'}, optional (default='tif') Output format: - 'tif': GeoTIFF in slant range - 'bin': Raw binary format cog : bool, optional (default=False) If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews. ovr : list of int, optional (default=[2, 4, 8, 16]) Overview levels for COG generation. Ignored if cog=False. comp : bool, optional (default=False) If True, applies LZW compression to GeoTIFF outputs. out_dir : str or None, optional (default=None) Directory to save output files. If None, a folder named after the matrix type will be created in the same location as the input file. recip : bool, optional (default=False) If True, scattering matrix reciprocal symmetry is applied, i.e, S_HV = S_VH. """ geocode_flag=False cc_linear = np.sqrt(10**(calibration_constant/10)) freq_band,listOfPolarizations = rslc_meta(inFile) nchannels = len(listOfPolarizations) print(f"Detected {freq_band}-band {listOfPolarizations} ") inFolder = os.path.dirname(inFile) if not inFolder: inFolder = "." with tables.open_file(inFile, mode="r") as h5: base_path = get_rslc_path(h5, freq_band) h5.close() start_x = 0 start_y = 0 xres = 1 yres = 1 projection = 4326 if nchannels==2: nisar_dp(mat,inFile, inFolder, base_path, azlks, rglks, recip, max_workers, start_x, start_y, xres, yres, projection, fmt, cog, ovr, comp, None, None, listOfPolarizations, out_dir,cc_linear) elif nchannels==4: nisar_fp(mat, inFile, inFolder, base_path, azlks, rglks, recip, max_workers, start_x, start_y, xres, yres, projection, fmt, cog, ovr, comp, None, None, out_dir,cc_linear)