Coverage for /var/devmt/py/dbilib_0.5.0/dbilib/_dbi_mysql.py: 100%
64 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-09-20 12:44 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-09-20 12:44 +0100
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
"""
:Purpose:   This module contains the library's *MySQL* database methods
            and attribute accessors, which are a specialised version of
            the :class:`_dbi_base._DBIBase` class methods.

:Platform:  Linux/Windows | Python 3.10+
:Developer: J Berendt
:Email:     support@s3dev.uk

:Comments:  n/a

:Example:

    For class-specific usage examples, please refer to the docstring
    for the following classes:

        - :class:`_DBIMySQL`

"""
22# pylint: disable=wrong-import-order
23# Silence the spurious IDE-based error.
24# pylint: disable=import-error
26import pandas as pd
27import warnings
28from mysql.connector.errors import IntegrityError
29from sqlalchemy.exc import SQLAlchemyError
30from utils4.reporterror import reporterror
31from utils4.user_interface import ui
32# locals
33try:
34 from ._dbi_base import _DBIBase
35except ImportError:
36 from _dbi_base import _DBIBase
class _DBIMySQL(_DBIBase):
    """This *private* class holds the methods and properties which are
    used for accessing MySQL-like databases, including MariaDB.

    Note:
        This class is *not* designed to be interacted with directly.

        Rather, please use the :class:`database.DBInterface` class
        instead, as the proper interface class has an automatic switch
        for database interfaces, based on the ``sqlalchemy.Engine``
        object which is created from the connection string.

    Args:
        connstr (str): The database-specific SQLAlchemy connection
            string.

    :Example Use:

        This low-level generalised class is designed to be inherited by
        the calling/wrapping class as::

            >>> from dbilib.database import DBInterface

            class MyDB(DBInterface):

                def __init__(self, connstr: str):
                    super().__init__(connstr=('mysql+mysqlconnector://'
                                              '<user>:<pwd>@<host>:<port>/'
                                              '<db_name>'))

    """

    # The __init__ method is implemented in the parent class.

    def call_procedure(self,
                       proc: str,
                       params: list | tuple | None = None,
                       return_status: bool = False
                       ) -> pd.DataFrame | tuple[pd.DataFrame, bool]:
        """Call a stored procedure, and return as a DataFrame.

        Args:
            proc (str): Name of the stored procedure to call.
            params (list | tuple, optional): A list (or tuple) of
                parameters to pass into the procedure. Defaults to None.
            return_status (bool, optional): Return the method's success
                status. Defaults to False.

        Note:
            Errors are reported (not raised). On error, an empty
            DataFrame and a False status are returned. The status is
            also False if the procedure returned no rows.

        Returns:
            pd.DataFrame | tuple[pd.DataFrame, bool]:
            If the ``return_status`` argument is True, a tuple of the
            data and the method's return status is returned as::

                (df, status)

            Otherwise, only the data is returned, as a pd.DataFrame.

        """
        df = pd.DataFrame()
        success = False
        # Suppress warnings for the duration of this call only; the
        # caller's warning filters are restored on exit rather than
        # being permanently overridden process-wide.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                # Use a context manager in an attempt to alleviate the
                # '2055 Lost Connection' and System Error 32
                # BrokenPipeError.
                with self.engine.connect() as conn:
                    cur = conn.connection.cursor(buffered=True)
                    try:
                        cur.callproc(proc, params)
                        result = cur.stored_results()
                        conn.connection.connection.commit()
                    finally:
                        # Release the cursor even if the call failed.
                        cur.close()
                    df = self._result_to_df__stored(result=result)
                success = not df.empty
            except SQLAlchemyError as err:
                msg = f'Error occurred while running the USP: {proc}.'
                self._report_sa_error(msg=msg, error=err)
            except Exception as err:
                reporterror(error=err)
        return (df, success) if return_status else df

    def call_procedure_update(self,
                              proc: str,
                              params: list | None = None,
                              return_id: bool = False) -> bool | tuple:
        """Call an *update* or *insert* stored procedure.

        Note:
            Results are *not* returned from this call, only a boolean
            status flag and the optional last row ID.

            If results are desired, please use the
            :meth:`~call_procedure` method.

        Args:
            proc (str): Name of the stored procedure to call.
            params (list, optional): A list of parameters to pass into
                the USP. Defaults to None.
            return_id (bool, optional): Return the ID of the last
                inserted row. Defaults to False.

        Returns:
            bool | tuple: If ``return_id`` is False, True is
            returned if the procedure completed successfully, otherwise
            False. If ``return_id`` is True, a tuple containing the
            ID of the last inserted row and the execution success flag
            are returned as::

                (id, success_flag)

        """
        # Initialised ahead of the guarded region so the return
        # statement is always well-defined.
        rowid = None
        success = False
        try:
            # Use a context manager in an attempt to alleviate the
            # '2055 Lost Connection' and System Error 32 BrokenPipeError.
            with self.engine.connect() as conn:
                cur = conn.connection.cursor()
                try:
                    cur.callproc(proc, params)
                    conn.connection.connection.commit()
                    if return_id:
                        # The cur.lastrowid is zero as the
                        # mysql_insert_id() function call applied to a
                        # CALL and not the statement within the
                        # procedure. Therefore, it must be manually
                        # obtained here:
                        cur.execute('SELECT LAST_INSERT_ID()')
                        rowid = cur.fetchone()[0]
                finally:
                    # Release the cursor even if the call failed.
                    cur.close()
            success = True
        except IntegrityError as ierr:
            # Duplicate entry: errno = 1062
            msg = f'{self._PREFIX} {ierr}'
            ui.print_alert(text=msg)
        except Exception as err:
            reporterror(err)
        return (rowid, success) if return_id else success

    def call_procedure_update_many(self,
                                   *args,
                                   proc: str,
                                   iterable: list | tuple) -> bool:
        r"""Call an *update* or *insert* stored procedure for an iterable.

        Note:
            The arguments are passed into the USP in the following order:

                \*args, iterable_item

            Ensure the USP is designed to accept the iterable item as
            the *last* parameter.

            A single commit is issued after all items have been
            processed.

        Args:
            *args (str | int | float): Positional arguments to be
                passed into the USP, in front of each iterable item.
                Note: The parameters are passed into the USP in the
                order received, followed by the iterable item.
            proc (str): Name of the stored procedure to call.
            iterable (list | tuple): List of items to be loaded into
                the database.

        Returns:
            bool: True if the update was successful, otherwise False.

        """
        success = False
        try:
            with self.engine.connect() as conn:
                cur = conn.connection.cursor()
                try:
                    for item in iterable:
                        cur.callproc(proc, [*args, item])
                    conn.connection.connection.commit()
                finally:
                    # Release the cursor even if a call failed.
                    cur.close()
            success = True
        except Exception as err:
            reporterror(err)
        return success

    def call_procedure_update_raw(self, proc: str, params: list | None = None):
        """Call an *update* or *insert* stored procedure, without error
        handling.

        .. warning::
            This method is **unprotected**, perhaps use
            :meth:`~call_procedure_update` instead.

            This 'raw' method *does not* contain an error handler. It is
            (by design) the responsibility of the caller to contain and
            control the errors.

        The purpose of this raw method is to enable the caller method to
        contain and control the errors which might be generated from a
        USP call, for example a **duplicate key** error.

        Args:
            proc (str): Name of the stored procedure to call.
            params (list, optional): A list of parameters to pass into
                the USP. Defaults to None.

        """
        with self._engine.connect() as conn:
            cur = conn.connection.cursor(buffered=True)
            try:
                cur.callproc(proc, params)
                conn.connection.connection.commit()
            finally:
                # Errors propagate to the caller by design, so the
                # cursor must be released here; nothing is swallowed.
                cur.close()

    def table_exists(self, table_name: str, verbose: bool = False) -> bool:
        """Using the ``engine`` object, test if the given table exists.

        Args:
            table_name (str): Name of the table to test.
            verbose (bool, optional): Print a message if the table does
                not exist. Defaults to False.

        Returns:
            bool: True if the given table exists, otherwise False.

        """
        schema = self._engine.url.database
        params = {'schema': schema, 'table_name': table_name}
        stmt = ('select count(*) from information_schema.tables '
                'where table_schema = :schema '
                'and table_name = :table_name')
        # The first column of the first row holds the table count.
        exists = bool(self.execute_query(stmt, params=params, raw=True)[0][0])
        # Logical (not bitwise) test, so any truthy ``verbose`` works.
        if not exists and verbose:
            msg = f'Table does not exist: {schema}.{table_name}'
            ui.print_warning(text=msg)
        return exists