Coverage for src\derivepassphrase\sequin.py: 100.000%

109 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5"""A Python reimplementation of James Coglan's "sequin" Node.js module. 

6 

7James Coglan's "sequin" Node.js module provides a pseudorandom number 

8generator (using rejection sampling on a stream of input numbers) that 

9attempts to minimize the amount of information it throws away: 

10(non-degenerate) rejected samples are fed into a stream of higher-order 

11numbers from which the next random number generation request will be 

12served. The sequin module is used in Coglan's "vault" module (a 

13deterministic, stateless password manager that recomputes passwords 

14instead of storing them), and this reimplementation is used for 

15a similar purpose. 

16 

17The main API is the [`Sequin`][] class, which is thoroughly documented. 

18 

19""" 

20 

21# ruff: noqa: RUF002,RUF003 

22 

23from __future__ import annotations 

24 

25import collections 

26import threading 

27from typing import TYPE_CHECKING 

28 

29from typing_extensions import assert_type 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Iterator, Sequence 

33 

34__all__ = ('Sequin', 'SequinExhaustedError') 

35 

36 

37class Sequin: 

38 """Generate pseudorandom non-negative numbers in different ranges. 

39 

40 Given a (presumed high-quality) uniformly random sequence of input 

41 bits, generate pseudorandom non-negative integers in a certain range 

42 on each call of the `generate` method. (It is permissible to 

43 specify a different range per call to `generate`; this is the main 

44 use case.) We use a modified version of rejection sampling, where 

45 rejected values are stored in "rejection queues" if possible, and 

46 these rejection queues re-seed the next round of rejection sampling. 

47 

48 This is a Python reimplementation of James Coglan's [Node.js sequin 

49 module][JS_SEQUIN], as introduced in [his blog post][BLOG_POST]. It 

50 uses a [technique by Christian Lawson-Perfect][SEQUIN_TECHNIQUE]. 

51 I do not know why the original module is called "sequin"; I presume 

52 it to be a pun on "sequence". 

53 

54 [JS_SEQUIN]: https://www.npmjs.com/package/sequin 

55 [BLOG_POST]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/ 

56 [SEQUIN_TECHNIQUE]: https://checkmyworking.com/2012/06/converting-a-stream-of-binary-digits-to-a-stream-of-base-n-digits/ 

57 

58 """ 

59 

60 def __init__( 

61 self, 

62 sequence: str | bytes | bytearray | Sequence[int], 

63 /, 

64 *, 

65 is_bitstring: bool = False, 

66 ) -> None: 

67 """Initialize the Sequin. 

68 

69 Args: 

70 sequence: 

71 A sequence of bits, or things convertible to bits, to 

72 seed the pseudorandom number generator. Byte and text 

73 strings are converted to 8-bit integer sequences. 

74 (Conversion will fail if the text string contains 

75 non-ISO-8859-1 characters.) The numbers are then 

76 converted to bits. 

77 is_bitstring: 

78 If true, treat the input as a bitstring. By default, 

79 the input is treated as a string of 8-bit integers, from 

80 which the individual bits must still be extracted. 

81 

82 Raises: 

83 ValueError: 

84 The sequence contains values outside the permissible 

85 range. 

86 

87 """ 

88 msg = 'sequence item out of range' 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

89 

90 def uint8_to_bits(value: int) -> Iterator[int]: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

91 """Yield individual bits of an 8-bit number, MSB first.""" 

92 for i in (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01): 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm

93 yield 1 if value | i == value else 0 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm

94 

95 if isinstance(sequence, str): 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

96 try: 1SR

97 sequence = tuple(sequence.encode('iso-8859-1')) 1SR

98 except UnicodeError as e: 1S

99 raise ValueError(msg) from e 1S

100 else: 

101 sequence = tuple(sequence) 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

102 assert_type(sequence, tuple[int, ...]) 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

103 consistency_lock = threading.RLock() 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

