Coverage for src/flexfrac1d/lib/graphics.py: 14%

103 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-08-30 14:00 +0200

1from __future__ import annotations 

2 

3import functools 

4import typing 

5 

6import matplotlib 

7from matplotlib.animation import FuncAnimation 

8from matplotlib.collections import LineCollection 

9import matplotlib.pyplot as plt 

10import numpy as np 

11 

12from . import numerical, physics as ph 

13 

14if typing.TYPE_CHECKING: 

15 from ..model import Domain, WavesUnderFloe 

16 

17 

18def _linspace_nums(resolution: float, lengths: list[float]) -> list[int]: 

19 # For each floe, return the number of points needed to discretise its 

20 # length at the specified resolution 

21 return [np.ceil(length / resolution).astype(int) + 1 for length in lengths] 

22 

23 

24def _domain_to_lengths(domain: Domain) -> list[float]: 

25 return [wuf.length for wuf in domain.subdomains] 

26 

27 

28def _surface_segments( 

29 resolution: float, domain: Domain, left_bound: float, an_sol: bool | None 

30) -> list[np.ndarray]: 

31 nxs = _linspace_nums(resolution, _domain_to_lengths(domain)) 

32 # Array of points to discretise the free surface on the left of the ice domain 

33 xfs = np.linspace( 

34 left_bound, 

35 domain.subdomains[0].left_edge, 

36 np.ceil((domain.subdomains[0].left_edge - left_bound) / resolution).astype(int) 

37 + 1, 

38 ) 

39 growth_params = None if an_sol else (domain.growth_mean, domain.growth_std) 

40 # TODO: replace numerical.free_surface by a FluidSurfaceHandler 

41 yfs = numerical.free_surface( 

42 xfs, 

43 (domain.spectrum._amps, domain.ocean.wavenumbers, domain.spectrum._phases), 

44 growth_params, 

45 ) 

46 segments = [np.vstack((xfs, yfs)).T] + [np.full((nx, 2), np.nan) for nx in nxs] 

47 for i, (nx, floe) in enumerate(zip(nxs, domain.subdomains), 1): 

48 segments[i][:, 0] = np.linspace(0, floe.length, nx) 

49 growth_params = None if an_sol else domain._pack_growth(floe) 

50 segments[i][:, 1] = floe.forcing( 

51 segments[i][:, 0], domain.spectrum, growth_params 

52 ) 

53 segments[i][:, 0] += floe.left_edge 

54 return segments 

55 

56 

57def _compute_segment( 

58 nx: int, 

59 wuf: WavesUnderFloe, 

60 growth_params, 

61 an_sol: bool | None, 

62 num_params: dict | None, 

63) -> np.ndarray: 

64 segment = np.full((nx, 2), np.nan) 

65 segment[:, 0] = np.linspace(0, wuf.length, nx) 

66 segment[:, 1] = wuf.displacement( 

67 segment[:, 0], 

68 growth_params, 

69 an_sol, 

70 num_params, 

71 ) 

72 segment[:, 0] += wuf.left_edge 

73 return segment 

74 

75 

76def _dis_segments( 

77 resolution: float, 

78 domain: Domain, 

79 an_sol: bool | None, 

80 num_params: dict | None, 

81 base: float, 

82): 

83 # TODO: use `base` to offset, e.g. to the bottom or the top of the floe. 

84 # Probably pass it to _compute_segment. 

85 nxs = _linspace_nums(resolution, _domain_to_lengths(domain)) 

86 # Freeze the arguments that do not vary between calls 

87 compute_segments = functools.partial( 

88 _compute_segment, 

89 **dict( 

90 an_sol=an_sol, growth_params=domain.growth_params, num_params=num_params 

91 ), 

92 ) 

93 segments = [compute_segments(nxs[i], domain.subdomains[i]) for i in range(len(nxs))] 

94 return segments 

95 

96 

97def plot_displacement( 

98 resolution: float, 

99 domain: Domain, 

100 left_bound=None, 

101 ax: matplotlib.axes.Axes = None, 

102 an_sol: bool | None = None, 

103 num_params: dict | None = None, 

104 add_surface=True, 

105 base=0, 

106 kw_dis=None, 

107 kw_sur=None, 

108): 

109 if kw_dis is None: 

110 kw_dis = {"color": "k", "lw": 3} 

111 displacements = LineCollection( 

112 _dis_segments(resolution, domain, an_sol, num_params, base), **kw_dis 

113 ) 

114 if left_bound is None: 

115 left_bound = domain.subdomains[0].left_edge 

116 if ax is None: 

