Coverage for manyworlds/scenario_forest.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-08-11 11:24 +0200

1"""Defines the ScenarioForest Class""" 

2import re 

3import igraph as ig 

4import io 

5 

6from .scenario import Scenario 

7from .step import Step, Prerequisite, Action, Assertion 

8 

9class ScenarioForest: 

10 """A collection of one or more directed trees the vertices of which represent BDD scenarios""" 

11 

12 TAB_SIZE = 4 

13 indentation_pattern = rf'(?P<indentation>( {{{TAB_SIZE}}})*)' 

14 scenario_pattern = r'Scenario: (?P<scenario_name>.*)' 

15 

16 SCENARIO_LINE_PATTERN = re.compile("^{}{}$".format(indentation_pattern, scenario_pattern)) 

17 STEP_LINE_PATTERN = re.compile("^{}{}$".format(indentation_pattern, Step.step_pattern)) 

18 TABLE_LINE_PATTERN = re.compile("^{}{}$".format(indentation_pattern, Step.table_pattern)) 

19 

20 def __init__(self, graph): 

21 """Constructor method 

22 

23 Parameters 

24 ---------- 

25 graph : igraph.Graph 

26 The graph representing the set of scenario trees 

27 """ 

28 

29 self.graph = graph 

30 

31 @classmethod 

32 def data_table_list_to_dict(cls, data_table): 

33 """Convert a data table from list of list to list of dict 

34 

35 Parameters 

36 ---------- 

37 data_table : list 

38 List of (equal-length) list of str. The first list is used as headers 

39 

40 Returns 

41 ------- 

42 list 

43 List of dict 

44 """ 

45 

46 header_row = data_table[0] 

47 return [dict(zip(header_row, row)) for row in data_table[1:]] 

48 

49 @classmethod 

50 def data_table_dict_to_list(cls, data_table): 

51 """Convert a data table from list of dict to list of list 

52 

53 Parameters 

54 ---------- 

55 data_table : list 

56 List of dict 

57 

58 Returns 

59 ------- 

60 list 

61 List of (equal-length) list of str. The first list is used for headers 

62 """ 

63 

64 return [list(data_table[0].keys())] + [list(row.values()) for row in data_table] 

65 

66 @classmethod 

67 def from_file(cls, file_path): 

68 """Create a scenario tree instance from an indented feature file 

69 

70 Scans the indented file line by line and: 

71 1. Keeps track of the last scenario encountered at each indentation level 

72 2. Any scenario encountered is added as a child to the last scenario encounterd 

73 at the parent level 

74 3. Any prerequisite, action or assertion encountered is added to the last scenario 

75 encountered at that level 

76 4. Any data table encountered is added the the current step 

77 

78 Parameters 

79 ---------- 

80 file_path : str 

81 Path to the indented feature file 

82 

83 Returns 

84 ------- 

85 ScenarioForest 

86 Instance of ScenarioForest 

87 """ 

88 

89 graph = ig.Graph(directed=True) 

90 with open(file_path) as indented_file: 

91 raw_lines = [l.rstrip('\n') for l in indented_file.readlines() if not l.strip() == ""] 

92 current_scenarios = {} # used to keep track of last scenario encountered at each level 

93 current_table = None 

94 current_step = None 

95 

96 # Scan the file line by line 

97 for line in raw_lines: 

98 

99 # Determine whether line is scenario, step or table row 

100 scenario_match = cls.SCENARIO_LINE_PATTERN.match(line) 

101 step_match = cls.STEP_LINE_PATTERN.match(line) 

102 table_match = cls.TABLE_LINE_PATTERN.match(line) 

103 if not (scenario_match or step_match or table_match): 

104 raise ValueError('Unable to parse line: ' + line.strip()) 

105 

106 # close and record any open data table 

107 if (scenario_match or step_match) and current_table: 

108 current_step.data = ScenarioForest.data_table_list_to_dict(current_table) 

109 current_table = None 

110 

111 if scenario_match: # Line is scenario 

112 current_level = int(len((scenario_match)['indentation']) / cls.TAB_SIZE) 

113 

114 current_scenario_vertex = graph.add_vertex() 

115 current_scenario = Scenario(scenario_match['scenario_name'], current_scenario_vertex) 

116 current_scenario_vertex['scenario'] = current_scenario 

117 current_scenarios[current_level] = current_scenario 

118 if current_level > 0: 

119 # Connect to parent scenario 

120 current_scenario_parent = current_scenarios[current_level-1] 

121 graph.add_edge(current_scenario_parent.vertex, current_scenario.vertex) 

