Coverage for sphinxlint/cli.py: 77%

131 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-24 18:46 +0100

1import argparse 

2import enum 

3import multiprocessing 

4import os 

5import sys 

6from itertools import chain, starmap 

7 

8from sphinxlint import __version__, check_file 

9from sphinxlint.checkers import all_checkers 

10from sphinxlint.sphinxlint import CheckersOptions 

11 

12 

13class SortField(enum.Enum): 

14 """Fields available for sorting error reports""" 

15 

16 FILENAME = 0 

17 LINE = 1 

18 ERROR_TYPE = 2 

19 

20 @staticmethod 

21 def as_supported_options(): 

22 return ",".join(field.name.lower() for field in SortField) 

23 

24 

25def parse_args(argv=None): 

26 """Parse command line argument.""" 

27 if argv is None: 

28 argv = sys.argv 

29 parser = argparse.ArgumentParser(description=__doc__) 

30 

31 enabled_checkers_names = { 

32 checker.name for checker in all_checkers.values() if checker.enabled 

33 } 

34 

35 class EnableAction(argparse.Action): 

36 def __call__(self, parser, namespace, values, option_string=None): 

37 if values == "all": 

38 enabled_checkers_names.update(set(all_checkers.keys())) 

39 else: 

40 enabled_checkers_names.update(values.split(",")) 

41 

42 class DisableAction(argparse.Action): 

43 def __call__(self, parser, namespace, values, option_string=None): 

44 if values == "all": 

45 enabled_checkers_names.clear() 

46 else: 

47 enabled_checkers_names.difference_update(values.split(",")) 

48 

49 class StoreSortFieldAction(argparse.Action): 

50 def __call__(self, parser, namespace, values, option_string=None): 

51 sort_fields = [] 

52 for field_name in values.split(","): 

53 try: 

54 sort_fields.append(SortField[field_name.upper()]) 

55 except KeyError: 

56 raise ValueError( 

57 f"Unsupported sort field: {field_name}, " 

58 f"supported values are {SortField.as_supported_options()}" 

59 ) from None 

60 setattr(namespace, self.dest, sort_fields) 

61 

62 class StoreNumJobsAction(argparse.Action): 

63 def __call__(self, parser, namespace, values, option_string=None): 

64 setattr(namespace, self.dest, self.job_count(values)) 

65 

66 @staticmethod 

67 def job_count(values): 

68 if values == "auto": 

69 return os.cpu_count() 

70 return max(int(values), 1) 

71 

72 parser.add_argument( 

73 "-v", 

74 "--verbose", 

75 action="store_true", 

76 help="verbose (print all checked file names)", 

77 ) 

78 parser.add_argument( 

79 "-i", 

80 "--ignore", 

81 action="append", 

82 help="ignore subdir or file path", 

83 default=[], 

84 ) 

85 parser.add_argument( 

86 "-d", 

87 "--disable", 

88 action=DisableAction, 

89 help='comma-separated list of checks to disable. Give "all" to disable them all. ' 

90 "Can be used in conjunction with --enable (it's evaluated left-to-right). " 

91 '"--disable all --enable trailing-whitespace" can be used to enable a ' 

92 "single check.", 

93 ) 

94 parser.add_argument( 

95 "-e", 

96 "--enable", 

97 action=EnableAction, 

98 help='comma-separated list of checks to enable. Give "all" to enable them all. ' 

99 "Can be used in conjunction with --disable (it's evaluated left-to-right). " 

100 '"--enable all --disable trailing-whitespace" can be used to enable ' 

101 "all but one check.", 

102 ) 

103 parser.add_argument( 

104 "--list", 

105 action="store_true", 

106 help="List enabled checkers and exit. " 

107 "Can be used to see which checkers would be used with a given set of " 

108 "--enable and --disable options.", 

109 ) 

110 parser.add_argument( 

111 "--max-line-length", 

112 help="Maximum number of characters on a single line.", 

113 default=80, 

114 type=int, 

115 ) 

116 parser.add_argument( 

117 "-s", 

118 "--sort-by", 

119 action=StoreSortFieldAction, 

120 help="comma-separated list of fields used to sort errors by. Available " 

121 f"fields are: {SortField.as_supported_options()}", 

122 ) 

