Coverage for tests/test_optwps.py: 92%
200 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-09 23:08 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-09 23:08 +0100
1from pytest import fixture
2import pysam
5from optwps import exopen, is_soft_clipped, ref_aln_length
6from bx.intervals import Intersecter, Interval
9@fixture
10def make_test_bed_file(tmp_path):
11 bed_content = """chr1\t1000\t2000
12chr2\t1500\t2500
13chr3\t0\t100
14chr4\t50\t150
15chr5\t3000\t4000
16chrM\t200\t800
17chrX\t500\t1500
18chrUn_gl000220\t100\t300
19"""
20 bed_file = tmp_path / "test_regions.bed"
21 bed_file.write_text(bed_content)
22 return bed_file
25def _create_read(name, ref_id, pos, length, flag, mate_pos=None, isize=None):
26 """Helper function to create a properly configured read.
28 For paired-end reads, provide mate_pos and isize.
29 For single-end reads, leave mate_pos and isize as None.
30 """
31 read = pysam.AlignedSegment()
32 read.query_name = name
33 read.reference_id = ref_id
34 read.reference_start = pos
35 read.cigar = ((0, length),)
36 read.mapping_quality = 60
37 read.query_sequence = "A" * length
38 read.query_qualities = pysam.qualitystring_to_array("I" * length)
39 read.flag = flag
41 # Set mate information only for paired-end reads
42 if mate_pos is not None:
43 read.next_reference_id = ref_id
44 read.next_reference_start = mate_pos
45 read.template_length = isize if isize is not None else 0
47 return read
50@fixture
51def make_test_bam_file_paired(tmp_path, make_test_bed_file):
52 bam_path = tmp_path / "test_reads.bam"
53 header = {
54 "HD": {"VN": "1.0"},
55 "SQ": [
56 {"LN": 5000, "SN": "chr1"},
57 {"LN": 5000, "SN": "chr2"},
58 {"LN": 5000, "SN": "chrX"},
59 {"LN": 1000, "SN": "chrM"},
60 {"LN": 300, "SN": "chrUn_gl000220"},
61 {"LN": 5000, "SN": "chr3"},
62 {"LN": 5000, "SN": "chr4"},
63 {"LN": 5000, "SN": "chr5"},
64 ],
65 }
67 with pysam.AlignmentFile(bam_path, "wb", header=header) as outf:
68 for line in make_test_bed_file.read_text().strip().split("\n"):
69 chrom, start, end = line.split("\t")[:3]
70 start, end = int(start), int(end)
71 ref_id = outf.get_tid(chrom)
73 # Pair 1: Completely spans the region
74 pos1, pos2, length = max(1, start - 10), end + 5, 100
75 isize = (pos2 + length) - pos1
76 name = f"pair1_{chrom}_{start}"
77 outf.write(_create_read(name, ref_id, pos1, length, 99, pos2, isize))
78 outf.write(_create_read(name, ref_id, pos2, length, 147, pos1, -isize))
80 # Pair 2: Partially outside the region
81 pos1, pos2, length = max(1, end - 50), end + 30, 80
82 isize = (pos2 + length) - pos1
83 name = f"pair2_{chrom}_{start}"
84 outf.write(_create_read(name, ref_id, pos1, length, 99, pos2, isize))
85 outf.write(_create_read(name, ref_id, pos2, length, 147, pos1, -isize))
87 # Sort and index the BAM file
88 sorted_bam_path = str(tmp_path / "test_reads_sorted.bam")
89 pysam.sort("-o", sorted_bam_path, str(bam_path))
90 pysam.index(sorted_bam_path)
91 return sorted_bam_path
94@fixture
95def make_test_bam_file_single(tmp_path, make_test_bed_file):
96 """Create a single-end BAM file for testing."""
97 bam_path = tmp_path / "test_reads_single.bam"
98 header = {
99 "HD": {"VN": "1.0"},
100 "SQ": [
101 {"LN": 5000, "SN": "chr1"},
102 {"LN": 5000, "SN": "chr2"},
103 {"LN": 5000, "SN": "chrX"},
104 {"LN": 1000, "SN": "chrM"},
105 {"LN": 300, "SN": "chrUn_gl000220"},
106 {"LN": 5000, "SN": "chr3"},
107 {"LN": 5000, "SN": "chr4"},
108 {"LN": 5000, "SN": "chr5"},
109 ],
110 }
112 with pysam.AlignmentFile(bam_path, "wb", header=header) as outf:
113 for line in make_test_bed_file.read_text().strip().split("\n"):
114 chrom, start, end = line.split("\t")[:3]
115 start, end = int(start), int(end)
116 ref_id = outf.get_tid(chrom)
118 # Read 1: Completely spans the region (forward strand)
119 pos, length = max(1, start - 10), end - start + 20
120 outf.write(
121 _create_read(f"read1_{chrom}_{start}", ref_id, pos, length, flag=0)
122 )
124 # Read 2: Partially outside the region (reverse strand)
125 pos, length = end - 30, 60
126 outf.write(
127 _create_read(f"read2_{chrom}_{start}", ref_id, pos, length, flag=16)
128 )
130 # Sort and index the BAM file
131 sorted_bam_path = str(tmp_path / "test_reads_single_sorted.bam")
132 pysam.sort("-o", sorted_bam_path, str(bam_path))
133 pysam.index(sorted_bam_path)
134 return sorted_bam_path
137def old_implementation(
138 bed_file,
139 bamfile,
140 out_filepath,
141 protection_size,
142 valid_chroms,
143 min_insert_size,
144 max_insert_size,
145):
146 protection = protection_size // 2
147 with exopen(bed_file, "r") as infile:
148 for cnt, line in enumerate(infile):
149 chrom, start, end = line.split(
150 "\t"
151 ) # positions should be 0-based and end non-inclusive
152 chrom = chrom.replace("chr", "")
153 if chrom not in valid_chroms:
154 continue
156 regionStart, regionEnd = int(start), int(end)
158 filteredReads = Intersecter()
160 input_file = pysam.Samfile(bamfile, "rb")
161 prefix = ""
162 for tchrom in input_file.references: 162 ↛ 167line 162 didn't jump to line 167 because the loop on line 162 didn't complete
163 if tchrom.startswith("chr"): 163 ↛ 162line 163 didn't jump to line 162 because the condition on line 163 was always true
164 prefix = "chr"
165 break
167 for read in input_file.fetch(
168 prefix + chrom,
169 max(0, regionStart - protection - 1),
170 regionEnd + protection + 1,
171 ):
172 if read.is_duplicate or read.is_qcfail or read.is_unmapped: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true
173 continue
174 if is_soft_clipped(read.cigar): 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true
175 continue
177 if read.is_paired:
178 if read.mate_is_unmapped: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 continue
180 if read.rnext != read.tid: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 continue
182 if read.is_read1 or (
183 read.is_read2
184 and read.pnext + read.qlen < regionStart - protection - 1
185 ):
186 if read.isize == 0: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true
187 continue
188 rstart = min(read.pos, read.pnext) + 1 # 1-based
189 lseq = abs(read.isize)
190 rend = rstart + lseq - 1 # end included
191 if min_insert_size != None and ( 191 ↛ 194line 191 didn't jump to line 194 because the condition on line 191 was never true
192 (lseq < min_insert_size) or (lseq > max_insert_size)
193 ):
194 continue
196 filteredReads.add_interval(Interval(rstart, rend))
197 else:
198 rstart = read.pos + 1 # 1-based
199 lseq = ref_aln_length(read.cigar)
200 rend = rstart + lseq - 1 # end included
201 if min_insert_size != None and ( 201 ↛ 204line 201 didn't jump to line 204 because the condition on line 201 was never true
202 (lseq < min_insert_size) or (lseq > max_insert_size)
203 ):
204 continue
206 filteredReads.add_interval(Interval(rstart, rend))
207 with exopen(out_filepath, "a" if cnt > 0 else "w") as out:
208 outLines = []
209 for pos in range(regionStart, regionEnd + 1):
210 rstart, rend = pos - protection, pos + protection
211 gcount, bcount = 0, 0
212 for read in filteredReads.find(rstart, rend):
213 if (read.start > rstart) or (read.end < rend):
214 bcount += 1
215 else:
216 gcount += 1
218 outLines.append(
219 "%s\t%d\t%d\t%d\t%d\t%d\n"
220 % (chrom, pos, pos + 1, gcount, bcount, gcount - bcount)
221 )
223 for line in outLines:
224 out.write(line)
225 out.close()
228def test_optwps_does_the_same_as_old_version_paired_end(
229 make_test_bed_file, make_test_bam_file_paired, tmp_path
230):
231 from optwps import WPS
233 maker = WPS(
234 bed_file=str(make_test_bed_file),
235 protection_size=120,
236 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
237 )
238 tmp_new_output = tmp_path / "new_wps_output.tsv"
239 tmp_new_output = str(tmp_new_output)
240 maker.run(
241 bamfile=str(make_test_bam_file_paired),
242 out_filepath=tmp_new_output,
243 )
244 tmp_old_output = tmp_path / "old_wps_output.tsv"
245 tmp_old_output = str(tmp_old_output)
246 old_implementation(
247 bed_file=str(make_test_bed_file),
248 bamfile=str(make_test_bam_file_paired),
249 out_filepath=tmp_old_output,
250 protection_size=120,
251 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
252 min_insert_size=None,
253 max_insert_size=None,
254 )
255 new_lines = open(tmp_new_output).readlines()
256 old_lines = open(tmp_old_output).readlines()
257 assert len(new_lines) == len(old_lines)
258 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)):
259 if nl.split("\t")[-1] != ol.split("\t")[-1]: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true
260 raise AssertionError(
261 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol)
262 )
265def test_optwps_does_the_same_as_old_version_single_end(
266 make_test_bed_file, make_test_bam_file_single, tmp_path
267):
268 from optwps import WPS
270 maker = WPS(
271 bed_file=str(make_test_bed_file),
272 protection_size=120,
273 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
274 )
275 tmp_new_output = tmp_path / "new_wps_output_single.tsv"
276 tmp_new_output = str(tmp_new_output)
277 maker.run(
278 bamfile=str(make_test_bam_file_single),
279 out_filepath=tmp_new_output,
280 )
281 tmp_old_output = tmp_path / "old_wps_output_single.tsv"
282 tmp_old_output = str(tmp_old_output)
283 old_implementation(
284 bed_file=str(make_test_bed_file),
285 bamfile=str(make_test_bam_file_single),
286 out_filepath=tmp_old_output,
287 protection_size=120,
288 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
289 min_insert_size=None,
290 max_insert_size=None,
291 )
292 new_lines = open(tmp_new_output).readlines()
293 old_lines = open(tmp_old_output).readlines()
294 assert len(new_lines) == len(old_lines)
295 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)):
296 if nl.split("\t")[-1] != ol.split("\t")[-1]: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 raise AssertionError(
298 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol)
299 )
302def test_optwps_downsampling(
303 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path
304):
305 from optwps import WPS
307 maker = WPS(
308 bed_file=str(make_test_bed_file),
309 protection_size=120,
310 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
311 )
312 tmp_output = tmp_path / "wps_output_downsampled.tsv"
313 tmp_output = str(tmp_output)
314 maker.run(
315 bamfile=str(make_test_bam_file_paired),
316 out_filepath=tmp_output,
317 downsample_ratio=0.5,
318 )
319 lines = open(tmp_output).readlines()
320 assert len(lines) > 0 # Just check that some output is produced
321 maker.run(
322 bamfile=str(make_test_bam_file_single),
323 out_filepath=tmp_output,
324 downsample_ratio=0.5,
325 )
326 lines = open(tmp_output).readlines()
327 assert len(lines) > 0 # Just check that some output is produced
330def test_optwps_no_bed_file(make_test_bam_file_paired, tmp_path):
331 from optwps import WPS
333 maker = WPS(
334 bed_file=None,
335 protection_size=120,
336 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
337 )
338 tmp_output = tmp_path / "wps_output_no_bed.tsv"
339 tmp_output = str(tmp_output)
340 maker.run(
341 bamfile=str(make_test_bam_file_paired),
342 out_filepath=tmp_output,
343 )
344 lines = open(tmp_output).readlines()
345 assert len(lines) > 0 # Just check that some output is produced
348def test_gzbedfile_handling(make_test_bed_file, make_test_bam_file_paired, tmp_path):
349 import gzip
351 # Create a gzipped version of the bed file
352 gz_bed_path = tmp_path / "test_regions.bed.gz"
353 with open(make_test_bed_file, "rb") as f_in:
354 with gzip.open(gz_bed_path, "wb") as f_out:
355 f_out.writelines(f_in)
356 from optwps import WPS
358 maker = WPS(
359 bed_file=str(gz_bed_path),
360 protection_size=120,
361 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
362 )
363 tmp_output = tmp_path / "wps_output_gzbed.tsv.gz"
364 tmp_output = str(tmp_output)
365 maker.run(
366 bamfile=str(make_test_bam_file_paired),
367 out_filepath=tmp_output,
368 )
371def test_with_minsize_maxsize(
372 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path
373):
374 from optwps import WPS
376 maker = WPS(
377 bed_file=str(make_test_bed_file),
378 protection_size=120,
379 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
380 min_insert_size=400,
381 max_insert_size=800,
382 )
383 tmp_output = tmp_path / "wps_output_minsize_maxsize.tsv"
384 tmp_output = str(tmp_output)
385 maker.run(
386 bamfile=str(make_test_bam_file_paired),
387 out_filepath=tmp_output,
388 )
389 lines = open(tmp_output).readlines()
390 assert len(lines) > 0 # Just check that some output is produced
391 maker.run(
392 bamfile=str(make_test_bam_file_single),
393 out_filepath=tmp_output,
394 )
395 lines = open(tmp_output).readlines()
396 assert len(lines) > 0 # Just check that some output is produced
399def test_printed_to_stdout(
400 make_test_bed_file, make_test_bam_file_paired, tmp_path, capsys
401):
402 from optwps import WPS
404 maker = WPS(
405 bed_file=str(make_test_bed_file),
406 protection_size=120,
407 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
408 )
409 maker.run(
410 bamfile=str(make_test_bam_file_paired),
411 out_filepath=None,
412 verbose_output=True,
413 )
414 captured = capsys.readouterr()
415 lines = captured.out.strip().split("\n")
416 assert len(lines) > 0 # Just check that some output is produced
417 # Check that verbose output columns are present
418 for line in lines:
419 cols = line.split("\t")
420 assert len(cols) == 6 # chrom, pos_start, pos_end, gcount, bcount, wps