104 bitstream: collections.deque[int] = collections.deque() 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

105 for num in sequence: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

106 if num not in range(2 if is_bitstring else 256): 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

107 raise ValueError(msg) 1S

108 if is_bitstring: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm

109 bitstream.append(num) 1aPQSRDNO

110 else: 

111 bitstream.extend(uint8_to_bits(num)) 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm

112 

113 with consistency_lock: 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm

114 self.consistency_lock = consistency_lock 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm

115 self.bases = {2: bitstream} 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm

116 

117 def _all_or_nothing_shift( 

118 self, count: int, /, *, base: int = 2 

119 ) -> Sequence[int]: 

120 """Shift and return items if and only if there are enough. 

121 

122 Args: 

123 count: Number of items to shift/consume. 

124 base: Use the base `base` sequence. 

125 

126 Returns: 

127 If there are sufficient items in the sequence left, then 

128 consume them from the sequence and return them. Otherwise, 

129 consume nothing, and return nothing. 

130 

131 Info: Thread-safety 

132 This call is thread-safe. 

133 

134 Notes: 

135 We currently remove now-empty sequences from the registry of 

136 sequences. 

137 

138 Examples: 

139 >>> seq = Sequin([1, 0, 1, 0, 0, 1, 0, 0, 0, 1], is_bitstring=True) 

140 >>> seq.bases 

141 {2: deque([1, 0, 1, 0, 0, 1, 0, 0, 0, 1])} 

142 >>> seq._all_or_nothing_shift(3) 

143 (1, 0, 1) 

144 >>> seq._all_or_nothing_shift(3) 

145 (0, 0, 1) 

146 >>> seq.bases[2] 

147 deque([0, 0, 0, 1]) 

148 >>> seq._all_or_nothing_shift(5) 

149 () 

150 >>> seq.bases[2] 

151 deque([0, 0, 0, 1]) 

152 >>> seq._all_or_nothing_shift(4) 

153 (0, 0, 0, 1) 

154 >>> 2 in seq.bases # now-empty sequences are removed 

155 False 

156 

157 """ 

158 with self.consistency_lock: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

159 try: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

160 seq = self.bases[base] 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

161 except KeyError: 1acdNOnoeghijklbm

162 return () 1acdNOnoeghijklbm

163 stash: collections.deque[int] = collections.deque() 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

164 try: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

165 for _ in range(count): 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

166 stash.append(seq.popleft()) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

167 except IndexError: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

168 seq.extendleft(reversed(stash)) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

169 return () 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

170 # Clean up queues. 

171 if not seq: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

172 del self.bases[base] 1acpwxqyrsdzABCDNOEnFGHoJKLtuevfghijklbm

173 return tuple(stash) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm

174 

175 @staticmethod 

176 def _big_endian_number(digits: Sequence[int], /, *, base: int = 2) -> int: 

177 """Evaluate the given integer sequence as a big endian number. 

178 

179 Args: 

180 digits: A sequence of integers to evaluate. 

181 base: The number base to evaluate those integers in. 

182 

183 Returns: 

184 The number value of the integer sequence. 

185 

186 Raises: 

187 ValueError: `base` is an invalid base. 

188 ValueError: Not all integers are valid base `base` digits. 

189 

190 Info: Thread-safety 

191 This call is thread-safe. 

192 

193 Examples: 

194 >>> Sequin._big_endian_number([1, 2, 3, 4, 5, 6, 7, 8], base=10) 

195 12345678 

196 >>> Sequin._big_endian_number([1, 2, 3, 4, 5, 6, 7, 8], base=100) 

197 102030405060708 

198 >>> Sequin._big_endian_number([0, 0, 0, 0, 1, 4, 9, 7], base=10) 

199 1497 

200 >>> Sequin._big_endian_number([1, 0, 0, 1, 0, 0, 0, 0], base=2) 

201 144 

202 >>> Sequin._big_endian_number([1, 7, 5, 5], base=8) == 0o1755 

203 True 

204 

205 """ # noqa: DOC501 

