Coverage for tests/test_optwps.py: 92%

200 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-09 23:08 +0100

1from pytest import fixture 

2import pysam 

3 

4 

5from optwps import exopen, is_soft_clipped, ref_aln_length 

6from bx.intervals import Intersecter, Interval 

7 

8 

9@fixture 

10def make_test_bed_file(tmp_path): 

11 bed_content = """chr1\t1000\t2000 

12chr2\t1500\t2500 

13chr3\t0\t100 

14chr4\t50\t150 

15chr5\t3000\t4000 

16chrM\t200\t800 

17chrX\t500\t1500 

18chrUn_gl000220\t100\t300 

19""" 

20 bed_file = tmp_path / "test_regions.bed" 

21 bed_file.write_text(bed_content) 

22 return bed_file 

23 

24 

25def _create_read(name, ref_id, pos, length, flag, mate_pos=None, isize=None): 

26 """Helper function to create a properly configured read. 

27 

28 For paired-end reads, provide mate_pos and isize. 

29 For single-end reads, leave mate_pos and isize as None. 

30 """ 

31 read = pysam.AlignedSegment() 

32 read.query_name = name 

33 read.reference_id = ref_id 

34 read.reference_start = pos 

35 read.cigar = ((0, length),) 

36 read.mapping_quality = 60 

37 read.query_sequence = "A" * length 

38 read.query_qualities = pysam.qualitystring_to_array("I" * length) 

39 read.flag = flag 

40 

41 # Set mate information only for paired-end reads 

42 if mate_pos is not None: 

43 read.next_reference_id = ref_id 

44 read.next_reference_start = mate_pos 

45 read.template_length = isize if isize is not None else 0 

46 

47 return read 

48 

49 

50@fixture 

51def make_test_bam_file_paired(tmp_path, make_test_bed_file): 

52 bam_path = tmp_path / "test_reads.bam" 

53 header = { 

54 "HD": {"VN": "1.0"}, 

55 "SQ": [ 

56 {"LN": 5000, "SN": "chr1"}, 

57 {"LN": 5000, "SN": "chr2"}, 

58 {"LN": 5000, "SN": "chrX"}, 

59 {"LN": 1000, "SN": "chrM"}, 

60 {"LN": 300, "SN": "chrUn_gl000220"}, 

61 {"LN": 5000, "SN": "chr3"}, 

62 {"LN": 5000, "SN": "chr4"}, 

63 {"LN": 5000, "SN": "chr5"}, 

64 ], 

65 } 

66 

67 with pysam.AlignmentFile(bam_path, "wb", header=header) as outf: 

68 for line in make_test_bed_file.read_text().strip().split("\n"): 

69 chrom, start, end = line.split("\t")[:3] 

70 start, end = int(start), int(end) 

71 ref_id = outf.get_tid(chrom) 

72 

73 # Pair 1: Completely spans the region 

74 pos1, pos2, length = max(1, start - 10), end + 5, 100 

75 isize = (pos2 + length) - pos1 

76 name = f"pair1_{chrom}_{start}" 

77 outf.write(_create_read(name, ref_id, pos1, length, 99, pos2, isize)) 

78 outf.write(_create_read(name, ref_id, pos2, length, 147, pos1, -isize)) 

79 

80 # Pair 2: Partially outside the region 

81 pos1, pos2, length = max(1, end - 50), end + 30, 80 

82 isize = (pos2 + length) - pos1 

83 name = f"pair2_{chrom}_{start}" 

84 outf.write(_create_read(name, ref_id, pos1, length, 99, pos2, isize)) 

85 outf.write(_create_read(name, ref_id, pos2, length, 147, pos1, -isize)) 

86 

87 # Sort and index the BAM file 

88 sorted_bam_path = str(tmp_path / "test_reads_sorted.bam") 

89 pysam.sort("-o", sorted_bam_path, str(bam_path)) 

90 pysam.index(sorted_bam_path) 

91 return sorted_bam_path 

92 

93 

94@fixture 

95def make_test_bam_file_single(tmp_path, make_test_bed_file): 

96 """Create a single-end BAM file for testing.""" 

