import os
import numpy as np
from polsartools.utils.proc_utils import process_chunks_parallel
from polsartools.utils.utils import conv2d,time_it
from polsartools.utils.convert_matrices import T3_C3_mat, C3_T3_mat
from .fp_infiles import fp_c3t3files
[docs]
@time_it
def yam4cfp(infolder, model="", window_size=1, outType="tif", cog_flag=False,
cog_overviews = [2, 4, 8, 16], write_flag=True,
max_workers=None,block_size=(512, 512),
progress_callback=None, # for QGIS plugin
):
"""Perform Yamaguchi 4-Component Decomposition for full-pol SAR data.
This function implements the Yamaguchi 4-component decomposition with three
different model options: original (Y4O), rotation-corrected (Y4R), and
extended volume scattering model (Y4S). The decomposition separates the total
power into surface, double-bounce, volume, and helix scattering components.
Examples
--------
>>> # Original Yamaguchi decomposition
>>> yam4cfp("/path/to/fullpol_data")
>>> # Rotation-corrected decomposition
>>> yam4cfp(
... infolder="/path/to/fullpol_data",
... model="y4cr",
... window_size=5,
... outType="tif",
... cog_flag=True
... )
>>> # Extended volume model decomposition
>>> yam4cfp(
... infolder="/path/to/fullpol_data",
... model="y4cs",
... window_size=5
... )
Parameters
----------
infolder : str
Path to the input folder containing full-pol T3 or C3 matrix files.
model : {'', 'y4cr', 'y4cs'}, default=''
Decomposition model to use:
- '': Original Yamaguchi 4-component (Y4O)
- 'y4cr': Rotation-corrected Yamaguchi (Y4R)
- 'y4cs': Extended volume scattering model (Y4S)
window_size : int, default=1
Size of the spatial averaging window. Larger windows reduce speckle noise
but decrease spatial resolution.
outType : {'tif', 'bin'}, default='tif'
Output file format:
- 'tif': GeoTIFF format with georeferencing information
- 'bin': Raw binary format
cog_flag : bool, default=False
If True, creates Cloud Optimized GeoTIFF (COG) outputs with internal tiling
and overviews for efficient web access.
cog_overviews : list[int], default=[2, 4, 8, 16]
Overview levels for COG creation. Each number represents the
decimation factor for that overview level.
write_flag : bool, default=True
If True, writes results to disk. If False, only processes data in memory.
max_workers : int | None, default=None
Maximum number of parallel processing workers. If None, uses
CPU count - 1 workers.
block_size : tuple[int, int], default=(512, 512)
Size of processing blocks (rows, cols) for parallel computation.
Larger blocks use more memory but may be more efficient.
Returns
-------
None
Writes four output files to disk for the selected model:
1. {prefix}_odd: Surface scattering power
2. {prefix}_dbl: Double-bounce scattering power
3. {prefix}_vol: Volume scattering power
4. {prefix}_hlx: Helix scattering power
where prefix is 'Yam4co', 'Yam4cr', or 'Yam4csr' based on model choice.
"""
input_filepaths = fp_c3t3files(infolder)
output_filepaths = []
if outType == "bin":
if not model or model=="y4co":
output_filepaths.append(os.path.join(infolder, "Yam4co_odd.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4co_dbl.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4co_vol.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4co_hlx.bin"))
elif model=="y4cr":
output_filepaths.append(os.path.join(infolder, "Yam4cr_odd.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_dbl.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_vol.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_hlx.bin"))
elif model=="y4cs":
output_filepaths.append(os.path.join(infolder, "Yam4csr_odd.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_dbl.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_vol.bin"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_hlx.bin"))
else:
raise(f"Invalid model!! \n model type argument must be either y4co for default or y4cr or y4cs")
else:
if not model or model=="y4co":
output_filepaths.append(os.path.join(infolder, "Yam4co_odd.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4co_dbl.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4co_vol.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4co_hlx.tif"))
elif model=="y4cr":
output_filepaths.append(os.path.join(infolder, "Yam4cr_odd.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_dbl.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_vol.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4cr_hlx.tif"))
elif model=="y4cs":
output_filepaths.append(os.path.join(infolder, "Yam4csr_odd.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_dbl.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_vol.tif"))
output_filepaths.append(os.path.join(infolder, "Yam4csr_hlx.tif"))
else:
raise(f"Invalid model!! \n model type argument must be either '' for default or Y4R or S4R")
process_chunks_parallel(input_filepaths, list(output_filepaths),
window_size, write_flag,
process_chunk_yam4cfp,
*[model],
block_size=block_size,
max_workers=max_workers, num_outputs=len(output_filepaths),
cog_flag=cog_flag,
cog_overviews=cog_overviews,
progress_callback=progress_callback
)
def unitary_rotation(T3, teta):
# Direct access to each slice, no transposition or extra memory allocation
T11 = T3[0]
T12 = T3[1]
T13 = T3[2]
T22 = T3[4]
T23 = T3[5]
T33 = T3[8]
# Compute the real and imaginary parts of T12, T13, and T23
T12_re, T12_im = T12.real, T12.imag
T13_re, T13_im = T13.real, T13.imag
T23_re, T23_im = T23.real, T23.imag
# Apply the unitary rotation directly to the 3D array (in place)
T3[1] = T12_re * np.cos(teta) + T13_re * np.sin(teta) + 1j * (T12_im * np.cos(teta) + T13_im * np.sin(teta))
T3[2] = -T12_re * np.sin(teta) + T13_re * np.cos(teta) + 1j * (-T12_im * np.sin(teta) + T13_im * np.cos(teta))
T3[4] = T22 * np.cos(teta)**2 + 2 * T23_re * np.cos(teta) * np.sin(teta) + T33 * np.sin(teta)**2
T3[5] = -T22 * np.cos(teta) * np.sin(teta) + T23_re * np.cos(teta)**2 - T23_re * np.sin(teta)**2 + T33 * np.cos(teta) * np.sin(teta) + 1j * (T23_im * np.cos(teta)**2 + T23_im * np.sin(teta)**2)
T3[8] = T22 * np.sin(teta)**2 + T33 * np.cos(teta)**2 - 2 * T23_re * np.cos(teta) * np.sin(teta)
return T3
# def process_chunk_yam4cfp(chunks, window_size, input_filepaths, model,*args):
def process_chunk_yam4cfp(chunks, window_size, input_filepaths, *args, **kwargs):
model = args[-1]
# additional_arg1 = args[0] if len(args) > 0 else None
# additional_arg2 = args[1] if len(args) > 1 else None
if 'T11' in input_filepaths[0] and 'T22' in input_filepaths[5] and 'T33' in input_filepaths[8]:
t11_T1 = np.array(chunks[0])
t12_T1 = np.array(chunks[1])+1j*np.array(chunks[2])
t13_T1 = np.array(chunks[3])+1j*np.array(chunks[4])
t21_T1 = np.conj(t12_T1)
t22_T1 = np.array(chunks[5])
t23_T1 = np.array(chunks[6])+1j*np.array(chunks[7])
t31_T1 = np.conj(t13_T1)
t32_T1 = np.conj(t23_T1)
t33_T1 = np.array(chunks[8])
T_T1 = np.array([[t11_T1, t12_T1, t13_T1],
[t21_T1, t22_T1, t23_T1],
[t31_T1, t32_T1, t33_T1]])
C3 = T3_C3_mat(T_T1)
span = C3[0,0].real+C3[1,1].real+C3[2,2].real
del C3
if 'C11' in input_filepaths[0] and 'C22' in input_filepaths[5] and 'C33' in input_filepaths[8]:
C11 = np.array(chunks[0])
C12 = np.array(chunks[1])+1j*np.array(chunks[2])
C13 = np.array(chunks[3])+1j*np.array(chunks[4])
C21 = np.conj(C12)
C22 = np.array(chunks[5])
C23 = np.array(chunks[6])+1j*np.array(chunks[7])
C31 = np.conj(C13)
C32 = np.conj(C23)
C33 = np.array(chunks[8])
C3 = np.array([[C11, C12, C13],
[C21, C22, C23],
[C31, C32, C33]])
span = C3[0,0].real+C3[1,1].real+C3[2,2].real
T_T1 = C3_T3_mat(C3)
del C3
if window_size>1:
kernel = np.ones((window_size,window_size),np.float32)/(window_size*window_size)
t11f = conv2d(T_T1[0,0,:,:],kernel)
t12f = conv2d(np.real(T_T1[0,1,:,:]),kernel)+1j*conv2d(np.imag(T_T1[0,1,:,:]),kernel)
t13f = conv2d(np.real(T_T1[0,2,:,:]),kernel)+1j*conv2d(np.imag(T_T1[0,2,:,:]),kernel)
t21f = np.conj(t12f)
t22f = conv2d(T_T1[1,1,:,:],kernel)
t23f = conv2d(np.real(T_T1[1,2,:,:]),kernel)+1j*conv2d(np.imag(T_T1[1,2,:,:]),kernel)
t31f = np.conj(t13f)
t32f = np.conj(t23f)
t33f = conv2d(T_T1[2,2,:,:],kernel)
T_T1 = np.array([[t11f, t12f, t13f], [t21f, t22f, t23f], [t31f, t32f, t33f]])
_,_,rows,cols = np.shape(T_T1)
SpanMax = np.nanmax(span)
SpanMin = np.nanmin(span)
eps = 1e-6
SpanMin = np.nanmax([SpanMin, eps])
T_T1 = T_T1.reshape(9, rows, cols)
T_T1 = np.dstack((T_T1[0,:,:],T_T1[1,:,:],T_T1[2,:,:],
T_T1[3,:,:],T_T1[4,:,:],T_T1[5,:,:],T_T1[6,:,:],T_T1[7,:,:],T_T1[8,:,:]))
M_odd = np.zeros((rows,cols))
M_dbl = np.zeros((rows,cols))
M_vol = np.zeros((rows,cols))
M_hlx = np.zeros((rows,cols))
# print(model)
for ii in range(rows):
for jj in range(cols):
T3 = np.array((
T_T1[ii,jj,0],T_T1[ii,jj,1],T_T1[ii,jj,2],
T_T1[ii,jj,3],T_T1[ii,jj,4],T_T1[ii,jj,5],
T_T1[ii,jj,6],T_T1[ii,jj,7],T_T1[ii,jj,8],
))
if model in ["y4cr", "y4cs"]:
# print(model)
teta = 0.5 * np.arctan(2 * T3[5].real / (T3[4].real - T3[8].real))
T3 = unitary_rotation(T3, teta)
if np.allclose(T3, 0,rtol=1e-8, atol=1e-10):
M_odd[ii, jj] = np.nan
M_dbl[ii, jj] = np.nan
M_vol[ii, jj] = np.nan
M_hlx[ii, jj] = np.nan
continue
Pc = 2.0 * np.abs(T3[5].imag)
HV_type = 1
if model=="y4cs":
# print(model)
C1 = T3[0] - T3[4] + (7.0 / 8.0) * T3[8] + (Pc / 16.0)
if C1 > 0.0:
HV_type = 1 # Surface scattering
else:
HV_type = 2 # Double bounce scattering
# Surface scattering
if HV_type == 1:
ratio = 10.0 * np.log10((T3[0] + T3[4] - 2.0 * T3[1].real) / (T3[0] + T3[4] + 2.0 * T3[1].real))
if -2.0 < ratio <= 2.0:
Pv = 2.0 * (2.0 * T3[8] - Pc)
else:
Pv = (15.0 / 8.0) * (2.0 * T3[8] - Pc)
# Double bounce scattering
if HV_type == 2:
Pv = (15.0 / 16.0) * (2.0 * T3[8] - Pc)
TP = np.real(T3[0] + T3[4] + T3[8])
if Pv < 0.0:
BETre = 0.0
BETim = 0.0
ALPre = 0.0
ALPim = 0.0
FS = 0.0
FD = 0.0
FV = 0.0
# Freeman - Yamaguchi 3-components algorithm
HHHH = (T3[0] + 2.0 * T3[1].real + T3[4]) / 2.0
HHVVre = (T3[0] - T3[4]) / 2.0
HHVVim = -T3[1].imag
HVHV = T3[8] / 2.0
VVVV = (T3[0] - 2.0 * T3[1].real + T3[4]) / 2.0
ratio = 10.0 * np.log10(VVVV / HHHH)
if ratio <= -2.0:
FV = 15.0 * (HVHV / 4.0)
HHHH -= 8.0 * (FV / 15.0)
VVVV -= 3.0 * (FV / 15.0)
HHVVre -= 2.0 * (FV / 15.0)
if ratio > 2.0:
FV = 15.0 * (HVHV / 4.0)
HHHH -= 3.0 * (FV / 15.0)
VVVV -= 8.0 * (FV / 15.0)
HHVVre -= 2.0 * (FV / 15.0)
if -2.0 < ratio <= 2.0:
FV = 8.0 * (HVHV / 2.0)
HHHH -= 3.0 * (FV / 8.0)
VVVV -= 3.0 * (FV / 8.0)
HHVVre -= 1.0 * (FV / 8.0)
# Case 1: Volume Scatter > Total
if HHHH <= eps or VVVV <= eps:
FD = 0.0
FS = 0.0
if -2.0 < ratio <= 2.0:
FV = (HHHH + 3.0 * (FV / 8.0)) + HVHV + (VVVV + 3.0 * (FV / 8.0))
if ratio <= -2.0:
FV = (HHHH + 8.0 * (FV / 15.0)) + HVHV + (VVVV + 3.0 * (FV / 15.0))
if ratio > 2.0:
FV = (HHHH + 3.0 * (FV / 15.0)) + HVHV + (VVVV + 8.0 * (FV / 15.0))
else:
# Data conditioning for non realizable ShhSvv* term
rtemp = HHVVre ** 2 + HHVVim ** 2
if rtemp > HHHH * VVVV:
HHVVre = HHVVre * np.sqrt((HHHH * VVVV) / rtemp)
HHVVim = HHVVim * np.sqrt((HHHH * VVVV) / rtemp)
# Odd Bounce
if HHVVre >= 0.0:
ALPre = -1.0
ALPim = 0.0
FD = (HHHH * VVVV - HHVVre ** 2 - HHVVim ** 2) / (HHHH + VVVV + 2.0 * HHVVre)
FS = VVVV - FD
BETre = (FD + HHVVre) / FS
BETim = HHVVim / FS
# Even Bounce
if HHVVre < 0.0:
BETre = 1.0
BETim = 0.0
FS = (HHHH * VVVV - HHVVre ** 2 - HHVVim ** 2) / (HHHH + VVVV - 2.0 * HHVVre)
FD = VVVV - FS
ALPre = (HHVVre - FS) / FD
ALPim = HHVVim / FD
M_odd[ii, jj] = FS * (1 + BETre ** 2 + BETim ** 2)
M_dbl[ii, jj] = FD * (1 + ALPre ** 2 + ALPim ** 2)
M_vol[ii, jj] = FV
M_hlx[ii, jj] = 0.0
# # Apply Span limits
M_odd[ii, jj] = np.clip(M_odd[ii, jj], SpanMin, SpanMax)
M_dbl[ii, jj] = np.clip(M_dbl[ii, jj], SpanMin, SpanMax)
M_vol[ii, jj] = np.clip(M_vol[ii, jj], SpanMin, SpanMax)
else:
# Surface scattering (HV_type == 1)
if HV_type == 1:
S = T3[0].real - (Pv / 2.)
D = TP - Pv - Pc - S
Cre = T3[1].real + T3[2].real
Cim = T3[1].imag + T3[2].imag
if ratio <= -2.:
Cre = Cre - (Pv / 6.)
if ratio > 2.:
Cre = Cre + (Pv / 6.)
if (Pv + Pc) > TP:
Ps = 0.
Pd = 0.
Pv = TP - Pc
else:
CO = 2. * T3[0].real + Pc - TP
if CO > 0.:
Ps = S + (Cre * Cre + Cim * Cim) / S
Pd = D - (Cre * Cre + Cim * Cim) / S
else:
Pd = D + (Cre * Cre + Cim * Cim) / D
Ps = S - (Cre * Cre + Cim * Cim) / D
if Ps < 0.:
if Pd < 0.:
Ps = 0.
Pd = 0.
Pv = TP - Pc
else:
Ps = 0.
Pd = TP - Pv - Pc
else:
if Pd < 0.:
Pd = 0.
Ps = TP - Pv - Pc
# Double bounce scattering (HV_type == 2)
elif HV_type == 2:
S = T3[0].real
D = TP - Pv - Pc - S
Cre = T3[1].real + T3[2].real
Cim = T3[1].imag + T3[2].imag
Pd = D + (Cre * Cre + Cim * Cim) / D
Ps = S - (Cre * Cre + Cim * Cim) / D
if Ps < 0.:
if Pd < 0.:
Ps = 0.
Pd = 0.
Pv = TP - Pc
else:
Ps = 0.
Pd = TP - Pv - Pc
else:
if Pd < 0.:
Pd = 0.
Ps = TP - Pv - Pc
# Ensure values are non-negative and within bounds
if Ps < 0.:
Ps = 0.
if Pd < 0.:
Pd = 0.
if Pv < 0.:
Pv = 0.
if Pc < 0.:
Pc = 0.
if Ps > SpanMax:
Ps = SpanMax
if Pd > SpanMax:
Pd = SpanMax
if Pv > SpanMax:
Pv = SpanMax
if Pc > SpanMax:
Pc = SpanMax
# Store the results in the output arrays
M_odd[ii, jj] = Ps
M_dbl[ii, jj] = Pd
M_vol[ii, jj] = Pv
M_hlx[ii, jj] = Pc
# print(np.nanmean(M_odd),np.nanmean(M_dbl),np.nanmean(M_vol),np.nanmean(M_hlx))
return M_odd.astype(np.float32), M_dbl.astype(np.float32), \
M_vol.astype(np.float32), M_hlx.astype(np.float32)