Coverage for /var/devmt/py/dbilib_0.5.0/dbilib/_dbi_base.py: 100%

63 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-09-20 12:44 +0100

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4:Purpose: This module contains the library's *base* database methods 

5 and attribute accessors, which are designed to be 

6 specialised by the database-specific modules and classes. 

7 

8:Platform: Linux/Windows | Python 3.10+ 

9:Developer: J Berendt 

10:Email: support@s3dev.uk 

11 

12:Comments: This module contains *only* methods which can safely be 

13 inherited and used by *any* of its subclasses. 

14 

15 In other words, this module should *not* contain any import 

16 statement, or uses of these imports, which if used in a 

17 database-specific module will cause a crash due to a missing 

18 library. 

19 

20 Any database-specific functionality must be contained in 

21 that module. 

22 

23:Example: 

24 

25 For class-specific usage examples, please refer to the docstring 

26 for the following classes: 

27 

28 - :class:`_DBIBase` 

29 

30""" 

31# pylint: disable=import-error 

32# pylint: disable=wrong-import-order 

33 

34from __future__ import annotations 

35 

36import pandas as pd 

37import traceback 

38import sqlalchemy as sa 

39from enum import IntEnum 

40from sqlalchemy.exc import SQLAlchemyError 

41from utils4.reporterror import reporterror 

42from utils4.user_interface import ui 

43 

44 

class ExitCode(IntEnum):
    """Enumeration of the integer exit codes used by the program.

    The members are grouped by purpose; the backup-related codes occupy
    the range 110-119.

    """

    # General OK.
    OK = 0

    # Backup codes (110-119)
    # Table does not exist.
    ERR_BKUP_TBNEX = 110
    # Backup database does not exist.
    ERR_BKUP_DBNEX = 111
    # Checksum mismatch.
    ERR_BKUP_CKSUM = 112

54 

55 

class SecurityWarning(Warning):
    """Warning category raised when a statement fails a security check.

    This is a stub-class; it adds no behaviour to :class:`Warning`.

    """

58 

59 

60class _DBIBase: 

61 """This class holds the methods and properties which are used across 

62 all databases. Each of the database-specific constructors inherits 

63 this class for its members. 

64 

65 Note: 

66 This class is *not* designed to be interacted with directly. 

67 

68 Rather, please use the :class:`database.DBInterface` class 

69 instead, as the proper interface class has an automatic switch 

70 for database interfaces, based on the ``sqlalchemy.Engine`` 

71 object which is created from the connection string. 

72 

73 Args: 

74 connstr (str): The database-specific SQLAlchemy connection 

75 string. 

76 

77 :Example Use: 

78 

79 This low-level generalised class is designed to be inherited by 

80 the calling/wrapping class as:: 

81 

82 >>> from dbilib.database import DBInterface 

83 

84 class MyDB(DBInterface): 

85 

86 def __init__(self, connstr: str): 

87 super().__init__(connstr=('mysql+mysqlconnector://' 

88 '<user>:<pwd>@<host>:<port>/' 

89 '<db_name>')) 

90 

91 """ 

92 

93 _PREFIX = '\n[DatabaseError]:' 

94 _PREFIXW = '\n[DatabaseWarning]:' 

95 

96 def __init__(self, connstr: str): 

97 """Class initialiser.""" 

98 self._connstr = connstr 

99 self._engine = None 

100 if connstr: 

101 # Testing: Enable an instance to be created without a 

102 # connection string. 

103 self._engine = self._create_engine() 

104 

105 @property 

106 def database_name(self): 

107 """Accessor to the database name used by the :attr:`engine` object.""" 

108 return self._engine.url.database 

109 

110 @property 

111 def engine(self): 

112 """Accessor to the ``sqlalchemy.engine.base.Engine`` object.""" 

113 return self._engine 

114 

115 def execute_query(self, 

116 stmt: str, 

117 params: dict=None, 

118 *, 

119 raw: bool=True, 

120 flat: bool=False, 

121 commit: bool=True, 

122 ignore_unsafe: bool=False) -> list | pd.DataFrame | None: 

123 """Execute a query statement. 