97 bam_path = tmp_path / "test_reads_single.bam" 

98 header = { 

99 "HD": {"VN": "1.0"}, 

100 "SQ": [ 

101 {"LN": 5000, "SN": "chr1"}, 

102 {"LN": 5000, "SN": "chr2"}, 

103 {"LN": 5000, "SN": "chrX"}, 

104 {"LN": 1000, "SN": "chrM"}, 

105 {"LN": 300, "SN": "chrUn_gl000220"}, 

106 {"LN": 5000, "SN": "chr3"}, 

107 {"LN": 5000, "SN": "chr4"}, 

108 {"LN": 5000, "SN": "chr5"}, 

109 ], 

110 } 

111 

112 with pysam.AlignmentFile(bam_path, "wb", header=header) as outf: 

113 for line in make_test_bed_file.read_text().strip().split("\n"): 

114 chrom, start, end = line.split("\t")[:3] 

115 start, end = int(start), int(end) 

116 ref_id = outf.get_tid(chrom) 

117 

118 # Read 1: Completely spans the region (forward strand) 

119 pos, length = max(1, start - 10), end - start + 20 

120 outf.write( 

121 _create_read(f"read1_{chrom}_{start}", ref_id, pos, length, flag=0) 

122 ) 

123 

124 # Read 2: Partially outside the region (reverse strand) 

125 pos, length = end - 30, 60 

126 outf.write( 

127 _create_read(f"read2_{chrom}_{start}", ref_id, pos, length, flag=16) 

128 ) 

129 

130 # Sort and index the BAM file 

131 sorted_bam_path = str(tmp_path / "test_reads_single_sorted.bam") 

132 pysam.sort("-o", sorted_bam_path, str(bam_path)) 

133 pysam.index(sorted_bam_path) 

134 return sorted_bam_path 

135 

136 

137def old_implementation( 

138 bed_file, 

139 bamfile, 

140 out_filepath, 

141 protection_size, 

142 valid_chroms, 

143 min_insert_size, 

144 max_insert_size, 

145): 

146 protection = protection_size // 2 

147 with exopen(bed_file, "r") as infile: 

148 for cnt, line in enumerate(infile): 

149 chrom, start, end = line.split( 

150 "\t" 

151 ) # positions should be 0-based and end non-inclusive 

152 chrom = chrom.replace("chr", "") 

153 if chrom not in valid_chroms: 

154 continue 

155 

156 regionStart, regionEnd = int(start), int(end) 

157 

158 filteredReads = Intersecter() 

159 

160 input_file = pysam.Samfile(bamfile, "rb") 

161 prefix = "" 

162 for tchrom in input_file.references: 162 ↛ 167line 162 didn't jump to line 167 because the loop on line 162 didn't complete

163 if tchrom.startswith("chr"): 163 ↛ 162line 163 didn't jump to line 162 because the condition on line 163 was always true

164 prefix = "chr" 

165 break 

166 

167 for read in input_file.fetch( 

168 prefix + chrom, 

169 max(0, regionStart - protection - 1), 

170 regionEnd + protection + 1, 

171 ): 

172 if read.is_duplicate or read.is_qcfail or read.is_unmapped: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 continue 

174 if is_soft_clipped(read.cigar): 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true

175 continue 

176 

177 if read.is_paired: 

178 if read.mate_is_unmapped: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 continue 

180 if read.rnext != read.tid: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true

181 continue 

182 if read.is_read1 or ( 

183 read.is_read2 

184 and read.pnext + read.qlen < regionStart - protection - 1 

185 ): 

186 if read.isize == 0: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true

187 continue 

188 rstart = min(read.pos, read.pnext) + 1 # 1-based 

189 lseq = abs(read.isize) 

190 rend = rstart + lseq - 1 # end included 

191 if min_insert_size != None and ( 191 ↛ 194line 191 didn't jump to line 194 because the condition on line 191 was never true

192 (lseq < min_insert_size) or (lseq > max_insert_size) 

193 ): 

194 continue 

195 

196 filteredReads.add_interval(Interval(rstart, rend)) 