122 

123 elif step_match: # Line is action or assertion 

124 current_scenario = current_scenarios[current_level] 

125 new_step = Step.parse(step_match[0].strip(), previous_step=current_step) 

126 current_scenario.steps.append(new_step) 

127 current_step = new_step 

128 

129 elif table_match: # Line is table row 

130 if current_table == None: 

131 current_table = [] 

132 row = [s.strip() for s in line.split('|')[1:-1]] 

133 current_table.append(row) 

134 

135 # In case the file ends with a data table: 

136 if current_table: 

137 current_step.data = ScenarioForest.data_table_list_to_dict(current_table) 

138 

139 return ScenarioForest(graph) 

140 

141 @classmethod 

142 def write_scenario_name_strict(cls, file_handle, scenario): 

143 """Write formatted scenario name to the end of a 'strict' flat feature file 

144 

145 Uses both any organizatinal scenarios and the validated scenario 

146 

147 Parameters 

148 ---------- 

149 file_handle : io.TextIOWrapper 

150 The file to which to append the scenario 

151 

152 scenario : Scenario 

153 Scenario to append to file_handle 

154 

155 Returns 

156 ------- 

157 None 

158 """ 

159 

160 file_handle.write("Scenario: " + scenario.name_with_breadcrumbs() + "\n") 

161 

162 @classmethod 

163 def write_scenario_name_relaxed(cls, file_handle, scenarios): 

164 """Write formatted scenario name to the end of a 'relaxed' flat feature file 

165 

166 Uses both any organizatinal scenarios and all validated scenarios 

167 

168 Parameters 

169 ---------- 

170 file_handle : io.TextIOWrapper 

171 The file to which to append the scenario 

172 

173 scenarios : list 

174 List of Scenario. Scenario to append to file_handle 

175 

176 Returns 

177 ------- 

178 None 

179 """ 

180 

181 scenario_name = ''.join([sc.name + (' > ' if sc.organizational_only() else ' / ') for sc in scenarios[:-1]]) + scenarios[-1].name 

182 file_handle.write("Scenario: " + scenario_name + "\n") 

183 

184 @classmethod 

185 def write_scenario_steps(cls, file_handle, steps, comments=False): 

186 """Write formatted scenario steps to the end of the flat feature file 

187 

188 Parameters 

189 ---------- 

190 file_handle : io.TextIOWrapper 

191 The file to which to append the steps 

192 

193 steps : list 

194 List of Step. Steps to append to file_handle 

195 

196 comments: bool 

197 Whether or not to write comments 

198 

199 Returns 

200 ------- 

201 None 

202 """ 

203 

204 last_step = None 

205 for step_num, step in enumerate(steps): 

206 first_of_type = (last_step == None or last_step.conjunction != step.conjunction) 

207 file_handle.write(step.format(first_of_type=first_of_type) + "\n") 

208 if comments and step.comment: 

209 file_handle.write("# " + step.comment + "\n") 

210 if step.data: 

211 ScenarioForest.write_data_table(file_handle, step.data) 

212 last_step = step 

213 

214 @classmethod 

215 def write_data_table(cls, file_handle, data_table): 

216 """Write formatted data table to the end of the flat feature file 

217 

218 Parameters 

219 ---------- 

220 file_handle : io.TextIOWrapper 

221 The file to which to append the data table 

222 

223 data_table : list 

224 List of dict 

225 

226 Returns 

227 ------- 

228 None 

229 """ 

230 

231 data = ScenarioForest.data_table_dict_to_list(data_table) 

232 col_widths = [max([len(cell) for cell in col]) for col in list(zip(*data))] 

233 for row in data: 

234 padded_row = [row[col_num].ljust(col_width) for col_num, col_width in enumerate(col_widths)] 

235 file_handle.write(" | {} |\n".format(" | ".join(padded_row))) 

236 

237 def flatten(self, file_path, mode='strict', comments=False): 

238 """Write a flat (no indentation) feature file representing the scenario forest 

239 

240 Parameters 

241 ---------- 

242 file_path : str 

243 Path to flat feature file to be written 

244 

245 mode : {'strict', 'relaxed'}, default='strict' 

246 Flattening mode. Either 'strict' or 'relaxed' 

247 

248 comments : str, default = False 

249 Whether or not to write comments 

250 

251 Returns 

252 ------- 

253 None 

254 """ 

255 

256 if mode == 'strict': 

257 self.flatten_strict(file_path, comments=comments) 

258 elif mode == 'relaxed': 