123 parser.add_argument( 

124 "-j", 

125 "--jobs", 

126 metavar="N", 

127 action=StoreNumJobsAction, 

128 help="Run in parallel with N processes. Defaults to 'auto', " 

129 "which sets N to the number of logical CPUs. " 

130 "Values <= 1 are all considered 1.", 

131 default=StoreNumJobsAction.job_count("auto"), 

132 ) 

133 parser.add_argument( 

134 "-V", "--version", action="version", version=f"%(prog)s {__version__}" 

135 ) 

136 

137 parser.add_argument("paths", default=".", nargs="*") 

138 args = parser.parse_args(argv[1:]) 

139 try: 

140 enabled_checkers = {all_checkers[name] for name in enabled_checkers_names} 

141 except KeyError as err: 

142 print(f"Unknown checker: {err.args[0]}.", file=sys.stderr) 

143 sys.exit(2) 

144 return enabled_checkers, args 

145 

146 

147def walk(path, ignore_list): 

148 """Wrapper around os.walk with an ignore list. 

149 

150 It also allows giving a file, thus yielding just that file. 

151 """ 

152 if os.path.isfile(path): 

153 if path in ignore_list: 

154 return 

155 yield path if path[:2] != "./" else path[2:] 

156 return 

157 for root, dirs, files in os.walk(path): 

158 # ignore subdirs in ignore list 

159 if any(ignore in root for ignore in ignore_list): 

160 del dirs[:] 

161 continue 

162 for file in files: 

163 file = os.path.join(root, file) 

164 # ignore files in ignore list 

165 if any(ignore in file for ignore in ignore_list): 

166 continue 

167 yield file if file[:2] != "./" else file[2:] 

168 

169 

170def _check_file(todo): 

171 """Wrapper to call check_file with arguments given by 

172 multiprocessing.imap_unordered.""" 

173 return check_file(*todo) 

174 

175 

176def sort_errors(results, sorted_by): 

177 """Flattens and potentially sorts errors based on user prefernces""" 

178 if not sorted_by: 

179 for results in results: 

180 yield from results 

181 return 

182 errors = list(error for errors in results for error in errors) 

183 # sorting is stable in python, so we can sort in reverse order to get the 

184 # ordering specified by the user 

185 for sort_field in reversed(sorted_by): 

186 if sort_field == SortField.ERROR_TYPE: 

187 errors.sort(key=lambda error: error.checker_name) 

188 elif sort_field == SortField.FILENAME: 

189 errors.sort(key=lambda error: error.filename) 

190 elif sort_field == SortField.LINE: 

191 errors.sort(key=lambda error: error.line_no) 

192 yield from errors 

193 

194 

195def print_errors(errors): 

196 """Print errors (or a message if nothing is to be printed).""" 

197 qty = 0 

198 for error in errors: 

199 print(error, file=sys.stderr) 

200 qty += 1 

201 if qty == 0: 

202 print("No problems found.") 

203 return qty 

204 

205 

206def main(argv=None): 

207 enabled_checkers, args = parse_args(argv) 

208 options = CheckersOptions.from_argparse(args) 

209 if args.list: 

210 if not enabled_checkers: 

211 print("No checkers selected.") 

212 return 0 

213 print(f"{len(enabled_checkers)} checkers selected:") 

214 for check in sorted(enabled_checkers, key=lambda fct: fct.name): 

215 if args.verbose: 

216 print(f"- {check.name}: {check.__doc__}") 

217 else: 

218 print(f"- {check.name}: {check.__doc__.splitlines()[0]}") 

219 if not args.verbose: 

220 print("\n(Use `--list --verbose` to know more about each check)") 

221 return 0 

222 

223 for path in args.paths: 

224 if not os.path.exists(path): 

225 print(f"Error: path {path} does not exist", file=sys.stderr) 

226 return 2 

227 

228 todo = [ 

229 (path, enabled_checkers, options) 

230 for path in chain.from_iterable(walk(path, args.ignore) for path in args.paths) 

231 ] 

232 

233 if args.jobs == 1 or len(todo) < 8: 

234 count = print_errors(sort_errors(starmap(check_file, todo), args.sort_by)) 

235 else: 

236 with multiprocessing.Pool(processes=args.jobs) as pool: 

237 count = print_errors( 

238 sort_errors(pool.imap_unordered(_check_file, todo), args.sort_by) 

239 ) 

240 pool.close() 

241 pool.join() 

242 

243 return int(bool(count))