197 else: 

198 rstart = read.pos + 1 # 1-based 

199 lseq = ref_aln_length(read.cigar) 

200 rend = rstart + lseq - 1 # end included 

201 if min_insert_size != None and ( 201 ↛ 204line 201 didn't jump to line 204 because the condition on line 201 was never true

202 (lseq < min_insert_size) or (lseq > max_insert_size) 

203 ): 

204 continue 

205 

206 filteredReads.add_interval(Interval(rstart, rend)) 

207 with exopen(out_filepath, "a" if cnt > 0 else "w") as out: 

208 outLines = [] 

209 for pos in range(regionStart, regionEnd + 1): 

210 rstart, rend = pos - protection, pos + protection 

211 gcount, bcount = 0, 0 

212 for read in filteredReads.find(rstart, rend): 

213 if (read.start > rstart) or (read.end < rend): 

214 bcount += 1 

215 else: 

216 gcount += 1 

217 

218 outLines.append( 

219 "%s\t%d\t%d\t%d\t%d\t%d\n" 

220 % (chrom, pos, pos + 1, gcount, bcount, gcount - bcount) 

221 ) 

222 

223 for line in outLines: 

224 out.write(line) 

225 out.close() 

226 

227 

228def test_optwps_does_the_same_as_old_version_paired_end( 

229 make_test_bed_file, make_test_bam_file_paired, tmp_path 

230): 

231 from optwps import WPS 

232 

233 maker = WPS( 

234 bed_file=str(make_test_bed_file), 

235 protection_size=120, 

236 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

237 ) 

238 tmp_new_output = tmp_path / "new_wps_output.tsv" 

239 tmp_new_output = str(tmp_new_output) 

240 maker.run( 

241 bamfile=str(make_test_bam_file_paired), 

242 out_filepath=tmp_new_output, 

243 ) 

244 tmp_old_output = tmp_path / "old_wps_output.tsv" 

245 tmp_old_output = str(tmp_old_output) 

246 old_implementation( 

247 bed_file=str(make_test_bed_file), 

248 bamfile=str(make_test_bam_file_paired), 

249 out_filepath=tmp_old_output, 

250 protection_size=120, 

251 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

252 min_insert_size=None, 

253 max_insert_size=None, 

254 ) 

255 new_lines = open(tmp_new_output).readlines() 

256 old_lines = open(tmp_old_output).readlines() 

257 assert len(new_lines) == len(old_lines) 

258 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)): 

259 if nl.split("\t")[-1] != ol.split("\t")[-1]: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true

260 raise AssertionError( 

261 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol) 

262 ) 

263 

264 

265def test_optwps_does_the_same_as_old_version_single_end( 

266 make_test_bed_file, make_test_bam_file_single, tmp_path 

267): 

268 from optwps import WPS 

269 

270 maker = WPS( 

271 bed_file=str(make_test_bed_file), 

272 protection_size=120, 

273 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

274 ) 

275 tmp_new_output = tmp_path / "new_wps_output_single.tsv" 

276 tmp_new_output = str(tmp_new_output) 

277 maker.run( 

278 bamfile=str(make_test_bam_file_single), 

279 out_filepath=tmp_new_output, 

280 ) 

281 tmp_old_output = tmp_path / "old_wps_output_single.tsv" 

282 tmp_old_output = str(tmp_old_output) 

283 old_implementation( 

284 bed_file=str(make_test_bed_file), 

285 bamfile=str(make_test_bam_file_single), 

286 out_filepath=tmp_old_output, 

287 protection_size=120, 

288 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

289 min_insert_size=None, 

290 max_insert_size=None, 

291 ) 

292 new_lines = open(tmp_new_output).readlines() 

293 old_lines = open(tmp_old_output).readlines() 

294 assert len(new_lines) == len(old_lines) 

295 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)): 

296 if nl.split("\t")[-1] != ol.split("\t")[-1]: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true

297 raise AssertionError( 

298 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol) 

299 ) 

300 

301 

302def test_optwps_downsampling( 

303 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path 

304): 

305 from optwps import WPS 

306 