206 if base < 2: # noqa: PLR2004 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

207 msg = f'invalid base: {base!r}' 1T

208 raise ValueError(msg) 1T

209 ret = 0 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

210 allowed_range = range(base) 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

211 n = len(digits) 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

212 for i in range(n): 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

213 i2 = (n - 1) - i 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

214 x = digits[i] 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

215 if not isinstance(x, int): 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

216 msg = f'not an integer: {x!r}' 1T

217 raise TypeError(msg) 1T

218 if x not in allowed_range: 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm

219 msg = f'invalid base {base!r} digit: {x!r}' 1T

220 raise ValueError(msg) 1T

221 ret += (base**i2) * x 1acpwxqyrsdzABCUDNEnFGHIoJKLtMuevfghijklbm

222 return ret 1acpwxqyrsdzABCUDNEnFGHIoJKLtMuevfghijklbm

223 

224 def generate(self, n: int, /) -> int: 

225 """Generate a base `n` non-negative integer. 

226 

227 We attempt to generate a value using rejection sampling. If the 

228 generated sample is outside the desired range (i.e., is 

229 rejected), then attempt to reuse the sample by seeding 

230 a "higher-order" input sequence of uniformly random numbers (for 

231 a different base). 

232 

233 Args: 

234 n: 

235 Generate numbers in the range 0, ..., `n` - 1. 

236 (Inclusive.) Must be larger than 0. 

237 

238 Returns: 

239 A pseudorandom number in the range 0, ..., `n` - 1. 

240 

241 Raises: 

242 ValueError: 

243 The range is empty. 

244 SequinExhaustedError: 

245 The sequin is exhausted. 

246 

247 Info: Thread-safety 

248 This call is thread-safe. 

249 

250 Examples: 

251 >>> seq = Sequin( 

252 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1], 

253 ... is_bitstring=True, 

254 ... ) 

255 >>> seq2 = Sequin( 

256 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1], 

257 ... is_bitstring=True, 

258 ... ) 

259 >>> seq.generate(5) 

260 3 

261 >>> seq.generate(5) 

262 3 

263 >>> seq.generate(5) 

264 1 

265 >>> seq.generate(5) # doctest: +IGNORE_EXCEPTION_DETAIL 

266 Traceback (most recent call last): 

267 ... 

268 SequinExhaustedError: Sequin is exhausted 

269 

270 Using `n = 1` does not actually consume input bits: 

271 

272 >>> seq2.generate(1) 

273 0 

274 

275 But it still won't work on exhausted sequins: 

276 

277 >>> seq.generate(1) # doctest: +IGNORE_EXCEPTION_DETAIL 

278 Traceback (most recent call last): 

279 ... 

280 SequinExhaustedError: Sequin is exhausted 

281 

282 """ 

283 with self.consistency_lock: 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm

284 if 2 not in self.bases: # noqa: PLR2004 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm

285 raise SequinExhaustedError 1aDf

286 value = self._generate_inner(n, base=2) 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm

287 if value == n: 1acpwxqyrsdzABCDEnFGHIoJKLtMuevfghijklbm

288 raise SequinExhaustedError 1f

289 return value 1acpwxqyrsdzABCDEnFGHIoJKLtMuevfghijklbm

290 

291 def _generate_inner(self, n: int, /, *, base: int = 2) -> int: 