259 self.flatten_relaxed(file_path, comments=comments) 

260 

261 def flatten_strict(self, file_path, comments=False): 

262 """Write a flat (no indentation) feature file representing the forest using the 'strict' flattening mode 

263 

264 The 'strict' flattening mode writes one scenario per vertex in the tree, resulting in 

265 a feature file with one set of 'When' steps followed by one set of 'Then' steps (generally recommended) 

266 

267 Parameters 

268 ---------- 

269 file_path : str 

270 Path to flat feature file 

271 

272 comments : bool 

273 Whether or not to write comments 

274 

275 Returns 

276 ------- 

277 None 

278 """ 

279 

280 with open(file_path, 'w') as flat_file: 

281 for scenario in [sc for sc in self.scenarios() if not sc.organizational_only()]: 

282 ScenarioForest.write_scenario_name_strict(flat_file, scenario) 

283 

284 ancestor_scenarios = scenario.ancestors() 

285 steps=[] 

286 # collect prerequisites from all scenarios along the path 

287 steps += [st 

288 for sc in ancestor_scenarios 

289 for st in sc.prerequisites()] 

290 # collect actions from all scenarios along the path 

291 steps += [st 

292 for sc in ancestor_scenarios 

293 for st in sc.actions()] 

294 # add all steps from the destination scenario only 

295 steps += scenario.steps 

296 ScenarioForest.write_scenario_steps(flat_file, steps, comments=comments) 

297 flat_file.write("\n") 

298 

299 def flatten_relaxed(self, file_path, comments=False): 

300 """Write a flat (no indentation) feature file representing the tree using the 'relaxed' flattening mode 

301 

302 The 'relaxed' flattening mode writes one scenario per leaf vertex in the tree, resulting in 

303 a feature file with multiple alternating sets of "When" and "Then" steps per (generally considered an anti-pattern) 

304 

305 Parameters 

306 ---------- 

307 file_path : str 

308 Path to flat feature file 

309 

310 comments : bool 

311 Whether or not to write comments 

312 

313 Returns 

314 ------- 

315 None 

316 """ 

317 

318 with open(file_path, 'w') as flat_file: 

319 for scenario in self.leaf_scenarios(): 

320 

321 steps=[] 

322 scenarios_for_naming = [] # organizational scenarios and validated scenarios 

323 for path_scenario in scenario.path_scenarios(): 

324 if path_scenario.organizational_only(): 

325 scenarios_for_naming.append(path_scenario) 

326 continue 

327 steps += path_scenario.prerequisites() 

328 steps += path_scenario.actions() 

329 if not path_scenario.validated: 

330 steps += path_scenario.assertions() 

331 path_scenario.validated = True 

332 scenarios_for_naming.append(path_scenario) 

333 

334 ScenarioForest.write_scenario_name_relaxed(flat_file, scenarios_for_naming) 

335 ScenarioForest.write_scenario_steps(flat_file, steps, comments=comments) 

336 flat_file.write("\n") 

337 

338 def find(self, *scenario_names): 

339 """Find a scenario by the names of all scenarios along the path from a root scenario to the destination scenario 

340 

341 Parameters 

342 ---------- 

343 scenario_names : list[str] 

344 List of scenario names 

345 

346 Returns 

347 ------- 

348 Scenario 

349 The found scenario 

350 """ 

351 

352 scenario = next(sc for sc in self.root_scenarios() if sc.name == scenario_names[0]) 

353 for scenario_name in scenario_names[1:]: 

354 scenario = next(vt['scenario'] for vt in scenario.vertex.successors() if vt['scenario'].name == scenario_name) 

355 

356 return scenario 

357 

358 def scenarios(self): 

359 """Return all scenarios 

360 

361 Returns 

362 ------- 

363 list 

364 list[Scenario]. All scenarios 

365 """ 

366 

367 return [vx['scenario'] for vx in self.graph.vs] 

368 

369 def root_scenarios(self): 

370 """Return the root scenarios (scenarios with vertices without incoming edges) 

371 

372 Returns 

373 ------- 

374 list 

375 list[Scenario]. All root scenarios 

376 """ 

377 return [vx['scenario'] for vx in self.graph.vs if vx.indegree() == 0] 

378 

379 def leaf_scenarios(self): 

380 """Return the leaf scenarios (scenarios with vertices without outgoing edges) 

381 

382 Returns 

383 ------- 

384 list 

385 list[Scenario]. All leaf scenarios 

386 """ 

387 return [vx['scenario'] for vx in self.graph.vs if vx.outdegree() == 0]