307 maker = WPS( 

308 bed_file=str(make_test_bed_file), 

309 protection_size=120, 

310 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

311 ) 

312 tmp_output = tmp_path / "wps_output_downsampled.tsv" 

313 tmp_output = str(tmp_output) 

314 maker.run( 

315 bamfile=str(make_test_bam_file_paired), 

316 out_filepath=tmp_output, 

317 downsample_ratio=0.5, 

318 ) 

319 lines = open(tmp_output).readlines() 

320 assert len(lines) > 0 # Just check that some output is produced 

321 maker.run( 

322 bamfile=str(make_test_bam_file_single), 

323 out_filepath=tmp_output, 

324 downsample_ratio=0.5, 

325 ) 

326 lines = open(tmp_output).readlines() 

327 assert len(lines) > 0 # Just check that some output is produced 

328 

329 

330def test_optwps_no_bed_file(make_test_bam_file_paired, tmp_path): 

331 from optwps import WPS 

332 

333 maker = WPS( 

334 bed_file=None, 

335 protection_size=120, 

336 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

337 ) 

338 tmp_output = tmp_path / "wps_output_no_bed.tsv" 

339 tmp_output = str(tmp_output) 

340 maker.run( 

341 bamfile=str(make_test_bam_file_paired), 

342 out_filepath=tmp_output, 

343 ) 

344 lines = open(tmp_output).readlines() 

345 assert len(lines) > 0 # Just check that some output is produced 

346 

347 

348def test_gzbedfile_handling(make_test_bed_file, make_test_bam_file_paired, tmp_path): 

349 import gzip 

350 

351 # Create a gzipped version of the bed file 

352 gz_bed_path = tmp_path / "test_regions.bed.gz" 

353 with open(make_test_bed_file, "rb") as f_in: 

354 with gzip.open(gz_bed_path, "wb") as f_out: 

355 f_out.writelines(f_in) 

356 from optwps import WPS 

357 

358 maker = WPS( 

359 bed_file=str(gz_bed_path), 

360 protection_size=120, 

361 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

362 ) 

363 tmp_output = tmp_path / "wps_output_gzbed.tsv.gz" 

364 tmp_output = str(tmp_output) 

365 maker.run( 

366 bamfile=str(make_test_bam_file_paired), 

367 out_filepath=tmp_output, 

368 ) 

369 

370 

371def test_with_minsize_maxsize( 

372 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path 

373): 

374 from optwps import WPS 

375 

376 maker = WPS( 

377 bed_file=str(make_test_bed_file), 

378 protection_size=120, 

379 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

380 min_insert_size=400, 

381 max_insert_size=800, 

382 ) 

383 tmp_output = tmp_path / "wps_output_minsize_maxsize.tsv" 

384 tmp_output = str(tmp_output) 

385 maker.run( 

386 bamfile=str(make_test_bam_file_paired), 

387 out_filepath=tmp_output, 

388 ) 

389 lines = open(tmp_output).readlines() 

390 assert len(lines) > 0 # Just check that some output is produced 

391 maker.run( 

392 bamfile=str(make_test_bam_file_single), 

393 out_filepath=tmp_output, 

394 ) 

395 lines = open(tmp_output).readlines() 

396 assert len(lines) > 0 # Just check that some output is produced 

397 

398 

399def test_printed_to_stdout( 

400 make_test_bed_file, make_test_bam_file_paired, tmp_path, capsys 

401): 

402 from optwps import WPS 

403 

404 maker = WPS( 

405 bed_file=str(make_test_bed_file), 

406 protection_size=120, 

407 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

408 ) 

409 maker.run( 

410 bamfile=str(make_test_bam_file_paired), 

411 out_filepath=None, 

412 verbose_output=True, 

413 ) 

414 captured = capsys.readouterr() 

415 lines = captured.out.strip().split("\n") 

416 assert len(lines) > 0 # Just check that some output is produced 

417 # Check that verbose output columns are present 

418 for line in lines: 

419 cols = line.split("\t") 

420 assert len(cols) == 6 # chrom, pos_start, pos_end, gcount, bcount, wps