Coverage for tests/test_optwps.py: 90%
147 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-10 19:23 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-10 19:23 +0100
1import pysam
3from optwps import exopen, is_soft_clipped, ref_aln_length
4from bx.intervals import Intersecter, Interval
7def old_implementation(
8 bed_file,
9 bamfile,
10 out_filepath,
11 protection_size,
12 valid_chroms,
13 min_insert_size,
14 max_insert_size,
15):
16 protection = protection_size // 2
17 with exopen(bed_file, "r") as infile:
18 for cnt, line in enumerate(infile):
19 chrom, start, end = line.split(
20 "\t"
21 ) # positions should be 0-based and end non-inclusive
22 chrom = chrom.replace("chr", "")
23 if chrom not in valid_chroms:
24 continue
26 regionStart, regionEnd = int(start), int(end)
28 filteredReads = Intersecter()
30 input_file = pysam.Samfile(bamfile, "rb")
31 prefix = ""
32 for tchrom in input_file.references: 32 ↛ 37line 32 didn't jump to line 37 because the loop on line 32 didn't complete
33 if tchrom.startswith("chr"): 33 ↛ 32line 33 didn't jump to line 32 because the condition on line 33 was always true
34 prefix = "chr"
35 break
37 for read in input_file.fetch(
38 prefix + chrom,
39 max(0, regionStart - protection - 1),
40 regionEnd + protection + 1,
41 ):
42 if read.is_duplicate or read.is_qcfail or read.is_unmapped: 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true
43 continue
44 if is_soft_clipped(read.cigar): 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 continue
47 if read.is_paired:
48 if read.mate_is_unmapped: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 continue
50 if read.rnext != read.tid: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true
51 continue
52 if read.is_read1 or (
53 read.is_read2
54 and read.pnext + read.qlen < regionStart - protection - 1
55 ):
56 if read.isize == 0: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 continue
58 rstart = min(read.pos, read.pnext) + 1 # 1-based
59 lseq = abs(read.isize)
60 rend = rstart + lseq - 1 # end included
61 if min_insert_size != None and ( 61 ↛ 64line 61 didn't jump to line 64 because the condition on line 61 was never true
62 (lseq < min_insert_size) or (lseq > max_insert_size)
63 ):
64 continue
66 filteredReads.add_interval(Interval(rstart, rend))
67 else:
68 rstart = read.pos + 1 # 1-based
69 lseq = ref_aln_length(read.cigar)
70 rend = rstart + lseq - 1 # end included
71 if min_insert_size != None and ( 71 ↛ 74line 71 didn't jump to line 74 because the condition on line 71 was never true
72 (lseq < min_insert_size) or (lseq > max_insert_size)
73 ):
74 continue
76 filteredReads.add_interval(Interval(rstart, rend))
77 with exopen(out_filepath, "a" if cnt > 0 else "w") as out:
78 outLines = []
79 for pos in range(regionStart, regionEnd + 1):
80 rstart, rend = pos - protection, pos + protection
81 gcount, bcount = 0, 0
82 for read in filteredReads.find(rstart, rend):
83 if (read.start > rstart) or (read.end < rend):
84 bcount += 1
85 else:
86 gcount += 1
88 outLines.append(
89 "%s\t%d\t%d\t%d\t%d\t%d\n"
90 % (chrom, pos, pos + 1, gcount, bcount, gcount - bcount)
91 )
93 for line in outLines:
94 out.write(line)
95 out.close()
98def test_optwps_does_the_same_as_old_version_paired_end(
99 make_test_bed_file, make_test_bam_file_paired, tmp_path
100):
101 from optwps import WPS
103 maker = WPS(
104 bed_file=str(make_test_bed_file),
105 protection_size=120,
106 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
107 )
108 tmp_new_output = tmp_path / "new_wps_output.tsv"
109 tmp_new_output = str(tmp_new_output)
110 maker.run(
111 bamfile=str(make_test_bam_file_paired),
112 out_filepath=tmp_new_output,
113 )
114 tmp_old_output = tmp_path / "old_wps_output.tsv"
115 tmp_old_output = str(tmp_old_output)
116 old_implementation(
117 bed_file=str(make_test_bed_file),
118 bamfile=str(make_test_bam_file_paired),
119 out_filepath=tmp_old_output,
120 protection_size=120,
121 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
122 min_insert_size=None,
123 max_insert_size=None,
124 )
125 new_lines = open(tmp_new_output).readlines()
126 old_lines = open(tmp_old_output).readlines()
127 assert len(new_lines) == len(old_lines)
128 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)):
129 if nl.split("\t")[-1] != ol.split("\t")[-1]: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true
130 raise AssertionError(
131 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol)
132 )
135def test_optwps_does_the_same_as_old_version_single_end(
136 make_test_bed_file, make_test_bam_file_single, tmp_path
137):
138 from optwps import WPS
140 maker = WPS(
141 bed_file=str(make_test_bed_file),
142 protection_size=120,
143 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
144 )
145 tmp_new_output = tmp_path / "new_wps_output_single.tsv"
146 tmp_new_output = str(tmp_new_output)
147 maker.run(
148 bamfile=str(make_test_bam_file_single),
149 out_filepath=tmp_new_output,
150 )
151 tmp_old_output = tmp_path / "old_wps_output_single.tsv"
152 tmp_old_output = str(tmp_old_output)
153 old_implementation(
154 bed_file=str(make_test_bed_file),
155 bamfile=str(make_test_bam_file_single),
156 out_filepath=tmp_old_output,
157 protection_size=120,
158 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
159 min_insert_size=None,
160 max_insert_size=None,
161 )
162 new_lines = open(tmp_new_output).readlines()
163 old_lines = open(tmp_old_output).readlines()
164 assert len(new_lines) == len(old_lines)
165 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)):
166 if nl.split("\t")[-1] != ol.split("\t")[-1]: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 raise AssertionError(
168 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol)
169 )
172def test_optwps_downsampling(
173 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path
174):
175 from optwps import WPS
177 maker = WPS(
178 bed_file=str(make_test_bed_file),
179 protection_size=120,
180 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
181 )
182 tmp_output = tmp_path / "wps_output_downsampled.tsv"
183 tmp_output = str(tmp_output)
184 maker.run(
185 bamfile=str(make_test_bam_file_paired),
186 out_filepath=tmp_output,
187 downsample_ratio=0.5,
188 )
189 lines = open(tmp_output).readlines()
190 assert len(lines) > 0 # Just check that some output is produced
191 maker.run(
192 bamfile=str(make_test_bam_file_single),
193 out_filepath=tmp_output,
194 downsample_ratio=0.5,
195 )
196 lines = open(tmp_output).readlines()
197 assert len(lines) > 0 # Just check that some output is produced
200def test_optwps_no_bed_file(make_test_bam_file_paired, tmp_path):
201 from optwps import WPS
203 maker = WPS(
204 bed_file=None,
205 protection_size=120,
206 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
207 )
208 tmp_output = tmp_path / "wps_output_no_bed.tsv"
209 tmp_output = str(tmp_output)
210 maker.run(
211 bamfile=str(make_test_bam_file_paired),
212 out_filepath=tmp_output,
213 )
214 lines = open(tmp_output).readlines()
215 assert len(lines) > 0 # Just check that some output is produced
218def test_gzbedfile_handling(make_test_bed_file, make_test_bam_file_paired, tmp_path):
219 import gzip
221 # Create a gzipped version of the bed file
222 gz_bed_path = tmp_path / "test_regions.bed.gz"
223 with open(make_test_bed_file, "rb") as f_in:
224 with gzip.open(gz_bed_path, "wb") as f_out:
225 f_out.writelines(f_in)
226 from optwps import WPS
228 maker = WPS(
229 bed_file=str(gz_bed_path),
230 protection_size=120,
231 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
232 )
233 tmp_output = tmp_path / "wps_output_gzbed.tsv.gz"
234 tmp_output = str(tmp_output)
235 maker.run(
236 bamfile=str(make_test_bam_file_paired),
237 out_filepath=tmp_output,
238 )
241def test_with_minsize_maxsize(
242 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path
243):
244 from optwps import WPS
246 maker = WPS(
247 bed_file=str(make_test_bed_file),
248 protection_size=120,
249 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
250 min_insert_size=400,
251 max_insert_size=800,
252 )
253 tmp_output = tmp_path / "wps_output_minsize_maxsize.tsv"
254 tmp_output = str(tmp_output)
255 maker.run(
256 bamfile=str(make_test_bam_file_paired),
257 out_filepath=tmp_output,
258 )
259 lines = open(tmp_output).readlines()
260 assert len(lines) > 0 # Just check that some output is produced
261 maker.run(
262 bamfile=str(make_test_bam_file_single),
263 out_filepath=tmp_output,
264 )
265 lines = open(tmp_output).readlines()
266 assert len(lines) > 0 # Just check that some output is produced
269def test_with_coverage_computation(
270 make_test_bed_file, make_test_bam_file_paired, tmp_path
271):
272 from optwps import WPS
274 maker = WPS(
275 bed_file=str(make_test_bed_file),
276 protection_size=120,
277 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
278 )
279 tmp_output = tmp_path / "wps_output_with_coverage.tsv"
280 tmp_output = str(tmp_output)
281 maker.run(
282 bamfile=str(make_test_bam_file_paired),
283 out_filepath=tmp_output,
284 compute_coverage=True,
285 )
286 lines = open(tmp_output).readlines()
287 assert len(lines) > 0 # Just check that some output is produced
288 # Check that coverage column is present
289 assert len(lines[0].strip().split("\t")) == 5 # chrom, start, end, coverage, wps
292def test_printed_to_stdout(
293 make_test_bed_file, make_test_bam_file_paired, tmp_path, capsys
294):
295 from optwps import WPS
297 maker = WPS(
298 bed_file=str(make_test_bed_file),
299 protection_size=120,
300 valid_chroms=set(["1", "2", "X", "3", "4", "5"]),
301 )
302 maker.run(
303 bamfile=str(make_test_bam_file_paired),
304 out_filepath=None,
305 verbose_output=True,
306 )
307 captured = capsys.readouterr()
308 lines = captured.out.strip().split("\n")
309 assert len(lines) > 0 # Just check that some output is produced
310 # Check that verbose output columns are present
311 for line in lines:
312 cols = line.split("\t")
313 assert len(cols) == 6 # chrom, pos_start, pos_end, gcount, bcount, wps