Coverage for tests/test_optwps.py: 90%

147 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-10 19:23 +0100

1import pysam 

2 

3from optwps import exopen, is_soft_clipped, ref_aln_length 

4from bx.intervals import Intersecter, Interval 

5 

6 

7def old_implementation( 

8 bed_file, 

9 bamfile, 

10 out_filepath, 

11 protection_size, 

12 valid_chroms, 

13 min_insert_size, 

14 max_insert_size, 

15): 

16 protection = protection_size // 2 

17 with exopen(bed_file, "r") as infile: 

18 for cnt, line in enumerate(infile): 

19 chrom, start, end = line.split( 

20 "\t" 

21 ) # positions should be 0-based and end non-inclusive 

22 chrom = chrom.replace("chr", "") 

23 if chrom not in valid_chroms: 

24 continue 

25 

26 regionStart, regionEnd = int(start), int(end) 

27 

28 filteredReads = Intersecter() 

29 

30 input_file = pysam.Samfile(bamfile, "rb") 

31 prefix = "" 

32 for tchrom in input_file.references: 32 ↛ 37line 32 didn't jump to line 37 because the loop on line 32 didn't complete

33 if tchrom.startswith("chr"): 33 ↛ 32line 33 didn't jump to line 32 because the condition on line 33 was always true

34 prefix = "chr" 

35 break 

36 

37 for read in input_file.fetch( 

38 prefix + chrom, 

39 max(0, regionStart - protection - 1), 

40 regionEnd + protection + 1, 

41 ): 

42 if read.is_duplicate or read.is_qcfail or read.is_unmapped: 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true

43 continue 

44 if is_soft_clipped(read.cigar): 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 continue 

46 

47 if read.is_paired: 

48 if read.mate_is_unmapped: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 continue 

50 if read.rnext != read.tid: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true

51 continue 

52 if read.is_read1 or ( 

53 read.is_read2 

54 and read.pnext + read.qlen < regionStart - protection - 1 

55 ): 

56 if read.isize == 0: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 continue 

58 rstart = min(read.pos, read.pnext) + 1 # 1-based 

59 lseq = abs(read.isize) 

60 rend = rstart + lseq - 1 # end included 

61 if min_insert_size != None and ( 61 ↛ 64line 61 didn't jump to line 64 because the condition on line 61 was never true

62 (lseq < min_insert_size) or (lseq > max_insert_size) 

63 ): 

64 continue 

65 

66 filteredReads.add_interval(Interval(rstart, rend)) 

67 else: 

68 rstart = read.pos + 1 # 1-based 

69 lseq = ref_aln_length(read.cigar) 

70 rend = rstart + lseq - 1 # end included 

71 if min_insert_size != None and ( 71 ↛ 74line 71 didn't jump to line 74 because the condition on line 71 was never true

72 (lseq < min_insert_size) or (lseq > max_insert_size) 

73 ): 

74 continue 

75 

76 filteredReads.add_interval(Interval(rstart, rend)) 

77 with exopen(out_filepath, "a" if cnt > 0 else "w") as out: 

78 outLines = [] 

79 for pos in range(regionStart, regionEnd + 1): 

80 rstart, rend = pos - protection, pos + protection 

81 gcount, bcount = 0, 0 

82 for read in filteredReads.find(rstart, rend): 

83 if (read.start > rstart) or (read.end < rend): 

84 bcount += 1 

85 else: 

86 gcount += 1 

87 

88 outLines.append( 

89 "%s\t%d\t%d\t%d\t%d\t%d\n" 

90 % (chrom, pos, pos + 1, gcount, bcount, gcount - bcount) 

91 ) 

92 

93 for line in outLines: 

94 out.write(line) 

95 out.close() 

96 

97 

98def test_optwps_does_the_same_as_old_version_paired_end( 

99 make_test_bed_file, make_test_bam_file_paired, tmp_path 

100): 

101 from optwps import WPS 

102 