124 

125 Important: 

126 The following are *not* allowed to be executed by this 

127 method: 

128 

129 - Statements containing multiple semi-colons (``;``). 

130 - Statements containing a comment delimiter (``--``). 

131 

132 If found, a :class:`SecurityWarning` will be raised by the 

133 :meth:`_is_dangerous` method. 

134 

135 Args: 

136 stmt (str): Statement to be executed. The parameter bindings 

137 are to be written in colon format. 

138 params (dict, optional): Parameter key/value bindings as a 

139 dictionary, if applicable. Defaults to None. 

140 raw (bool, optional): Return the data in 'raw' (tuple) 

141 format rather than as a formatted DataFrame. 

142 Defaults to True for efficiency. 

143 flat (bool, optional): Flatten the response. This is useful 

144 if the expected response is a collection of *single 

145 elements*. If True, this will return a flattened tuple of 

146 elements, rather than a list of tuples, as is the default 

147 behaviour. Note (1): If true, and the return is a list of 

148 multi-value tuples, only the first element in each tuple 

149 will be returned. Note (2): This argument should only be 

150 used if ``raw=True``. Defaults to False. 

151 commit (bool, optional): Call COMMIT after the transaction 

152 is complete. Defaults to True (for backwards 

153 compatibility). 

154 ignore_unsafe (bool, optional): Bypass the 'is dangerous' 

155 check and the run query anyway. This may be required if 

156 a script contains multiple statements. Defaults to False. 

157 

158 WARNING: **HC SVNT DRACONES** 

159 

160 If the query did not return results and the ``raw`` argument is 

161 False, an empty DataFrame containing the column names only, is 

162 returned. 

163 

164 Note: 

165 In the SQL query, the bind parameters are specified by name, 

166 using the format ``:bind_name``. The ``params`` dictionary 

167 argument must contain the associated parameter name/value 

168 bindings. 

169 

170 Warning: 

171 

172 1) Generally, whatever statement is passed into this method 

173 **will be executed**, and may have *destructive 

174 implications.* 

175 

176 2) This method contains a ``commit`` call, and the option to 

177 disable the COMMIT. 

178 

179 If a statement is passed into this method, and the user has 

180 the appropriate permissions - the change 

181 **will be committed**. 

182 

183 **... HC SVNT DRACONES.** 

184 

185 Returns: 

186 list | pd.DataFrame | None: If the ``raw`` parameter is 

187 True, a list of tuples containing values is returned. 

188 Otherwise, a ``pandas.DataFrame`` object containing the 

189 returned data is returned. 

190 

191 If this method is called with a script which does not return 

192 results, for example a CREATE script, None is returned; 

193 regardless of the value passed to the ``raw`` parameter. 

194 

195 """ 

196 # pylint: disable=line-too-long # Kept for clarity. 

197 # pylint: disable=no-else-return # Additional else and return used for clarity. 

198 # pylint: disable=no-member # The error does have a _message member. 

199 try: 

200 rtn = None 

201 # Perform a cursory 'security check.' 

202 if ignore_unsafe or not self._is_dangerous(stmt=stmt): 

203 with self._engine.connect() as conn: 

204 result = conn.execute(sa.text(stmt), params) 

205 # ???: Added for SQL Server support (v0.5.0.dev1). 

206 # Does this work for other engines? 

207 if result.returns_rows: 

208 rtn = result.fetchall() if raw else self._result_to_df__cursor(result=result) 

209 if commit: 

210 conn.commit() 

211 conn.close() 

212 except SecurityWarning: 

213 print(traceback.format_exc()) 

214 except Exception as err: 

215 if 'object does not return rows' not in err._message(): 

216 reporterror(err) 

217 return next(zip(*rtn)) if flat else rtn 

218 

219 def _create_engine(self) -> sa.engine.base.Engine: 

220 """Create a database engine using the provided environment. 

