Coverage for src\derivepassphrase\sequin.py: 100.000%
109 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""A Python reimplementation of James Coglan's "sequin" Node.js module.
7James Coglan's "sequin" Node.js module provides a pseudorandom number
8generator (using rejection sampling on a stream of input numbers) that
9attempts to minimize the amount of information it throws away:
10(non-degenerate) rejected samples are fed into a stream of higher-order
11numbers from which the next random number generation request will be
12served. The sequin module is used in Coglan's "vault" module (a
13deterministic, stateless password manager that recomputes passwords
14instead of storing them), and this reimplementation is used for
15a similar purpose.
17The main API is the [`Sequin`][] class, which is thoroughly documented.
19"""
21# ruff: noqa: RUF002,RUF003
23from __future__ import annotations
25import collections
26import threading
27from typing import TYPE_CHECKING
29from typing_extensions import assert_type
31if TYPE_CHECKING:
32 from collections.abc import Iterator, Sequence
34__all__ = ('Sequin', 'SequinExhaustedError')
37class Sequin:
38 """Generate pseudorandom non-negative numbers in different ranges.
40 Given a (presumed high-quality) uniformly random sequence of input
41 bits, generate pseudorandom non-negative integers in a certain range
42 on each call of the `generate` method. (It is permissible to
43 specify a different range per call to `generate`; this is the main
44 use case.) We use a modified version of rejection sampling, where
45 rejected values are stored in "rejection queues" if possible, and
46 these rejection queues re-seed the next round of rejection sampling.
48 This is a Python reimplementation of James Coglan's [Node.js sequin
49 module][JS_SEQUIN], as introduced in [his blog post][BLOG_POST]. It
50 uses a [technique by Christian Lawson-Perfect][SEQUIN_TECHNIQUE].
51 I do not know why the original module is called "sequin"; I presume
52 it to be a pun on "sequence".
54 [JS_SEQUIN]: https://www.npmjs.com/package/sequin
55 [BLOG_POST]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/
56 [SEQUIN_TECHNIQUE]: https://checkmyworking.com/2012/06/converting-a-stream-of-binary-digits-to-a-stream-of-base-n-digits/
58 """
60 def __init__(
61 self,
62 sequence: str | bytes | bytearray | Sequence[int],
63 /,
64 *,
65 is_bitstring: bool = False,
66 ) -> None:
67 """Initialize the Sequin.
69 Args:
70 sequence:
71 A sequence of bits, or things convertible to bits, to
72 seed the pseudorandom number generator. Byte and text
73 strings are converted to 8-bit integer sequences.
74 (Conversion will fail if the text string contains
75 non-ISO-8859-1 characters.) The numbers are then
76 converted to bits.
77 is_bitstring:
78 If true, treat the input as a bitstring. By default,
79 the input is treated as a string of 8-bit integers, from
80 which the individual bits must still be extracted.
82 Raises:
83 ValueError:
84 The sequence contains values outside the permissible
85 range.
87 """
88 msg = 'sequence item out of range' 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
90 def uint8_to_bits(value: int) -> Iterator[int]: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
91 """Yield individual bits of an 8-bit number, MSB first."""
92 for i in (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01): 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm
93 yield 1 if value | i == value else 0 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm
95 if isinstance(sequence, str): 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
96 try: 1SR
97 sequence = tuple(sequence.encode('iso-8859-1')) 1SR
98 except UnicodeError as e: 1S
99 raise ValueError(msg) from e 1S
100 else:
101 sequence = tuple(sequence) 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
102 assert_type(sequence, tuple[int, ...]) 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
103 consistency_lock = threading.RLock() 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
104 bitstream: collections.deque[int] = collections.deque() 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
105 for num in sequence: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
106 if num not in range(2 if is_bitstring else 256): 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
107 raise ValueError(msg) 1S
108 if is_bitstring: 1acpwxqyrsdzABCPQSRDNOEnFGHIoJKLtMuevfghijklbm
109 bitstream.append(num) 1aPQSRDNO
110 else:
111 bitstream.extend(uint8_to_bits(num)) 1acpwxqyrsdzABCREnFGHIoJKLtMuevfghijklbm
113 with consistency_lock: 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm
114 self.consistency_lock = consistency_lock 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm
115 self.bases = {2: bitstream} 1acpwxqyrsdzABCPQRDNOEnFGHIoJKLtMuevfghijklbm
117 def _all_or_nothing_shift(
118 self, count: int, /, *, base: int = 2
119 ) -> Sequence[int]:
120 """Shift and return items if and only if there are enough.
122 Args:
123 count: Number of items to shift/consume.
124 base: Use the base `base` sequence.
126 Returns:
127 If there are sufficient items in the sequence left, then
128 consume them from the sequence and return them. Otherwise,
129 consume nothing, and return nothing.
131 Info: Thread-safety
132 This call is thread-safe.
134 Notes:
135 We currently remove now-empty sequences from the registry of
136 sequences.
138 Examples:
139 >>> seq = Sequin([1, 0, 1, 0, 0, 1, 0, 0, 0, 1], is_bitstring=True)
140 >>> seq.bases
141 {2: deque([1, 0, 1, 0, 0, 1, 0, 0, 0, 1])}
142 >>> seq._all_or_nothing_shift(3)
143 (1, 0, 1)
144 >>> seq._all_or_nothing_shift(3)
145 (0, 0, 1)
146 >>> seq.bases[2]
147 deque([0, 0, 0, 1])
148 >>> seq._all_or_nothing_shift(5)
149 ()
150 >>> seq.bases[2]
151 deque([0, 0, 0, 1])
152 >>> seq._all_or_nothing_shift(4)
153 (0, 0, 0, 1)
154 >>> 2 in seq.bases # now-empty sequences are removed
155 False
157 """
158 with self.consistency_lock: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
159 try: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
160 seq = self.bases[base] 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
161 except KeyError: 1acdNOnoeghijklbm
162 return () 1acdNOnoeghijklbm
163 stash: collections.deque[int] = collections.deque() 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
164 try: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
165 for _ in range(count): 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
166 stash.append(seq.popleft()) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
167 except IndexError: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
168 seq.extendleft(reversed(stash)) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
169 return () 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
170 # Clean up queues.
171 if not seq: 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
172 del self.bases[base] 1acpwxqyrsdzABCDNOEnFGHoJKLtuevfghijklbm
173 return tuple(stash) 1acpwxqyrsdzABCDNOEnFGHIoJKLtMuevfghijklbm
175 @staticmethod
176 def _big_endian_number(digits: Sequence[int], /, *, base: int = 2) -> int:
177 """Evaluate the given integer sequence as a big endian number.
179 Args:
180 digits: A sequence of integers to evaluate.
181 base: The number base to evaluate those integers in.
183 Returns:
184 The number value of the integer sequence.
186 Raises:
187 ValueError: `base` is an invalid base.
188 ValueError: Not all integers are valid base `base` digits.
190 Info: Thread-safety
191 This call is thread-safe.
193 Examples:
194 >>> Sequin._big_endian_number([1, 2, 3, 4, 5, 6, 7, 8], base=10)
195 12345678
196 >>> Sequin._big_endian_number([1, 2, 3, 4, 5, 6, 7, 8], base=100)
197 102030405060708
198 >>> Sequin._big_endian_number([0, 0, 0, 0, 1, 4, 9, 7], base=10)
199 1497
200 >>> Sequin._big_endian_number([1, 0, 0, 1, 0, 0, 0, 0], base=2)
201 144
202 >>> Sequin._big_endian_number([1, 7, 5, 5], base=8) == 0o1755
203 True
205 """ # noqa: DOC501
206 if base < 2: # noqa: PLR2004 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
207 msg = f'invalid base: {base!r}' 1T
208 raise ValueError(msg) 1T
209 ret = 0 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
210 allowed_range = range(base) 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
211 n = len(digits) 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
212 for i in range(n): 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
213 i2 = (n - 1) - i 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
214 x = digits[i] 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
215 if not isinstance(x, int): 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
216 msg = f'not an integer: {x!r}' 1T
217 raise TypeError(msg) 1T
218 if x not in allowed_range: 1acpwxqyrsdzABCTUDNEnFGHIoJKLtMuevfghijklbm
219 msg = f'invalid base {base!r} digit: {x!r}' 1T
220 raise ValueError(msg) 1T
221 ret += (base**i2) * x 1acpwxqyrsdzABCUDNEnFGHIoJKLtMuevfghijklbm
222 return ret 1acpwxqyrsdzABCUDNEnFGHIoJKLtMuevfghijklbm
224 def generate(self, n: int, /) -> int:
225 """Generate a base `n` non-negative integer.
227 We attempt to generate a value using rejection sampling. If the
228 generated sample is outside the desired range (i.e., is
229 rejected), then attempt to reuse the sample by seeding
230 a "higher-order" input sequence of uniformly random numbers (for
231 a different base).
233 Args:
234 n:
235 Generate numbers in the range 0, ..., `n` - 1.
236 (Inclusive.) Must be larger than 0.
238 Returns:
239 A pseudorandom number in the range 0, ..., `n` - 1.
241 Raises:
242 ValueError:
243 The range is empty.
244 SequinExhaustedError:
245 The sequin is exhausted.
247 Info: Thread-safety
248 This call is thread-safe.
250 Examples:
251 >>> seq = Sequin(
252 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1],
253 ... is_bitstring=True,
254 ... )
255 >>> seq2 = Sequin(
256 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1],
257 ... is_bitstring=True,
258 ... )
259 >>> seq.generate(5)
260 3
261 >>> seq.generate(5)
262 3
263 >>> seq.generate(5)
264 1
265 >>> seq.generate(5) # doctest: +IGNORE_EXCEPTION_DETAIL
266 Traceback (most recent call last):
267 ...
268 SequinExhaustedError: Sequin is exhausted
270 Using `n = 1` does not actually consume input bits:
272 >>> seq2.generate(1)
273 0
275 But it still won't work on exhausted sequins:
277 >>> seq.generate(1) # doctest: +IGNORE_EXCEPTION_DETAIL
278 Traceback (most recent call last):
279 ...
280 SequinExhaustedError: Sequin is exhausted
282 """
283 with self.consistency_lock: 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm
284 if 2 not in self.bases: # noqa: PLR2004 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm
285 raise SequinExhaustedError 1aDf
286 value = self._generate_inner(n, base=2) 1acpwxqyrsdzABCPDEnFGHIoJKLtMuevfghijklbm
287 if value == n: 1acpwxqyrsdzABCDEnFGHIoJKLtMuevfghijklbm
288 raise SequinExhaustedError 1f
289 return value 1acpwxqyrsdzABCDEnFGHIoJKLtMuevfghijklbm
291 def _generate_inner(self, n: int, /, *, base: int = 2) -> int:
292 """Recursive call to generate a base `n` non-negative integer.
294 We first determine the correct exponent `k` to generate base `n`
295 numbers from a stream of base `base` numbers, then attempt to
296 take `k` numbers from the base `base` sequence (or bail if not
297 possible). If the resulting number `v` is out of range for
298 base `n`, then push `v - n` onto the rejection queue for
299 base `r` (where `r = base ** k - n`), and attempt to generate
300 the requested base `n` integer from the sequence of base `r`
301 numbers next. (This recursion is not attempted if `r` = 1.)
302 Otherwise, return the number.
304 Args:
305 n:
306 Generate numbers in the range 0, ..., `n - 1`.
307 (Inclusive.) Must be larger than 0.
308 base:
309 Use the base `base` sequence as a source for
310 pseudorandom numbers.
312 Returns:
313 A pseudorandom number in the range 0, ..., `n - 1` if
314 possible, or `n` if the stream is exhausted.
316 Raises:
317 ValueError:
318 The range is empty.
320 Warning: Thread-safety
321 This call is **not thread-safe**.
323 Examples:
324 >>> seq = Sequin(
325 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1],
326 ... is_bitstring=True,
327 ... )
328 >>> seq2 = Sequin(
329 ... [1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1],
330 ... is_bitstring=True,
331 ... )
332 >>> seq._generate_inner(5)
333 3
334 >>> seq._generate_inner(5)
335 3
336 >>> seq._generate_inner(5)
337 1
338 >>> seq._generate_inner(5) # error condition: sequin exhausted
339 5
341 Using `n = 1` does not actually consume input bits, and
342 always works, regardless of sequin exhaustion:
344 >>> seq2._generate_inner(1)
345 0
346 >>> seq._generate_inner(1)
347 0
349 Using an unsuitable range will raise:
351 >>> seq2._generate_inner(0) # doctest: +IGNORE_EXCEPTION_DETAIL
352 Traceback (most recent call last):
353 ...
354 ValueError: invalid target range
356 """
357 if n < 1: 1acpwxqyrsdzABCPQDNEnFGHIoJKLtMuevfghijklbm
358 msg = 'invalid target range' 1aPQb
359 raise ValueError(msg) 1aPQb
360 if base < 2: # noqa: PLR2004 1acpwxqyrsdzABCQDNEnFGHIoJKLtMuevfghijklbm
361 msg = f'invalid base: {base!r}' 1Q
362 raise ValueError(msg) 1Q
363 # p = base ** k, where k is the smallest integer such that
364 # p >= n. We determine p and k inductively.
365 p = 1 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
366 k = 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
367 while p < n: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
368 p *= base 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
369 k += 1 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
370 # The remainder r of p and n is used as the base for rejection
371 # queue.
372 r = p - n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
373 # The generated number v is initialized to n because of the
374 # while loop below.
375 v = n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
376 while v > n - 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
377 list_slice = self._all_or_nothing_shift(k, base=base) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
378 if not list_slice: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
379 if n != 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
380 return n 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
381 v = 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
382 v = self._big_endian_number(list_slice, base=base) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
383 if v > n - 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
384 # If r is 0, then p == n, so v < n, or rather
385 # v <= n - 1.
386 assert r > 0 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
387 if r == 1: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
388 continue 1cpqrsdItuevghijklbm
389 self._stash(v - n, base=r) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
390 v = self._generate_inner(n, base=r) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
391 return v 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
393 def _stash(self, value: int, /, *, base: int = 2) -> None:
394 """Stash `value` on the base `base` sequence.
396 Sets up the base `base` sequence if it does not yet exist.
398 Info: Thread-safety
399 This call is thread-safe.
401 """
402 with self.consistency_lock: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
403 if base not in self.bases: 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
404 self.bases[base] = collections.deque() 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
405 self.bases[base].append(value) 1acpwxqyrsdzABCDNEnFGHIoJKLtMuevfghijklbm
408class SequinExhaustedError(Exception):
409 """The sequin is exhausted.
411 No more values can be generated from this sequin.
413 """
415 def __init__(self) -> None: # noqa: D107
416 super().__init__('Sequin is exhausted') 1aDf