117 ax = plt.gca() 

118 wave_height = np.sum(domain.spectrum._amps**2) ** 0.5 * 1.1 

119 ax.axis( 

120 [left_bound, domain.subdomains[-1].right_edge, -wave_height, wave_height] 

121 ) 

122 

123 if add_surface: 

124 if kw_sur is None: 

125 kw_sur = {"color": "#008aa6", "lw": 1.5} 

126 surface = LineCollection( 

127 _surface_segments(resolution, domain, left_bound, an_sol), **kw_sur 

128 ) 

129 ax.add_collection(surface) 

130 ax.add_collection(displacements) 

131 return ax 

132 

133 

134def animate_displacement( 

135 resolution, 

136 experiment, 

137): 

138 # idx = 10 

139 floe0 = experiment.history[0].subdomains[0] 

140 times, floes_from_time, growth_params = zip( 

141 *[(k, v[0], v[1]) for k, v in experiment.history.items()] 

142 ) 

143 # scales = np.geomspace(.5, 32, 13) 

144 # dt = 7 * .095 / scales[idx] 

145 dt = times[1] 

146 total_time = times[-1] 

147 # experiment = read_experiment(0, True, True) 

148 n_ts = np.ceil(total_time / dt).astype(int) 

149 vertical_range = np.sqrt(np.sum(experiment.domain.spectrum._amps**2)) * 1.1 

150 

151 def animate(i, lines): 

152 fig.suptitle(f"t = {times[i]:.2f} s, µ_max = {growth_params[i][0].max():.2f} m") 

153 nfloes = len(floes_from_time[i]) 

154 xcs = {_l.get_xdata()[0]: j + 1 for j, _l in enumerate(ax.get_lines()[1:])} 

155 new_xcs = [] 

156 if nfloes > 1: 

157 for floe in floes_from_time[i][1:]: 

158 xc = floe.left_edge 

159 if xc in xcs: 

160 ax.get_lines()[xcs[xc]].set_color("C0") 

161 else: 

162 new_xcs.append(xc) 

163 

164 for xc in new_xcs: 

165 line = ax.axvline(xc, ymax=0.05, lw=5, c="C1") 

166 lines.append(line) 

167 

168 xfts, yfts = [], [] 

169 segments = [] 

170 for j, cf in enumerate(floes_from_time[i]): 

171 xft = np.linspace(0, cf.length, np.ceil(cf.length).astype(int) * 10) 

172 yft = ph.DisplacementHandler.from_wuf(cf, growth_params[i]).compute(xft) 

173 # cf.displacement( 

174 # xft, 

175 # domain.spectrum, 

176 # ( 

177 # growth_params[i][0]-cf.left_edge, 

178 # growth_params[i][1] 

179 # ), 

180 # an_sol, 

181 # None, 

182 # ) 

183 xfts.append(xft + cf.left_edge) 

184 yfts.append(yft) 

185 segments.append(np.vstack((xft + cf.left_edge, yft)).T) 

186 lc.set_segments(segments) 

187 

188 segments = [] 

189 for j, cf in enumerate(floes_from_time[i]): 

190 xft = np.linspace(0, cf.length, np.ceil(cf.length).astype(int) * 10) 

191 yft = ph.FluidSurfaceHandler.from_wuf(cf, growth_params[i]).compute(xft) 

192 # yft = cf.forcing( 

193 # xft, 

194 # domain.spectrum, 

195 # ( 

196 # growth_params[i][0]-cf.left_edge, 

197 # growth_params[i][1] 

198 # ), 

199 # ) 

200 xfts.append(xft + cf.left_edge) 

201 yfts.append(yft) 

202 segments.append(np.vstack((xft + cf.left_edge, yft)).T) 

203 lc_surf.set_segments(segments) 

204 return lines 

205 

206 fig, ax = plt.subplots(figsize=(16, 1.6)) 

207 ax.axis([floe0.left_edge, floe0.right_edge, -vertical_range, vertical_range]) 

208 lines = ax.plot([], [], lw=3, c="k") 

209 lc = LineCollection([], color="k", lw=2) 

210 lc_surf = LineCollection([], color="#008aa6", lw=1) 

211 ax.add_collection(lc) 

212 ax.add_collection(lc_surf) 

213 ax.set_xlabel("Along-floe coordinate (m)") 

214 ax.set_ylabel("Wave amplitude (m)") 

215 ani = FuncAnimation( 

216 fig, animate, frames=n_ts, blit=True, fargs=(lines,), interval=dt * 1e3 

217 ) 

218 return ani 

219 # ani.save(vtitle)