221 

222 Returns: 

223 sqlalchemy.engine.base.Engine: A sqlalchemy database engine 

224 object. 

225 

226 """ 

227 # The pool_* arguments to prevent MySQL timeout which causes 

228 # a broken pipe and lost connection errors. 

229 return sa.create_engine(url=self._connstr, 

230 poolclass=sa.pool.QueuePool, 

231 pool_size=20, 

232 pool_recycle=3600, 

233 pool_timeout=30, 

234 pool_pre_ping=True, 

235 max_overflow=0) 

236 

237 @staticmethod 

238 def _is_dangerous(stmt: str) -> bool: 

239 """Perform a dirty security check for injection attempts. 

240 

241 Args: 

242 stmt (str): SQL statement to be potentially executed. 

243 

244 Raises: 

245 SecurityWarning: If there are multiple semi-colons (``;``) 

246 in the statement, or any comment delimiters (``--``). 

247 

248 Returns: 

249 bool: False if the checks pass. 

250 

251 """ 

252 if stmt.count(';') > 1: 

253 msg = 'Multiple statements are disallowed for security reasons.' 

254 raise SecurityWarning(msg) 

255 if '--' in stmt: 

256 msg = 'Comments are not allowed in the statement for security reasons.' 

257 raise SecurityWarning(msg) 

258 return False 

259 

260 def _report_sa_error(self, msg: str, error: SQLAlchemyError): # pragma: nocover 

261 """Report SQLAlchemy error to the terminal. 

262 

263 Args: 

264 msg (str): Additional error to be displayed. This message 

265 will be automatically prefixed with '[DatabaseError]: ' 

266 error (sqlalchemy.exc.SQLAlchemyError): Caught error object 

267 from the try/except block. 

268 

269 """ 

270 msg = f'\n{self._PREFIX} {msg}' 

271 err_stmt = error.statement if hasattr(error, 'statement') else 'n/a' 

272 err_orig = str(error.orig) if hasattr(error, 'orig') else 'n/a' 

273 raw = f'- Raw: {str(error).strip()}' 

274 stmt = f'- Statement: {err_stmt}' 

275 errr = f'- Error: {err_orig}' 

276 ui.print_alert(text=msg) 

277 ui.print_alert(text=raw) 

278 ui.print_alert(text=stmt) 

279 ui.print_alert(text=errr) 

280 

281 @staticmethod 

282 def _result_to_df__cursor(result: sa.engine.cursor.CursorResult) -> pd.DataFrame: 

283 """Convert a ``CursorResult`` object to a DataFrame. 

284 

285 If the cursor did not return results, an empty DataFrame 

286 containing the column names only, is returned. 

287 

288 Args: 

289 result (sqlalchemy.engine.cursor.CursorResult): Object to 

290 be converted. 

291 

292 Returns: 

293 pd.DataFrame: A ``pandas.DataFrame`` object containing the 

294 cursor's data. 

295 

296 """ 

297 return pd.DataFrame(result, columns=result.keys()) 

298 

299 @staticmethod 

300 def _result_to_df__stored(result: object) -> pd.DataFrame: 

301 """Convert a ``MySQLCursor.stored_results`` object to a DataFrame. 

302 

303 Args: 

304 result (object): The ``cursor.stored_results()`` object from 

305 a ``sqlalchemy`` or ``mysql.connector`` procedure call. 

306 

307 Returns: 

308 pd.DataFrame: A DataFrame containing the results from the 

309 procedure call. 

310 

311 """ 

312 df = pd.DataFrame() 

313 try: 

314 # There is only one item in the iterable. 

315 # However, if the iterable is empty, a StopIteration error is raised 

316 # when using x = next(result); so a loop is used instead. 

317 for x in result: 

318 df = pd.DataFrame(data=x.fetchall(), columns=x.column_names) 

319 except Exception as err: 

320 reporterror(err) 

321 return df