Coverage for savcfg / keyvault.py: 97%
34 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-10 15:30 +0100
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-10 15:30 +0100
1from azure.identity import DefaultAzureCredential
2from azure.keyvault.secrets import SecretClient
4from savcfg.read import VarsReader
5from savcfg.edit import VarsEditor
class KeyVaultVars(VarsReader, VarsEditor):
    """Read and write variables stored as secrets in an Azure Key Vault.

    Secret names are mapped to variable names by ``_normalize`` (``--``
    becomes ``.``, ``-`` becomes the word separator, optionally upper-cased)
    and mapped back by ``_denormalize``. The variable names are cached in
    ``self._vars`` once a listing has returned at least one name.
    """

    def __init__(
        self,
        kv_name: str,
        credential: DefaultAzureCredential,
        word_separator: str = "_",
        uppercase: bool = False,
    ):
        """Create a secret client for the vault named *kv_name*.

        Args:
            kv_name: Vault name, interpolated into
                ``https://{kv_name}.vault.azure.net/``.
            credential: Credential handed to ``SecretClient``.
            word_separator: String that replaces ``-`` in variable names.
            uppercase: When true, variable names are upper-cased on listing
                and lower-cased again before addressing a secret.
        """
        url = f"https://{kv_name}.vault.azure.net/"
        self._secrets = SecretClient(url, credential)
        # Cache of normalized variable names; an empty list means "not loaded".
        self._vars: list[str] = []
        self._sep = word_separator
        self._upper = uppercase

    def _normalize(self, key: str) -> str:
        """Translate a secret name into a variable name."""
        if self._upper:
            key = key.upper()
        # "--" must be handled first, otherwise it would be consumed as two
        # single "-" word separators.
        return key.replace("--", ".").replace("-", self._sep)

    def _denormalize(self, key: str) -> str:
        """Translate a variable name back into a secret name."""
        if self._upper:
            key = key.lower()
        return key.replace(".", "--").replace(self._sep, "-")

    def list_vars(self) -> list[str]:
        """Return the sorted, normalized names of the secrets in the vault.

        The vault is queried only while the cache is empty; afterwards the
        cached list is returned.
        """
        if not self._vars:
            self._vars = sorted(
                self._normalize(props.name)
                for props in self._secrets.list_properties_of_secrets()
                if props is not None and props.name is not None
            )

        return self._vars

    def read(self, key: str) -> str:
        """Return the value of variable *key*.

        Returns an empty string when *key* is not among the listed variables
        or when the stored secret has no value.
        """
        # list_vars() fills the cache itself when it is still empty.
        if key not in self.list_vars():
            return ""

        value = self._secrets.get_secret(self._denormalize(key)).value
        return value or ""

    def write(self, key: str, value: str) -> None:
        """Store *value* as the secret backing variable *key*."""
        secret_name = self._denormalize(key)
        self._secrets.set_secret(secret_name, value)

        # Keep a populated cache in step with the vault. Without this, read()
        # would report a freshly written variable as missing because it only
        # consults the cached names. An empty cache needs no update:
        # list_vars() queries the vault again while the cache is empty.
        if self._vars:
            cached_name = self._normalize(secret_name)
            if cached_name not in self._vars:
                self._vars.append(cached_name)
                self._vars.sort()