103 maker = WPS( 

104 bed_file=str(make_test_bed_file), 

105 protection_size=120, 

106 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

107 ) 

108 tmp_new_output = tmp_path / "new_wps_output.tsv" 

109 tmp_new_output = str(tmp_new_output) 

110 maker.run( 

111 bamfile=str(make_test_bam_file_paired), 

112 out_filepath=tmp_new_output, 

113 ) 

114 tmp_old_output = tmp_path / "old_wps_output.tsv" 

115 tmp_old_output = str(tmp_old_output) 

116 old_implementation( 

117 bed_file=str(make_test_bed_file), 

118 bamfile=str(make_test_bam_file_paired), 

119 out_filepath=tmp_old_output, 

120 protection_size=120, 

121 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

122 min_insert_size=None, 

123 max_insert_size=None, 

124 ) 

125 new_lines = open(tmp_new_output).readlines() 

126 old_lines = open(tmp_old_output).readlines() 

127 assert len(new_lines) == len(old_lines) 

128 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)): 

129 if nl.split("\t")[-1] != ol.split("\t")[-1]: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 raise AssertionError( 

131 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol) 

132 ) 

133 

134 

135def test_optwps_does_the_same_as_old_version_single_end( 

136 make_test_bed_file, make_test_bam_file_single, tmp_path 

137): 

138 from optwps import WPS 

139 

140 maker = WPS( 

141 bed_file=str(make_test_bed_file), 

142 protection_size=120, 

143 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

144 ) 

145 tmp_new_output = tmp_path / "new_wps_output_single.tsv" 

146 tmp_new_output = str(tmp_new_output) 

147 maker.run( 

148 bamfile=str(make_test_bam_file_single), 

149 out_filepath=tmp_new_output, 

150 ) 

151 tmp_old_output = tmp_path / "old_wps_output_single.tsv" 

152 tmp_old_output = str(tmp_old_output) 

153 old_implementation( 

154 bed_file=str(make_test_bed_file), 

155 bamfile=str(make_test_bam_file_single), 

156 out_filepath=tmp_old_output, 

157 protection_size=120, 

158 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

159 min_insert_size=None, 

160 max_insert_size=None, 

161 ) 

162 new_lines = open(tmp_new_output).readlines() 

163 old_lines = open(tmp_old_output).readlines() 

164 assert len(new_lines) == len(old_lines) 

165 for cnt, (nl, ol) in enumerate(zip(new_lines, old_lines)): 

166 if nl.split("\t")[-1] != ol.split("\t")[-1]: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true

167 raise AssertionError( 

168 "Mismatch in line {}:\nNew: {}\nOld: {}".format(cnt, nl, ol) 

169 ) 

170 

171 

172def test_optwps_downsampling( 

173 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path 

174): 

175 from optwps import WPS 

176 

177 maker = WPS( 

178 bed_file=str(make_test_bed_file), 

179 protection_size=120, 

180 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

181 ) 

182 tmp_output = tmp_path / "wps_output_downsampled.tsv" 

183 tmp_output = str(tmp_output) 

184 maker.run( 

185 bamfile=str(make_test_bam_file_paired), 

186 out_filepath=tmp_output, 

187 downsample_ratio=0.5, 

188 ) 

189 lines = open(tmp_output).readlines() 

190 assert len(lines) > 0 # Just check that some output is produced 

191 maker.run( 

192 bamfile=str(make_test_bam_file_single), 

193 out_filepath=tmp_output, 

194 downsample_ratio=0.5, 

195 ) 

196 lines = open(tmp_output).readlines() 

197 assert len(lines) > 0 # Just check that some output is produced 

198 

199 

200def test_optwps_no_bed_file(make_test_bam_file_paired, tmp_path): 

201 from optwps import WPS 

202 

203 maker = WPS( 

204 bed_file=None, 

205 protection_size=120, 

206 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

207 ) 

208 tmp_output = tmp_path / "wps_output_no_bed.tsv" 