292 """Recursive call to generate a base `n` non-negative integer. 

293 

294 We first determine the correct exponent `k` to generate base `n` 

295 numbers from a stream of base `base` numbers, then attempt to 

296 take `k` numbers from the base `base` sequence (or bail if not 

297 possible). If the resulting number `v` is out of range for 

298 base `n`, then push `v - n` onto the rejection queue for 

299 base `r` (where `r = base ** k - n`), and attempt to generate 

300 the requested base `n` integer from the sequence of base `r` 

301 numbers next. (This recursion is not attempted if `r` = 1.) 

302 Otherwise, return the number. 

303 

304 Args: 

305 n: 

306 Generate numbers in the range 0, ..., `n - 1`. 

307 (Inclusive.) Must be larger than 0. 

308 base: 

309 Use the base `base` sequence as a source for 

310 pseudorandom numbers. 

311 

312 Returns: 

313 A pseudorandom number in the range 0, ..., `n - 1` if 

314 possible, or `n` if the stream is exhausted. 

315 

316 Raises: 

317 ValueError: 

318 The range is empty. 

319 

320 Warning: Thread-safety 

321 This call is **not thread-safe**. 

322 

323 Examples: 

324 >>> seq = Sequin( 

325 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1], 

326 ... is_bitstring=True, 

327 ... ) 

328 >>> seq2 = Sequin( 

329 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1], 

330 ... is_bitstring=True, 

331 ... ) 

332 >>> seq._generate_inner(5) 

333 3 

334 >>> seq._generate_inner(5) 

335 3 

336 >>> seq._generate_inner(5) 

337 1 

338 >>> seq._generate_inner(5) # error condition: sequin exhausted 

339 5 

340 

341 Using `n = 1` does not actually consume input bits, and 

342 always works, regardless of sequin exhaustion: 

343 

344 >>> seq2._generate_inner(1) 

345 0 

346 >>> seq._generate_inner(1) 

347 0 

348 

349 Using an unsuitable range will raise: 

350 

351 >>> seq2._generate_inner(0) # doctest: +IGNORE_EXCEPTION_DETAIL 

352 Traceback (most recent call last): 

353 ... 

354 ValueError: invalid target range 

355 

356 """ 

357 if n < 1: 1acpwxqyrsdzABCPQDNEnFGHIoJKLtMuevfghijklbm

358 msg = 'invalid target range' 1aPQb

359 raise ValueError(msg) 1aPQb

360 if base < 2: # noqa: PLR2004 1acpwxqyrsdzABCQDNEnFGHIoJKLtMuevfghijklbm

361 msg = f'invalid base: {base!r}' 1Q

362 raise ValueError(msg) 1Q

363 # p = base ** k, where k is the smallest integer such that 

364 # p >= n. We determine p and k inductively. 

365 p = 1 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

366 k = 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

367 while p < n: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

368 p *= base 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

369 k += 1 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

370 # The remainder r of p and n is used as the base for rejection 

371 # queue. 

372 r = p - n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

373 # The generated number v is initialized to n because of the 

374 # while loop below. 

375 v = n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

376 while v > n - 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

377 list_slice = self._all_or_nothing_shift(k, base=base) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

378 if not list_slice: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

379 if n != 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

380 return n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

381 v = 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

382 v = self._big_endian_number(list_slice, base=base) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

383 if v > n - 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

384 # If r is 0, then p == n, so v < n, or rather 

385 # v <= n - 1. 

386 assert r > 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

387 if r == 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

388 continue 1cpqrsdItuevghijklbm

389 self._stash(v - n, base=r) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

390 v = self._generate_inner(n, base=r) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

391 return v 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

392 

393 def _stash(self, value: int, /, *, base: int = 2) -> None: 

394 """Stash `value` on the base `base` sequence. 

395 

396 Sets up the base `base` sequence if it does not yet exist. 

397 

398 Info: Thread-safety 

399 This call is thread-safe. 

400 

401 """ 

402 with self.consistency_lock: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

403 if base not in self.bases: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

404 self.bases[base] = collections.deque() 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

405 self.bases[base].append(value) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm

406 

407 

408class SequinExhaustedError(Exception): 

409 """The sequin is exhausted. 

410 

411 No more values can be generated from this sequin. 

412 

413 """ 

414 

415 def __init__(self) -> None: # noqa: D107 

416 super().__init__('Sequin is exhausted') 1aDf