209 tmp_output = str(tmp_output) 

210 maker.run( 

211 bamfile=str(make_test_bam_file_paired), 

212 out_filepath=tmp_output, 

213 ) 

214 lines = open(tmp_output).readlines() 

215 assert len(lines) > 0 # Just check that some output is produced 

216 

217 

218def test_gzbedfile_handling(make_test_bed_file, make_test_bam_file_paired, tmp_path): 

219 import gzip 

220 

221 # Create a gzipped version of the bed file 

222 gz_bed_path = tmp_path / "test_regions.bed.gz" 

223 with open(make_test_bed_file, "rb") as f_in: 

224 with gzip.open(gz_bed_path, "wb") as f_out: 

225 f_out.writelines(f_in) 

226 from optwps import WPS 

227 

228 maker = WPS( 

229 bed_file=str(gz_bed_path), 

230 protection_size=120, 

231 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

232 ) 

233 tmp_output = tmp_path / "wps_output_gzbed.tsv.gz" 

234 tmp_output = str(tmp_output) 

235 maker.run( 

236 bamfile=str(make_test_bam_file_paired), 

237 out_filepath=tmp_output, 

238 ) 

239 

240 

241def test_with_minsize_maxsize( 

242 make_test_bed_file, make_test_bam_file_paired, make_test_bam_file_single, tmp_path 

243): 

244 from optwps import WPS 

245 

246 maker = WPS( 

247 bed_file=str(make_test_bed_file), 

248 protection_size=120, 

249 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

250 min_insert_size=400, 

251 max_insert_size=800, 

252 ) 

253 tmp_output = tmp_path / "wps_output_minsize_maxsize.tsv" 

254 tmp_output = str(tmp_output) 

255 maker.run( 

256 bamfile=str(make_test_bam_file_paired), 

257 out_filepath=tmp_output, 

258 ) 

259 lines = open(tmp_output).readlines() 

260 assert len(lines) > 0 # Just check that some output is produced 

261 maker.run( 

262 bamfile=str(make_test_bam_file_single), 

263 out_filepath=tmp_output, 

264 ) 

265 lines = open(tmp_output).readlines() 

266 assert len(lines) > 0 # Just check that some output is produced 

267 

268 

269def test_with_coverage_computation( 

270 make_test_bed_file, make_test_bam_file_paired, tmp_path 

271): 

272 from optwps import WPS 

273 

274 maker = WPS( 

275 bed_file=str(make_test_bed_file), 

276 protection_size=120, 

277 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

278 ) 

279 tmp_output = tmp_path / "wps_output_with_coverage.tsv" 

280 tmp_output = str(tmp_output) 

281 maker.run( 

282 bamfile=str(make_test_bam_file_paired), 

283 out_filepath=tmp_output, 

284 compute_coverage=True, 

285 ) 

286 lines = open(tmp_output).readlines() 

287 assert len(lines) > 0 # Just check that some output is produced 

288 # Check that coverage column is present 

289 assert len(lines[0].strip().split("\t")) == 5 # chrom, start, end, coverage, wps 

290 

291 

292def test_printed_to_stdout( 

293 make_test_bed_file, make_test_bam_file_paired, tmp_path, capsys 

294): 

295 from optwps import WPS 

296 

297 maker = WPS( 

298 bed_file=str(make_test_bed_file), 

299 protection_size=120, 

300 valid_chroms=set(["1", "2", "X", "3", "4", "5"]), 

301 ) 

302 maker.run( 

303 bamfile=str(make_test_bam_file_paired), 

304 out_filepath=None, 

305 verbose_output=True, 

306 ) 

307 captured = capsys.readouterr() 

308 lines = captured.out.strip().split("\n") 

309 assert len(lines) > 0 # Just check that some output is produced 

310 # Check that verbose output columns are present 

311 for line in lines: 

312 cols = line.split("\t") 

313 assert len(cols) == 6 # chrom, pos_start, pos_end, gcount, bcount, wps