GitLab Repo

amachine.am_hmm

   1from __future__ import annotations
   2
   3from abc import abstractmethod
   4from typing import override
   5import gc
   6import copy
   7from collections import deque
   8from collections import defaultdict
   9import json
  10from fractions import Fraction
  11from pathlib import Path
  12import warnings
  13import time
  14from dataclasses import dataclass, asdict, field
  15
  16import matplotlib.pyplot as plt
  17
  18import networkx as nx
  19
  20from automata.fa.dfa import DFA
  21
  22import sympy
  23import numpy as np
  24
  25from .am_machine import Machine
  26from .am_symbol import Symbol
  27
  28from   .am_msp import MSP, compute_msp, compute_msp_exact
  29from   .am_solve import solve_for_pi, solve_for_pi_fractional
  30from . import am_vis
  31from . import am_fast
  32
  33from .am_causal_state import CausalState
  34from .am_transition   import Transition
  35
  36from .am_fast.distance import jensenshannondivergence_gpu as af_jensenshannondivergence
  37from .am_fast.distance import jensenshannondivergence_cpu as af_jensenshannondivergence_cpu
  38
  39class HMM(Machine) :
  40
  41	"""Hidden Markov model implementing epsilon machines, mixed state presentations,
  42	complexity measures, and data generation.
  43
  44	Args:
  45		states (list[CausalState] | None): A list of causal states.
  46		transitions (list[Transition] | None): A list of transitions between states.
  47		start_state (int): Index of the start state.
  48		alphabet (list[str]): List of symbols making up the alphabet.
  49		name (str): Name of the model.
  50		description (str): Description of the model.
  51	"""
  52
  53	def __init__( 
  54		self,
  55		states : list[CausalState] | None = None,
  56		transitions : list[Transition] | None = None,
  57		start_state : int = 0,
  58		alphabet : list[str] | None = None,
  59		name : str = "",
  60		description : str = "" ) : 
  61
  62		self.alphabet    : list[str] | None = alphabet or []
  63		self.states      : list[CausalState] | None= states or []
  64		self.transitions : list[Transition] | None= transitions or []
  65
  66		self.set_alphabet( self.alphabet )
  67		self.set_states( self.states )
  68		self.set_transitions( self.transitions )
  69
  70		self.start_state : int = start_state
  71
  72		self.name : str = name
  73		self.description : str = description
  74
  75		# To be depreciated
  76		self.isoclass : str = None
  77
  78		# --- derived --------
  79
  80		self.complexity : dict[str, any] = {}
  81
  82		self.symbol_idx_map : dict[str,int] = {}
  83		self.state_idx_map  : dict[str,int] = {}
  84
  85		self.pi_fractional = None
  86		self.pi : np.ndarray | None = None
  87		self.T  : np.ndarray | None = None
  88		self.msp : MSP | None = None
  89		self.reverse_am : HMM | None = None
  90		self.is_q_weighted : bool = False
  91		self.is_minimal : bool = False
  92
  93		# --- const ----------
  94
  95		self.EPS : float = 1e-12
  96
  97	#-------------------------------------------------------#
  98	#                Overrides / Getters                    #
  99	#-------------------------------------------------------#
 100
 101	@override
 102	def get_states(self) -> list[CausalState] : 
 103		return self.states
 104
 105	@override
 106	def get_transitions(self) -> list[Transition] : 
 107		return self.transitions
 108
 109	@override
 110	def get_alphabet(self) -> list[Symbol]:
 111		return [ Symbol(a) for a in self.alphabet ]
 112
 113	#-------------------------------------------------------#
 114	#                     Serialization                     #
 115	#-------------------------------------------------------#
 116
 117
 118	def to_dict(self) -> dict[str,any]:
 119
 120		"""Create a dict representing the HMM configuration.
 121
 122		Returns:
 123
 124			dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass.
 125		"""
 126
 127		return {
 128			"name"            : self.name,
 129			"description"     : self.description,
 130			"states"          : [ asdict(state)      for state      in self.states      ],
 131			"transitions"     : [ asdict(transition) for transition in self.transitions ],
 132			"alphabet"        : self.alphabet,
 133			"isoclass"        : self.isoclass
 134		}
 135
 136	def from_dict( self, config : dict[str,any] ) :
 137
 138		"""Configure the HMM configuration from a dictionary.
 139
 140		Args:
 141			config (dict[str,any]): The HMM configuration.
 142		"""
 143
 144		self.name            = config.name 
 145		self.description     = config.description 
 146		
 147		self.set_states( states=[ 
 148			CausalState( 
 149				name=state[ "name" ],
 150				classes=set( state[ "classes" ] )
 151			)
 152			for state in config.states
 153		] )
 154
 155		self.set_transitions( transitions=[ 
 156			Transition( 
 157				origin_state_idx=tr[ "origin_state_idx" ],
 158				target_state_idx=tr[ "target_state_idx" ],
 159				prob=tr[ "prob" ],
 160				symbol_idx=tr[ "symbol_idx" ]
 161			)
 162			for tr in config.transitions
 163		] )
 164
 165		self.set_alphabet( alphabet=config.alphabet ) 
 166
 167	def save_config(
 168		self, 
 169		path : Path, 
 170		with_complexity : bool = False, 
 171		with_non_trivial_complexity : bool = False ) :
 172
 173		config = self.to_dict()
 174
 175		if with_complexity :
 176			
 177			complexity = self.get_complexities( 
 178				with_non_trivial=with_non_trivial_complexity
 179			)
 180
 181			config[ "complexity" ] = complexity
 182
 183		config[ "structural_properties" ] = {
 184			"unifilar"              : self.is_unifilar(),
 185			"row_stochastic"        : self.is_row_stochastic(),
 186			"strongly_connected"    : self.is_strongly_connected(),
 187			"aperiodic"             : self.is_aperiodic(),
 188			"minimal"               : self._is_minimal_as_dfa( topological_only=False ),
 189			"topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ),
 190			"is_epsilon_machine"    : self.is_epsilon_machine()
 191		}
 192
 193		with open( path / "am_config.json", "w", encoding="utf-8" ) as f :
 194			json.dump( config, f, ensure_ascii=False, indent=2, default=list )
 195
 196	def from_file( self, path : Path ) :
 197		with open( Path / "am_config.json", "r" ) as f:
 198			config = json.load(f)
 199		self.from_dict()
 200
 201	#-------------------------------------------------------#
 202	#             Setters and State Management              #
 203	#-------------------------------------------------------#
 204	
 205	def clear_cache(self) :
 206
 207		"""
 208		Reset all derived properties so they will be recomputed when requested later.
 209		"""
 210
 211		self.complexity = {}
 212		self.T = None
 213		self.T_x = None
 214		self.pi = None
 215		self.pi_fractional = None
 216		self.msp = None 
 217		self.reverse_am = None
 218		self.is_q_weighted = False
 219		self.is_minimal = False
 220
 221	def set_states( self, states : list[CausalState] ) :
 222		self.clear_cache()
 223		self.states = states.copy()
 224		self.state_idx_map = {}
 225		for idx, state in enumerate( self.states ) :
 226			self.state_idx_map[ state.name ] = idx
 227
 228	def set_alphabet( self, alphabet : list[str] ) :
 229
 230		self.clear_cache()
 231
 232		old_alphabet = self.alphabet.copy()
 233
 234		aSet = set()
 235		aSet.update( alphabet )
 236
 237		self.alphabet = sorted(list(aSet))
 238
 239		self.symbol_idx_map = {}
 240		for idx, symbol in enumerate( self.alphabet ) :
 241			self.symbol_idx_map[ symbol ] = idx
 242
 243		for i, tr in enumerate( self.transitions ) :
 244			symbol = old_alphabet[ tr.symbol_idx ]
 245			self.transitions[ i ] = Transition(
 246				origin_state_idx=tr.origin_state_idx,
 247				target_state_idx=tr.target_state_idx,
 248				prob=tr.prob,
 249				symbol_idx=self.symbol_idx_map[ symbol ]
 250			)
 251
 252	def set_transitions( self, transitions : list[Transition] ) :
 253		self.clear_cache()
 254		self.transitions = transitions.copy()
 255
 256	def extend_states( self, states : list[CausalState] ) :
 257		self.set_states( self.states + states )
 258
 259	def extend_alphabet( self, alphabet : list[str] ) :
 260		self.set_alphabet( self.alphabet + alphabet  )
 261
 262	def extend_transitions( self, transitions : list[Transition] ) :
 263		self.set_transitions( self.transitions + transitions  )
 264
 265	def get_complexity_measure_if_exists(self, measure ) :
 266		m = self.complexity.get( measure, None )
 267		return m
 268
 269	def set_complexity_measure(self, measure, value ) :
 270		self.complexity[ measure ] = value
 271
 272	#-------------------------------------------------------#
 273	#                   Get and Compute                     #
 274	#-------------------------------------------------------#
 275
 276	def get_complexities( 
 277		self, 
 278		with_non_trivial=False ) :
 279
 280		trivial = [
 281			self.C_mu,
 282			self.h_mu,
 283			self.H_1,
 284			self.rho_mu
 285		]
 286
 287		non_trivial = [
 288			self.E, 
 289			self.T_inf,
 290			self.S,
 291			self.chi
 292		]
 293			
 294		complexities = { m.__name__ : m() for m in trivial }
 295
 296		if with_non_trivial :
 297
 298			complexities |= { m.__name__ : m() for m in non_trivial }
 299
 300			for key in [ 'H_L', 'h_mu_L', 'H_sync' ] :
 301				if key in self.complexity :
 302					complexities[ key ] = self.complexity[ key ]
 303
 304		return complexities
 305
 306	#-------------------------------------------------------#
 307
 308	def get_metadata(self) :
 309		return {
 310			"name" : self.name,
 311			'complexity' : self.complexity,
 312			"description" : self.description
 313		}
 314
 315	def get_transition_matrix(self) :
 316
 317		if self.T  is not None :
 318			return self.T
 319
 320		n_states = len( self.states )
 321		T = np.zeros((n_states, n_states))
 322
 323		for tr in self.transitions :    
 324			T[ tr.origin_state_idx, tr.target_state_idx  ] = tr.prob
 325
 326		self.T = T
 327
 328		return self.T
 329
 330	#-------------------------------------------------------#
 331
 332	def get_T_X(self) :
 333
 334		if self.T_x  is not None :
 335			return self.T_x
 336
 337		n_states  = len( self.states )
 338		n_symbols = len( self.alphabet )
 339
 340		T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ]
 341
 342		for tr in self.transitions :
 343			T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob
 344
 345		self.T_x = T_x
 346		return self.T_x
 347
 348	#-------------------------------------------------------#
 349
 350	def get_msp_qw(
 351		self,
 352		exact_state_cap: int = 1000,
 353		verbose: bool = True,
 354	):
 355		if self.msp is not None:
 356			return self.msp
 357
 358		try : 
 359
 360			print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" )
 361
 362			self.msp = compute_msp_exact(
 363				T_x=self.get_Tx_fractional(),
 364				pi=self.get_fractional_stationary_distribution(),
 365				n_states=len(self.states),
 366				alphabet=self.alphabet,
 367				exact_state_cap=1000,
 368				bool = True
 369			)
 370
 371			return self.msp 
 372
 373		except RuntimeError as e :
 374			warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." )
 375
 376		return self.get_msp()
 377
 378	def get_msp(
 379		self,
 380		exact_state_cap: int = 175_000,
 381		jsd_eps:         float = 1e-7,
 382		k_ann:           int   = 50,
 383		verbose                = True,
 384	) :
 385
 386		if self.msp is not None:
 387			return self.msp
 388	 
 389		T_x = self.get_T_X()
 390		pi  = self.get_stationary_distribution()
 391
 392		T_stacked      = np.stack(T_x)
 393		n_symbols      = len(self.alphabet)
 394		n_input_states = T_stacked.shape[1]
 395	
 396		print( "\nComputing Mixed State Presentation..." )
 397
 398		self.msp = compute_msp( 
 399			T_x=T_x,
 400			pi=pi,
 401			n_states=len(self.states),
 402			alphabet=self.alphabet,
 403			exact_state_cap=exact_state_cap,
 404			verbose=verbose
 405		)
 406
 407		return self.msp
 408
 409	def get_reverse_am(self) :
 410
 411		if self.reverse_am is not None:
 412			return self.reverse_am
 413
 414		pi = self.get_stationary_distribution()
 415		self.reverse_am = copy.deepcopy(self)
 416		
 417		new_transitions = []
 418		for tr in self.transitions:
 419			i = tr.target_state_idx
 420			j = tr.origin_state_idx
 421			
 422			p_reversed = (pi[j] * tr.prob) / pi[i]
 423			
 424			new_transitions.append(
 425				Transition(
 426					origin_state_idx=i,
 427					target_state_idx=j,
 428					prob=p_reversed,
 429					symbol_idx=tr.symbol_idx
 430				)
 431			)
 432
 433		self.reverse_am.set_transitions(new_transitions)
 434
 435		if self.reverse_am.is_epsilon_machine():
 436			return self.reverse_am
 437
 438		rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 )
 439
 440		self.reverse_am.set_states( rmsp.states )
 441		self.reverse_am.set_transitions( rmsp.transitions )
 442		self.reverse_am.msp = rmsp
 443		self.reverse_am.start_state = 0
 444
 445		self.reverse_am.collapse_to_largest_strongly_connected_subgraph()
 446		self.reverse_am.minimize()
 447
 448		return self.reverse_am
 449
 450	#-------------------------------------------------------#
 451
 452	def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] :
 453
 454		self.to_q_weighted()
 455
 456		n_states  = len( self.states )
 457		n_symbols = len( self.alphabet )
 458
 459		T_x = []
 460
 461		for x in range( n_symbols ) :
 462			T_x.append( [] )
 463			for i in range( n_states ) :
 464				T_x[ x ].append( [ 0 for _ in range( n_states ) ] )
 465
 466		for tr in self.transitions :
 467			T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq
 468
 469		return T_x
 470
 471	def get_T_sympy( self ) :
 472
 473		self.to_q_weighted()
 474
 475		n = len( self.states )
 476		T = sympy.zeros( n, n )
 477
 478		for tr in self.transitions :
 479			T[ tr.origin_state_idx, tr.target_state_idx ] = tr.pq
 480
 481		return T
 482
 483	def get_fractional_stationary_distribution(self) :
 484
 485		T = self.get_T_sympy()
 486
 487		if self.pi_fractional is not None :
 488			return self.pi_fractional
 489
 490		G = self.as_digraph()
 491
 492		if not nx.is_strongly_connected(G):
 493			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
 494
 495		self.pi_fractional = solve_for_pi_fractional( T )
 496
 497		return self.pi_fractional
 498
 499	def get_stationary_distribution(self):
 500
 501		if self.pi is not None :
 502			return self.pi
 503
 504		G = self.as_digraph()
 505		
 506		if not nx.is_strongly_connected(G):
 507			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
 508
 509		T = self.get_transition_matrix()
 510		return solve_for_pi( T )		
 511
 512	#-------------------------------------------------------#
 513	#                 Complexity Measures                   #
 514	#-------------------------------------------------------#
 515	
 516	def C_mu( self ) :
 517
 518		"""The *statistical complexity* (aka *forecasting complexity*) :
 519
 520		.. math::
 521
 522			C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma),
 523
 524		where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
 525
 526		.. note::
 527
 528			**Interpretations**
 529
 530			* The amount of historical information a process stores.
 531			* The amount of structure in a process.
 532
 533		Returns:
 534
 535			float: :math:`C_{\\mu}`.
 536
 537		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 538			Decomposition of Intrinsic Computation*, 2016.
 539			<https://arxiv.org/abs/1309.3792>
 540		"""
 541
 542		m = self.get_complexity_measure_if_exists( "C_mu" )
 543
 544		if m is not None :
 545			return m
 546
 547		pi = self.get_stationary_distribution()
 548
 549		h = 0
 550		for i, pr in enumerate( pi ) :
 551			
 552			if pr < self.EPS :
 553				continue
 554
 555			h += -pr * np.log2( pr )
 556
 557		self.set_complexity_measure( "C_mu", h )
 558
 559		return h
 560
 561	#-------------------------------------------------------#
 562
 563	def h_mu( self ) :
 564
 565		"""The *entropy rate* :
 566
 567		.. math::
 568
 569			h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma),
 570
 571		where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
 572
 573		.. note::
 574
 575			**Interpretations**
 576
 577			* The lower bound on achievable loss in bits. 
 578			* The irreducable randomness in the process.
 579			* The intrinsic Randomness in the process.
 580
 581		Returns:
 582			
 583			float: :math:`h_{\\mu}`.
 584
 585		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 586			Decomposition of Intrinsic Computation*, 2016.
 587			<https://arxiv.org/abs/1309.3792>
 588		"""
 589
 590		m = self.get_complexity_measure_if_exists( "h_mu" )
 591
 592		if m is not None :
 593			return m
 594
 595		T  = self.get_transition_matrix()
 596		pi = self.get_stationary_distribution()
 597
 598		n_states = pi.size
 599
 600		h = 0
 601		for i, pr in enumerate( pi ) :
 602
 603			if pr < self.EPS :
 604				continue
 605
 606			row_entropy = 0
 607			for j in range( len( pi ) ) :
 608
 609				if T[ i, j ]  < self.EPS :
 610					continue
 611
 612				row_entropy -= T[ i, j ] * np.log2( T[ i, j ] )
 613
 614			h += pr * row_entropy
 615
 616		self.set_complexity_measure( "h_mu", h )
 617
 618		return h
 619
 620	#-------------------------------------------------------#
 621
 622	def H_1(self) -> float :
 623
 624		"""The *single symbol uncertainty*:
 625
 626		.. math::
 627
 628			H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)},
 629
 630		where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2.
 631
 632		.. note::
 633
 634			**Interpretations**
 635
 636			* How uncertain you are on average about a single measurement with no context.
 637
 638		Returns:
 639
 640			float: :math:`H(1)`.
 641
 642		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
 643			<https://arxiv.org/abs/1105.2988>
 644		"""
 645
 646		m = self.get_complexity_measure_if_exists("H_1")
 647		if m is not None:
 648			return m
 649
 650		pi  = self.get_stationary_distribution()
 651		T_X = self.get_T_X()  # dict: symbol -> matrix
 652
 653		h = 0.0
 654		for T_x in T_X:
 655			# Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j]
 656			p_sym = 0.0
 657			for i, pr in enumerate(pi):
 658				if pr < self.EPS:
 659					continue
 660				p_sym += pr * T_x[i, :].sum()
 661
 662			if p_sym < self.EPS:
 663				continue
 664			h -= p_sym * np.log2(p_sym)
 665
 666		self.set_complexity_measure("H_1", h)
 667		return h
 668
 669	#-------------------------------------------------------#
 670
 671	def rho_mu(self) -> float :
 672		
 673		"""The *anticipated information* [^James_2018], p.3.:
 674
 675		.. math::
 676
 677			\\rho_{\\mu}= H(1) - h_{\\mu}
 678
 679		Returns:
 680			
 681			float: :math:`\\rho_{\\mu}`
 682
 683		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
 684			<https://arxiv.org/abs/1105.2988>
 685		"""
 686
 687		m = self.get_complexity_measure_if_exists("rho_mu")
 688		
 689		if m is not None:
 690			return m
 691
 692		rho = self.H_1() - self.h_mu()
 693		
 694		self.set_complexity_measure("rho_mu", rho)
 695		
 696		return rho
 697
 698	#-------------------------------------------------------#
 699
 700	def block_convergence( self )  :
 701
 702		"""
 703		Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]:
 704
 705		- $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 
 706
 707		- $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 
 708
 709		- $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 
 710
 711		- $H(L) = H[X_{0:L}]$, 
 712
 713		- $h_{\\mu}(L) = H(L) - H(L-1)$, and 
 714
 715		- $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$.
 716
 717		You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 
 718
 719		<img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;">
 720
 721		Returns:
 722		
 723			ComplexityMeasures: An object containing the estimated measures with the following attributes:
 724			
 725			- E (float): The excess entropy.
 726			- T_inf (float): The transient information ($\\mathbf{T}$).
 727			- S (float): The synchronization information.
 728			- E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$).
 729			- T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$).
 730			- S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$).
 731			- H_L (numpy.ndarray): The block entropy ($H(L)$).
 732			- h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$).
 733			- H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$).
 734			- converged (bool): True if the algorithm converged.
 735
 736		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 737			Decomposition of Intrinsic Computation*, 2016.
 738			<https://arxiv.org/abs/1309.3792>
 739		"""
 740
 741		trs = [ [] for _ in range( len( self.states ) ) ]
 742		for tr in self.transitions :
 743			trs[ tr.origin_state_idx ].append( ( 
 744				tr.symbol_idx, 
 745				float( tr.prob ),
 746				tr.target_state_idx ) )
 747
 748		pi = self.get_stationary_distribution()
 749
 750		state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ]
 751		branches = [(1.0, list(state_dist))]
 752
 753		print( "\nComputing Block Entropy\n" )
 754
 755		C = am_fast.block_entropy_convergence(
 756			h_mu            = self.h_mu(),
 757			n_states        = len( self.states ),
 758			n_symbols       = len( self.alphabet ),
 759			convergence_tol = 1e-6,
 760			precision       = 10,
 761			eps             = 1e-25,
 762			branches        = branches,
 763			trans           = trs,
 764			max_branches    = 30_000_000
 765		)
 766
 767		print( "Done\n" )
 768
 769		self.set_complexity_measure( f"E",       C.E )
 770		self.set_complexity_measure( f"S",       C.S )
 771		self.set_complexity_measure( f"T_inf",   C.T )
 772		self.set_complexity_measure( f"E_L",     C.E_L.tolist() )
 773		self.set_complexity_measure( f"T_L",     C.T_L.tolist() )
 774		self.set_complexity_measure( f"S_L",     C.S_L.tolist() )
 775		self.set_complexity_measure( f"H_L",     C.H_L.tolist() )
 776		self.set_complexity_measure( f"h_mu_L",  C.h_mu_L.tolist() )
 777		self.set_complexity_measure( f"H_sync",  C.H_sync.tolist() )
 778
 779		return C
 780
 781	#-------------------------------------------------------#
 782
 783	def E( self ) -> float :
 784
 785		"""The *excess entropy* [^crutchfield_exact_2016], p.4:
 786
 787		.. math::
 788
 789			\\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}]
 790		
 791		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 792
 793		.. note::
 794
 795			**Interpretations**
 796
 797			* The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016].
 798			* How much information an observer must extract to synchronize to the process.
 799			* Measures how long the process appears more complex than it asymptotically is.
 800			* Vanishes for immediately synchronizable processes.
 801
 802		Returns:
 803		
 804			float: :math:`\\mathbf{E}`
 805
 806		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 807			Decomposition of Intrinsic Computation*, 2016.
 808			<https://arxiv.org/abs/1309.3792>
 809		"""
 810
 811		m = self.get_complexity_measure_if_exists( "E" )
 812
 813		if m is not None :
 814			return m
 815
 816		try : 
 817			msp = self.get_msp()
 818			E, S, T = msp.get_E_S_T()
 819			self.set_complexity_measure( "E", E )
 820			self.set_complexity_measure( "S", S )
 821			self.set_complexity_measure( "T_inf", T )
 822			
 823		except Exception as e :
 824
 825			print( f"MSP failed {e}" )
 826
 827			C = self.block_convergence()	
 828			E = C.E
 829			self.set_complexity_measure( "E", E )
 830
 831		return E
 832
 833	#-------------------------------------------------------#
 834
 835	def S( self ) -> float :
 836
 837		"""The *synchronization* information:
 838
 839		.. math::
 840
 841			\\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L),
 842
 843		where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4.
 844
 845		.. note::
 846
 847			**Interpretations**
 848
 849			* The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016].
 850
 851		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 852
 853		Returns:
 854		
 855			float: :math:`\\mathbf{S}`
 856
 857		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 858			Decomposition of Intrinsic Computation*, 2016.
 859			<https://arxiv.org/abs/1309.3792>
 860		"""
 861
 862		m = self.get_complexity_measure_if_exists( "S" )
 863
 864		if m is not None :
 865			return m
 866
 867		try : 
 868			msp = self.get_msp()
 869			E, S, T = msp.get_E_S_T()
 870			self.set_complexity_measure( "E", E )
 871			self.set_complexity_measure( "S", S )
 872			self.set_complexity_measure( "T_inf", T )
 873
 874		except Exception as e :
 875			print( f"{e} \nFalling back to iterative estimation.")
 876			exit()
 877
 878			C = self.block_convergence()	
 879			S = C.S
 880			self.set_complexity_measure( "S", S )
 881
 882		return S
 883
 884	#-------------------------------------------------------#
 885
 886	def T_inf( self ) -> float :
 887
 888		"""The *transient information*[^crutchfield_exact_2016], p.4:
 889
 890		.. math::
 891
 892			\\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right]
 893
 894		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 895
 896		.. note::
 897
 898			**Interpretations**
 899
 900			* The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016].
 901
 902		Returns:
 903		
 904			float: :math:`\\mathbf{T}`
 905
 906		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 907			Decomposition of Intrinsic Computation*, 2016.
 908			<https://arxiv.org/abs/1309.3792>
 909		"""
 910
 911		m = self.get_complexity_measure_if_exists( "T_inf" )
 912
 913		if m is not None :
 914			return m
 915
 916		try : 
 917			msp = self.get_msp()
 918			E, S, T = msp.get_E_S_T()
 919			self.set_complexity_measure( "E", E )
 920			self.set_complexity_measure( "S", S )
 921			self.set_complexity_measure( "T_inf", T )
 922
 923		except Exception as e :
 924			print( f"{e} \nFalling back to iterative estimation.")
 925			C = self.block_convergence()	
 926			T_inf = C.T
 927			self.set_complexity_measure( "T_inf", T_inf )
 928
 929		return T_inf
 930
 931	#-------------------------------------------------------#
 932
 933	def chi( self ) -> float :
 934
 935		"""The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2:
 936
 937		.. math::
 938
 939			\\chi = C_{\\mu} - \\mathbf{E}
 940
 941		:math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`.
 942
 943		.. note::
 944
 945			**Interpretations**
 946
 947			* Difference between internal stored information and apparent information to an observer.
 948			* How muching information is hiding in the system.
 949
 950		Returns:
 951		
 952			float: :math:`\\chi`
 953
 954		[^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009.
 955			<https://arxiv.org/abs/0902.1209>
 956
 957		[^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021.
 958			<https://arxiv.org/abs/0905.4787>
 959		"""
 960
 961		m = self.get_complexity_measure_if_exists( "chi" )
 962
 963		if m is not None :
 964			return m
 965
 966		chi = self.C_mu() - self.E()
 967
 968		if chi < 0 :
 969			
 970			# if chi is 0, accumulated floating point error can result in small negative values
 971			if chi < -1e-5:
 972				warnings.warn(f"Crypticity is negative ({chi:.6e}).")
 973			
 974			chi = np.clamp( chi, 0 )
 975
 976		self.set_complexity_measure( "chi", chi )
 977
 978		return chi
 979
 980	#-------------------------------------------------------#
 981	#                      Properties                       #
 982	#-------------------------------------------------------#
 983
 984	def is_row_stochastic(self) :
 985
 986		"""
 987		Check that all states have outgoing transition probabilities that sum to 1.
 988		"""
 989
 990		sums = np.zeros( len( self.states ) )
 991		for tr in self.transitions :
 992			sums[ tr.origin_state_idx ] += tr.prob
 993		return np.allclose( sums, 1.0 )
 994
 995	#-------------------------------------------------------#
 996
 997	def is_unifilar(self) :
 998
 999		"""
1000		Check that no state emits the same symbol on transitions to different states. 
1001		"""
1002
1003		symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 )
1004
1005		for tr in self.transitions : 
1006		
1007			if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 :
1008				symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx
1009		
1010			elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx :
1011				return False
1012		
1013		return True
1014
1015	#-------------------------------------------------------#
1016
1017	def is_strongly_connected(self) :
1018
1019		"""
1020		Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html).
1021		"""
1022
1023		return nx.is_strongly_connected( self.as_digraph() )
1024
1025	#-------------------------------------------------------#
1026
1027	def is_aperiodic(self) :
1028
1029		"""
1030		Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph."
1031		"""
1032
1033		return nx.is_aperiodic( self.as_digraph() )
1034
1035	#-------------------------------------------------------#
1036
1037	def _is_minimal_as_dfa( self, topological_only : bool, verbose=True ) :
1038
1039		with_probs = not topological_only
1040
1041		# Construct the DFA
1042		dfa = self.as_dfa( with_probs=with_probs )
1043
1044		# Minimize the DFA
1045		#dfa = dfa.minify(retain_names=True)
1046		dfa = am_fast.minify_cpp( dfa, retain_names=True )
1047
1048		# check we have minimal number of states
1049		if len( dfa.states ) != len( self.states ) :
1050			if verbose : 
1051				print( f"Not minimal reduces from {len( self.states )} to {len( dfa.states )} states" )
1052			return False
1053
1054		return True
1055
1056	def is_topological_epsilon_machine( self, verbose=True ) :
1057
1058		"""
1059		Checks if the HMM is a topological $\\epsilon$-machine [^1].
1060
1061		[^1]: Johnson et al, Enumerating Finitary Processes, 2024.
1062			<https://arxiv.org/abs/1011.0036>
1063		"""
1064
1065		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1066			if verbose : 
1067				print( f"Either non unifilar or not strongly connected" )
1068			return False
1069		else :
1070			return self._is_minimal_as_dfa( topological_only=True, verbose=verbose )
1071
1072	def is_epsilon_machine( self, verbose=True ) :
1073
1074		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1075			if verbose : 
1076				print( f"Either non unifilar or not strongly connected" )
1077			return False
1078		else :
1079			return self._is_minimal_as_dfa( topological_only=False, verbose=verbose )
1080
1081	#-------------------------------------------------------#
1082	#                      Properties                       #
1083	#-------------------------------------------------------#
1084
1085	def minimize(self, retain_names: bool = True, verbose=False):
1086
1087		"""
1088		Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM
1089		is unifilar and strongly connected. Converts the HMM to a DFA with symbols
1090		labeled jointly with symbols and probabilities, and uses Myhill-Nerode 
1091		equivalence for minimization. Relies on `automata_lib` and uses
1092		 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states
1093		 final.
1094
1095		Args:
1096			retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`.
1097
1098		Returns:
1099		
1100			automata.fa.dfa.DFA : the resulting DFA.
1101		"""
1102
1103		if self.is_minimal :
1104			return
1105
1106		start = time.perf_counter()
1107
1108		if not self.is_unifilar():
1109			raise ValueError(
1110				"DFA minimization is not valid for non-unifilar HMMs"
1111			)
1112
1113		was_strongly_connected = self.is_strongly_connected()
1114
1115		was_row_stochastic = self.is_row_stochastic()
1116		n_states_before = len(self.states)
1117
1118		dfa = self.as_dfa(with_probs=True)
1119
1120		#min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True)
1121		min_dfa = am_fast.minify_cpp( dfa, retain_names=True )
1122
1123		# Build lookup from original state index -> CausalState object
1124		orig_state   = {i: s for i, s in enumerate(self.states)}
1125		eq_list      = list(min_dfa.states)
1126
1127		start_eq = min_dfa.initial_state
1128		
1129		# Separate the start state, then sort the rest by the 
1130		# smallest original state index inside each equivalence class.
1131		other_eqs = [eq for eq in eq_list if eq != start_eq]
1132		other_eqs.sort(key=lambda eq: min(eq))
1133
1134		# Recombine so start eq comes first, followed by the sorted remaining classes
1135		eq_list = [start_eq] + other_eqs
1136		# ----------------------------------------------------------
1137
1138		# Recompute eq_to_idx with the new ordering
1139		eq_to_idx = {eq: i for i, eq in enumerate(eq_list)}
1140
1141		# new_start is now guaranteed to be 0
1142		new_start = 0
1143
1144		# Map each original state index -> its equivalence class
1145		# Guard: minify() silently drops unreachable states
1146		orig_to_eq = {s: eq for eq in min_dfa.states for s in eq}
1147
1148		# Build lookup from original state index -> its transitions
1149		orig_trs = defaultdict(list)
1150		for t in self.transitions:
1151			orig_trs[t.origin_state_idx].append(t)
1152
1153		new_trs = []
1154		for eq in min_dfa.states:
1155			rep        = next(iter(eq))
1156			origin_idx = eq_to_idx[eq]
1157			for t in orig_trs[rep]:
1158				target_eq  = orig_to_eq[t.target_state_idx]
1159				target_idx = eq_to_idx[target_eq]
1160				new_trs.append(Transition(
1161					origin_state_idx = origin_idx,
1162					target_state_idx = target_idx,
1163					prob             = t.prob,
1164					symbol_idx       = t.symbol_idx,
1165				))
1166
1167		members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list]  # sorted for determinism
1168
1169		# Compute new names
1170		if retain_names:
1171			new_names = [
1172				"{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1
1173				else members[0].name
1174				for members in members_list
1175			]
1176		else:
1177			new_names = [str(j) for j in range(len(eq_list))]
1178
1179		old_name_to_new_name = {
1180			m.name: new_names[j]
1181			for j, members in enumerate(members_list)
1182			for m in members
1183		}
1184
1185		# Build the new states, preserving classes and isomorphs regardless of naming
1186		new_states = []
1187		for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)):
1188			classes   = set().union(*(m.classes for m in members))
1189			isomorphs = {
1190				old_name_to_new_name.get(iso, iso)
1191				for m in members
1192				for iso in m.isomorphs
1193				if old_name_to_new_name.get(iso, iso) != name
1194			}
1195			new_states.append(CausalState(
1196				name      = name,
1197				classes   = classes,
1198				isomorphs = isomorphs,
1199			))
1200
1201		self.set_states(new_states)
1202		self.set_transitions(new_trs)
1203		self.start_state = new_start
1204
1205		if n_states_before == len(new_states) and verbose :
1206			print( f"{n_states_before} state HMM was already minimal.\n" )
1207		elif verbose :
1208			print( f"Minimized from {n_states_before} to {len(new_states)}\n" )
1209
1210		if not ( was_strongly_connected ==  self.is_strongly_connected() ) :
1211			raise RuntimeError(
1212				f"Minimization broke strongly connected"
1213			)
1214
1215		if not ( was_row_stochastic ==  self.is_row_stochastic() ) :
1216			raise RuntimeError(
1217				f"Minimization broke row stochasticity"
1218			)
1219
1220		self.is_minimal = True
1221
1222	#-------------------------------------------------------#
1223	#                      Modifiers                        #
1224	#-------------------------------------------------------#
1225
1226
1227	def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) :
1228
1229		# get equivalent networkx graph
1230		G = self.as_digraph()
1231
1232		# if already strongly connected, nothing to do
1233		if not nx.is_strongly_connected( G ) :
1234
1235			start = time.perf_counter()
1236			subgraph_nodes = list( nx.strongly_connected_components( G ) )
1237
1238			# decompose into strongly connected components and sort by length
1239			# subgraph_nodes = list(nx.strongly_connected_components( G ))
1240			subgraph_nodes.sort(key=len)
1241			component_state_set = subgraph_nodes[-1]
1242
1243			# Take the largest strongly connected component (as list of state names)
1244			component_states = sorted( list( component_state_set ) )
1245
1246			# make temporary copies of the old transitions and states
1247			old_transitions = [
1248				Transition(
1249					origin_state_idx=tr.origin_state_idx,
1250					target_state_idx=tr.target_state_idx,
1251					prob=tr.prob,
1252					symbol_idx=tr.symbol_idx
1253				)
1254
1255				for tr in self.transitions
1256			]
1257
1258			old_states = [
1259				CausalState(
1260					name=s.name,
1261					classes=s.classes,
1262					isomorphs=s.isomorphs
1263				)
1264				for s in self.states
1265			]
1266
1267			self.set_states(
1268				states=[ 
1269					state
1270					for i, state in enumerate( old_states ) if i in component_state_set
1271				]
1272			)
1273
1274			# we will build new transition list based on those belonging to the component
1275			self.set_transitions( transitions= [] )
1276
1277			# for tracking which new transitions leave each state
1278			transitions_from_state = { state : set() for state in component_states }
1279			new_transitions = []
1280
1281			for tr in old_transitions :
1282
1283				origin_state_name = old_states[ tr.origin_state_idx ].name
1284				target_state_name = old_states[ tr.target_state_idx ].name
1285
1286				# skip transitions that connect separate strongly connected components
1287				if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) :
1288					continue
1289
1290				# track transitions (by index in new transitions list) that leave this state
1291				transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) )
1292
1293				my_origin_state_idx = self.state_idx_map[ origin_state_name ]
1294				my_target_state_idx = self.state_idx_map[ target_state_name ]
1295
1296				new_transitions.append( 
1297					Transition(
1298						origin_state_idx=my_origin_state_idx,
1299						target_state_idx=my_target_state_idx,
1300						prob=tr.prob,
1301						symbol_idx=tr.symbol_idx
1302					)  )
1303
1304			self.set_transitions( transitions=new_transitions )
1305
1306			# if we removed an outgoing transition from a state, we need to distribute its probability 
1307			# among the remaining outgoing transitions from the state
1308			for state in component_states :
1309				
1310				# get the set of transitions leaving this state
1311				state_trs = transitions_from_state[ state ]
1312
1313				# sum the probabilities of the outgoing transitions from the state
1314				p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] )
1315
1316				# how much probability is missing
1317				diff = 1.0 - p_sum
1318
1319				# if significant difference
1320				if abs( diff ) > self.EPS : 
1321
1322					# calculate how much of the difference each transition gets
1323					adjustment = diff / len( state_trs )
1324					
1325					# update the transitions
1326					for i in state_trs :
1327
1328						# transition with origional probability
1329						tr = self.transitions[ i ]
1330
1331						# adjusted probability
1332						self.transitions[ i ] = Transition(
1333							origin_state_idx=tr.origin_state_idx, 
1334							target_state_idx=tr.target_state_idx, 
1335							prob=tr.prob + adjustment, 
1336							symbol_idx=tr.symbol_idx )
1337
1338			if rename_states :
1339				self.set_states( [
1340					CausalState( name=f"{i}" )
1341					for i, s in enumerate( self.states )
1342				] )
1343
1344	def to_q_weighted( self, denominator_limit=1000 ) :
1345
1346		"""
1347		Approximates the existing transition probabilities with exact fractions, stores 
1348		the fractional probabilities as Fraction in Transition.pq, and sets the floating
1349		point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 
1350		conversion, the function recurses with `denominator_limit=denominator_limit*10`.
1351
1352		Args:
1353			denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 
1354			the conversion.
1355		"""
1356
1357		if self.is_q_weighted :
1358			return
1359
1360		if not self.is_row_stochastic() :
1361			raise ValueError( "Cannot convert to q-weighted because not row stochastic" )
1362
1363		t_from = [[] for _ in range(len(self.states))]
1364
1365		for i, tr in enumerate( self.transitions ) :
1366			t_from[ tr.origin_state_idx ].append( i )
1367
1368		new_transitions = []
1369		for t_list in t_from :
1370			
1371			if not t_list : 
1372				continue
1373
1374			p_q_sum = Fraction(0,1)
1375			p_qs = []
1376
1377			for t_idx in t_list :
1378			
1379				p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit )
1380				p_q_sum += p_q
1381				p_qs.append( p_q )
1382
1383			if p_q_sum != Fraction(1,1) :
1384				
1385				max_pq_i = np.argmax( p_qs )
1386				max_oq = p_qs[ max_pq_i ]
1387
1388				diff = p_q_sum - Fraction(1,1)
1389
1390				# If recurse with higher resolution
1391				if diff > max_oq :
1392					return self.to_q_weighted( denominator_limit*10 )
1393				else :
1394					p_qs[ max_pq_i ] -= diff
1395
1396			for i, t_idx in enumerate( t_list ) :	
1397				new_transitions.append( 
1398					Transition( 
1399						origin_state_idx=self.transitions[  t_idx ].origin_state_idx,
1400						target_state_idx=self.transitions[  t_idx ].target_state_idx,
1401						prob=float(p_qs[ i ]),
1402						symbol_idx=self.transitions[  t_idx ].symbol_idx,
1403						pq=p_qs[ i ]
1404					)
1405				)
1406
1407		self.set_transitions( new_transitions )
1408		self.is_q_weighted = True
1409
1410	#-------------------------------------------------------#
1411	#                    Data Generation                    #
1412	#-------------------------------------------------------#
1413
1414	def isomorphic_shift(
1415		self,
1416		input_symbol_indices: np.ndarray,
1417		input_state_indices:  np.ndarray,
1418		shift : int = 1
1419	) -> dict[str, np.ndarray]:
1420
1421		"""
1422		Generates a new sequence of of symbols that are permuted with the symbols emitted by
1423		isomorphic states, if they exists.
1424
1425		:math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br>
1426		:math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]`
1427 
1428		:math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br>
1429		:math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}`
1430 
1431		:math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n`
1432 
1433		:math:`\\texttt{output\\_symbol\\_indices}[i]   := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br>
1434		:math:`\\texttt{output\\_state\\_indices}[i]    := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br>
1435		:math:`\\texttt{output\\_state\\_indices}[i+1]  := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)`
1436
1437		Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself.
1438
1439		Args:
1440			input_symbol_indices (np.ndarray): The sequence of generated symbols.
1441			input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end.
1442			shift : int: How much to shift the symbols across the isomorphic states.
1443		"""
1444
1445		if not any( state.isomorphs for state in self.states ):
1446			raise ValueError("HMM has no states with isomorphs")
1447
1448		inputs = np.asarray(input_symbol_indices)
1449		states = np.asarray(input_state_indices)
1450
1451		n_states = len(self.states)
1452
1453		tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32)
1454
1455		for tr in self.transitions:
1456			tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx
1457
1458		# Build isomorph remapping: identity by default, overridden where isomorphs exist
1459		iso_table = np.arange(n_states, dtype=np.int32)
1460
1461		for i, state in enumerate(self.states):
1462			if state.isomorphs:
1463				isomorph       = sorted(state.isomorphs)[0]
1464				iso_table[ i ] = self.state_idx_map[isomorph]
1465
1466		for i, state in enumerate(self.states):
1467			if state.isomorphs:
1468
1469				# extend the isomorph list to include the identity
1470				isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] )
1471
1472				# find the identity index
1473				pos = isormorphs_with_identity.index( i )
1474
1475				# cyclical shift 
1476				iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ]
1477
1478		origins = states[:-1]
1479		targets = states[1:]
1480
1481		out_origins = iso_table[origins]
1482		out_targets = iso_table[targets]
1483
1484		inv_sym = tr_sym_table[ out_origins, out_targets ]
1485		inv_sts = np.empty(states.size, dtype=states.dtype)
1486
1487		inv_sts[:-1] = out_origins
1488		inv_sts[-1]  = out_targets[-1]
1489
1490		return {
1491			"symbol_index": inv_sym.astype(inputs.dtype),
1492			"state_index":  inv_sts,
1493		}
1494
1495	def generate_data(
1496		self,
1497		file_prefix: str,
1498		n_gen: int,
1499		include_states: bool,
1500		isomorphic_shifts : set[int]=None,
1501		random_seed : int=42 ) -> dict[any] : 
1502
1503		trs = [ [] for _ in range( len( self.states ) ) ]
1504		for tr in self.transitions :
1505			trs[ tr.origin_state_idx ].append( ( 
1506				tr.symbol_idx, 
1507				float( tr.prob ),
1508				tr.target_state_idx ) )
1509
1510		data = am_fast.generate_data(
1511			n_gen=n_gen,
1512			start_state=self.start_state,
1513			transitions=trs,
1514			alphabet=sorted(list(self.alphabet)),
1515			include_states=include_states,
1516			random_seed=random_seed
1517		)
1518
1519		if isomorphic_shifts is not None :
1520
1521			if not include_states :
1522				raise ValueError( "Isomorphic inversion requires include_states=True" )
1523
1524			data[ "isomorphic_shifts" ] = {}
1525
1526			for shift in isomorphic_shifts :
1527
1528				try : 
1529
1530					shifted = self.isomorphic_shift(
1531						input_symbol_indices=data[ "symbol_index" ], 
1532						input_state_indices=data[ "state_index" ], 
1533						shift=shift
1534					)
1535
1536					data[ "isomorphic_shifts" ][ shift ] = {
1537						"symbol_index" : shifted[ "symbol_index" ],
1538						"state_index"  : shifted[ "state_index" ]
1539					}
1540
1541				except Exception as e :
1542					print( f"Exception {e}" )
1543
1544		am_fast.save_data(
1545			data=data,
1546			file_prefix=file_prefix,
1547			alphabet=sorted(list(self.alphabet)),
1548			n_states=len( self.states ),
1549			start_state=self.start_state,
1550			random_seed=random_seed,
1551			machine_metadata=self.get_metadata() )
1552
1553		return data
1554
1555	#-------------------------------------------------------#
1556	#                 Basic Visualization                   #
1557	#-------------------------------------------------------#
1558
1559	def draw_block_entropy_curve(self) :
1560		
1561		h_mu = self.h_mu()
1562		E = self.E()
1563
1564		if not "H_L" in self.complexity :
1565			C = self.block_convergence()
1566
1567		H_L = self.complexity[ "H_L" ]
1568		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1569		y = E + L*h_mu
1570
1571		plt.style.use('seaborn-v0_8-darkgrid') 
1572		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1573		colors = plt.cm.tab10.colors 
1574
1575		ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$')
1576		ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$')
1577
1578		ax.fill_between(
1579			L, H_L, y, 
1580			color='gray', 
1581			alpha=0.5, 
1582			label=r'Transient information $\mathbf{T}$'
1583		)
1584
1585		ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15)
1586		ax.set_xlabel('L', fontsize=12, labelpad=10)
1587		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1588
1589		legend = ax.legend(
1590			loc='upper left', 
1591			title_fontsize='11',
1592			fontsize='10',
1593			frameon=True,
1594			facecolor='white',
1595			shadow=True
1596		)
1597
1598		legend.get_title().set_fontweight('bold')
1599		plt.tight_layout() 
1600		plt.show()
1601
1602	def draw_block_measure_curves(self) :
1603		
1604		h_mu = self.h_mu()
1605		E = self.E()
1606
1607		if not "H_L" in self.complexity :
1608			C = self.block_convergence()
1609
1610		H_L = self.complexity[ "H_L" ]
1611		E_L = self.complexity[ "E_L" ]
1612		T_L = self.complexity[ "T_L" ]
1613		S_L = self.complexity[ "S_L" ]
1614		H_sync = self.complexity[ "H_sync" ][1:]
1615		h_mu_L = self.complexity[ "h_mu_L" ]
1616
1617		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1618		y = E + L*h_mu
1619
1620		plt.style.use('seaborn-v0_8-darkgrid') 
1621		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1622		colors = plt.cm.tab10.colors 
1623
1624		ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$')
1625		ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$')
1626		ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$')
1627		ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$')
1628		ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$')
1629		ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$')
1630
1631		ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15)
1632		ax.set_xlabel('L', fontsize=12, labelpad=10)
1633		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1634
1635		legend = ax.legend(
1636			loc='upper left', 
1637			title_fontsize='11',
1638			fontsize='10',
1639			frameon=True,
1640			facecolor='white',
1641			shadow=True
1642		)
1643
1644		legend.get_title().set_fontweight('bold')
1645		plt.tight_layout() 
1646		plt.show()
1647
1648	def draw_graph(
1649		self,
1650		engine     : str  = 'dot',
1651		output_dir : Path = Path('.'),
1652		show       : bool = True,
1653		symbols_only : bool = False
1654	) -> None :
1655
1656		"""
1657		Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it.
1658
1659		Returns:
1660		
1661			networkx.DiGraph : the resulting graph.
1662		"""
1663
1664		G = self.as_digraph()
1665
1666		subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) )
1667		
1668		am_vis.draw_graph( 
1669			self, 
1670			output_dir=output_dir, 
1671			title="am_graph", 
1672			view=show, 
1673			subgraphs=subgraphs, 
1674			engine=engine,
1675			symbols_only=symbols_only )
1676
1677	#-------------------------------------------------------#
1678	#                   Alternate Forms                     #
1679	#-------------------------------------------------------#
1680
1681
1682	def as_digraph( self ) -> nx.DiGraph :
1683
1684		"""
1685		Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights.
1686
1687		Returns:
1688		
1689			networkx.DiGraph : the resulting graph.
1690		"""
1691
1692		G = nx.DiGraph()
1693		G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] )
1694
1695		for tr in self.transitions :
1696			G.add_edge( tr.origin_state_idx, tr.target_state_idx )
1697
1698		return G
1699
1700	def as_dfa( self, with_probs : bool ) :
1701
1702		"""
1703		Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions.
1704
1705		Args:
1706			with_probs (bool): If true the DFA transitions are labeled based on
1707				the symbol of the machines transition concatenated with its
1708				probability, othwise, the only the symbols.
1709
1710		Returns:
1711		
1712			automata.fa.dfa.DFA : the resulting DFA.
1713		"""
1714
1715		precision=8
1716
1717		def edge_label( symb, prob ) :
1718			return f"({symb},{round(prob, precision)})"
1719
1720		# Build states, symbols, and transitions 
1721		dfa_states  = { i for i, _ in enumerate( self.states ) }
1722			
1723		if not with_probs :
1724			dfa_symbols = set( { t.symbol_idx for t in self.transitions } )
1725		else : 
1726			dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } )
1727
1728		dfa_transitions = defaultdict(dict)
1729
1730		if not with_probs :
1731			for t in self.transitions :
1732				dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx
1733		else :
1734			for t in self.transitions :
1735				dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx
1736
1737		# Construct the DFA
1738		return DFA(
1739			states=dfa_states,
1740			input_symbols=dfa_symbols,
1741			transitions=dfa_transitions,
1742			initial_state=self.start_state,
1743			allow_partial=True,
1744			final_states={ 
1745				s for s in dfa_states
1746			}
1747		)
class HMM(amachine.am_machine.Machine):
  40class HMM(Machine) :
  41
  42	"""Hidden Markov model implementing epsilon machines, mixed state presentations,
  43	complexity measures, and data generation.
  44
  45	Args:
  46		states (list[CausalState] | None): A list of causal states.
  47		transitions (list[Transition] | None): A list of transitions between states.
  48		start_state (int): Index of the start state.
  49		alphabet (list[str]): List of symbols making up the alphabet.
  50		name (str): Name of the model.
  51		description (str): Description of the model.
  52	"""
  53
  54	def __init__( 
  55		self,
  56		states : list[CausalState] | None = None,
  57		transitions : list[Transition] | None = None,
  58		start_state : int = 0,
  59		alphabet : list[str] | None = None,
  60		name : str = "",
  61		description : str = "" ) : 
  62
  63		self.alphabet    : list[str] | None = alphabet or []
  64		self.states      : list[CausalState] | None= states or []
  65		self.transitions : list[Transition] | None= transitions or []
  66
  67		self.set_alphabet( self.alphabet )
  68		self.set_states( self.states )
  69		self.set_transitions( self.transitions )
  70
  71		self.start_state : int = start_state
  72
  73		self.name : str = name
  74		self.description : str = description
  75
  76		# To be depreciated
  77		self.isoclass : str = None
  78
  79		# --- derived --------
  80
  81		self.complexity : dict[str, any] = {}
  82
  83		self.symbol_idx_map : dict[str,int] = {}
  84		self.state_idx_map  : dict[str,int] = {}
  85
  86		self.pi_fractional = None
  87		self.pi : np.ndarray | None = None
  88		self.T  : np.ndarray | None = None
  89		self.msp : MSP | None = None
  90		self.reverse_am : HMM | None = None
  91		self.is_q_weighted : bool = False
  92		self.is_minimal : bool = False
  93
  94		# --- const ----------
  95
  96		self.EPS : float = 1e-12
  97
  98	#-------------------------------------------------------#
  99	#                Overrides / Getters                    #
 100	#-------------------------------------------------------#
 101
 102	@override
 103	def get_states(self) -> list[CausalState] : 
 104		return self.states
 105
 106	@override
 107	def get_transitions(self) -> list[Transition] : 
 108		return self.transitions
 109
 110	@override
 111	def get_alphabet(self) -> list[Symbol]:
 112		return [ Symbol(a) for a in self.alphabet ]
 113
 114	#-------------------------------------------------------#
 115	#                     Serialization                     #
 116	#-------------------------------------------------------#
 117
 118
 119	def to_dict(self) -> dict[str,any]:
 120
 121		"""Create a dict representing the HMM configuration.
 122
 123		Returns:
 124
 125			dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass.
 126		"""
 127
 128		return {
 129			"name"            : self.name,
 130			"description"     : self.description,
 131			"states"          : [ asdict(state)      for state      in self.states      ],
 132			"transitions"     : [ asdict(transition) for transition in self.transitions ],
 133			"alphabet"        : self.alphabet,
 134			"isoclass"        : self.isoclass
 135		}
 136
 137	def from_dict( self, config : dict[str,any] ) :
 138
 139		"""Configure the HMM configuration from a dictionary.
 140
 141		Args:
 142			config (dict[str,any]): The HMM configuration.
 143		"""
 144
 145		self.name            = config.name 
 146		self.description     = config.description 
 147		
 148		self.set_states( states=[ 
 149			CausalState( 
 150				name=state[ "name" ],
 151				classes=set( state[ "classes" ] )
 152			)
 153			for state in config.states
 154		] )
 155
 156		self.set_transitions( transitions=[ 
 157			Transition( 
 158				origin_state_idx=tr[ "origin_state_idx" ],
 159				target_state_idx=tr[ "target_state_idx" ],
 160				prob=tr[ "prob" ],
 161				symbol_idx=tr[ "symbol_idx" ]
 162			)
 163			for tr in config.transitions
 164		] )
 165
 166		self.set_alphabet( alphabet=config.alphabet ) 
 167
 168	def save_config(
 169		self, 
 170		path : Path, 
 171		with_complexity : bool = False, 
 172		with_non_trivial_complexity : bool = False ) :
 173
 174		config = self.to_dict()
 175
 176		if with_complexity :
 177			
 178			complexity = self.get_complexities( 
 179				with_non_trivial=with_non_trivial_complexity
 180			)
 181
 182			config[ "complexity" ] = complexity
 183
 184		config[ "structural_properties" ] = {
 185			"unifilar"              : self.is_unifilar(),
 186			"row_stochastic"        : self.is_row_stochastic(),
 187			"strongly_connected"    : self.is_strongly_connected(),
 188			"aperiodic"             : self.is_aperiodic(),
 189			"minimal"               : self._is_minimal_as_dfa( topological_only=False ),
 190			"topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ),
 191			"is_epsilon_machine"    : self.is_epsilon_machine()
 192		}
 193
 194		with open( path / "am_config.json", "w", encoding="utf-8" ) as f :
 195			json.dump( config, f, ensure_ascii=False, indent=2, default=list )
 196
 197	def from_file( self, path : Path ) :
 198		with open( Path / "am_config.json", "r" ) as f:
 199			config = json.load(f)
 200		self.from_dict()
 201
 202	#-------------------------------------------------------#
 203	#             Setters and State Management              #
 204	#-------------------------------------------------------#
 205	
 206	def clear_cache(self) :
 207
 208		"""
 209		Reset all derived properties so they will be recomputed when requested later.
 210		"""
 211
 212		self.complexity = {}
 213		self.T = None
 214		self.T_x = None
 215		self.pi = None
 216		self.pi_fractional = None
 217		self.msp = None 
 218		self.reverse_am = None
 219		self.is_q_weighted = False
 220		self.is_minimal = False
 221
 222	def set_states( self, states : list[CausalState] ) :
 223		self.clear_cache()
 224		self.states = states.copy()
 225		self.state_idx_map = {}
 226		for idx, state in enumerate( self.states ) :
 227			self.state_idx_map[ state.name ] = idx
 228
 229	def set_alphabet( self, alphabet : list[str] ) :
 230
 231		self.clear_cache()
 232
 233		old_alphabet = self.alphabet.copy()
 234
 235		aSet = set()
 236		aSet.update( alphabet )
 237
 238		self.alphabet = sorted(list(aSet))
 239
 240		self.symbol_idx_map = {}
 241		for idx, symbol in enumerate( self.alphabet ) :
 242			self.symbol_idx_map[ symbol ] = idx
 243
 244		for i, tr in enumerate( self.transitions ) :
 245			symbol = old_alphabet[ tr.symbol_idx ]
 246			self.transitions[ i ] = Transition(
 247				origin_state_idx=tr.origin_state_idx,
 248				target_state_idx=tr.target_state_idx,
 249				prob=tr.prob,
 250				symbol_idx=self.symbol_idx_map[ symbol ]
 251			)
 252
 253	def set_transitions( self, transitions : list[Transition] ) :
 254		self.clear_cache()
 255		self.transitions = transitions.copy()
 256
 257	def extend_states( self, states : list[CausalState] ) :
 258		self.set_states( self.states + states )
 259
 260	def extend_alphabet( self, alphabet : list[str] ) :
 261		self.set_alphabet( self.alphabet + alphabet  )
 262
 263	def extend_transitions( self, transitions : list[Transition] ) :
 264		self.set_transitions( self.transitions + transitions  )
 265
 266	def get_complexity_measure_if_exists(self, measure ) :
 267		m = self.complexity.get( measure, None )
 268		return m
 269
 270	def set_complexity_measure(self, measure, value ) :
 271		self.complexity[ measure ] = value
 272
 273	#-------------------------------------------------------#
 274	#                   Get and Compute                     #
 275	#-------------------------------------------------------#
 276
 277	def get_complexities( 
 278		self, 
 279		with_non_trivial=False ) :
 280
 281		trivial = [
 282			self.C_mu,
 283			self.h_mu,
 284			self.H_1,
 285			self.rho_mu
 286		]
 287
 288		non_trivial = [
 289			self.E, 
 290			self.T_inf,
 291			self.S,
 292			self.chi
 293		]
 294			
 295		complexities = { m.__name__ : m() for m in trivial }
 296
 297		if with_non_trivial :
 298
 299			complexities |= { m.__name__ : m() for m in non_trivial }
 300
 301			for key in [ 'H_L', 'h_mu_L', 'H_sync' ] :
 302				if key in self.complexity :
 303					complexities[ key ] = self.complexity[ key ]
 304
 305		return complexities
 306
 307	#-------------------------------------------------------#
 308
 309	def get_metadata(self) :
 310		return {
 311			"name" : self.name,
 312			'complexity' : self.complexity,
 313			"description" : self.description
 314		}
 315
 316	def get_transition_matrix(self) :
 317
 318		if self.T  is not None :
 319			return self.T
 320
 321		n_states = len( self.states )
 322		T = np.zeros((n_states, n_states))
 323
 324		for tr in self.transitions :    
 325			T[ tr.origin_state_idx, tr.target_state_idx  ] = tr.prob
 326
 327		self.T = T
 328
 329		return self.T
 330
 331	#-------------------------------------------------------#
 332
 333	def get_T_X(self) :
 334
 335		if self.T_x  is not None :
 336			return self.T_x
 337
 338		n_states  = len( self.states )
 339		n_symbols = len( self.alphabet )
 340
 341		T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ]
 342
 343		for tr in self.transitions :
 344			T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob
 345
 346		self.T_x = T_x
 347		return self.T_x
 348
 349	#-------------------------------------------------------#
 350
 351	def get_msp_qw(
 352		self,
 353		exact_state_cap: int = 1000,
 354		verbose: bool = True,
 355	):
 356		if self.msp is not None:
 357			return self.msp
 358
 359		try : 
 360
 361			print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" )
 362
 363			self.msp = compute_msp_exact(
 364				T_x=self.get_Tx_fractional(),
 365				pi=self.get_fractional_stationary_distribution(),
 366				n_states=len(self.states),
 367				alphabet=self.alphabet,
 368				exact_state_cap=1000,
 369				bool = True
 370			)
 371
 372			return self.msp 
 373
 374		except RuntimeError as e :
 375			warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." )
 376
 377		return self.get_msp()
 378
 379	def get_msp(
 380		self,
 381		exact_state_cap: int = 175_000,
 382		jsd_eps:         float = 1e-7,
 383		k_ann:           int   = 50,
 384		verbose                = True,
 385	) :
 386
 387		if self.msp is not None:
 388			return self.msp
 389	 
 390		T_x = self.get_T_X()
 391		pi  = self.get_stationary_distribution()
 392
 393		T_stacked      = np.stack(T_x)
 394		n_symbols      = len(self.alphabet)
 395		n_input_states = T_stacked.shape[1]
 396	
 397		print( "\nComputing Mixed State Presentation..." )
 398
 399		self.msp = compute_msp( 
 400			T_x=T_x,
 401			pi=pi,
 402			n_states=len(self.states),
 403			alphabet=self.alphabet,
 404			exact_state_cap=exact_state_cap,
 405			verbose=verbose
 406		)
 407
 408		return self.msp
 409
 410	def get_reverse_am(self) :
 411
 412		if self.reverse_am is not None:
 413			return self.reverse_am
 414
 415		pi = self.get_stationary_distribution()
 416		self.reverse_am = copy.deepcopy(self)
 417		
 418		new_transitions = []
 419		for tr in self.transitions:
 420			i = tr.target_state_idx
 421			j = tr.origin_state_idx
 422			
 423			p_reversed = (pi[j] * tr.prob) / pi[i]
 424			
 425			new_transitions.append(
 426				Transition(
 427					origin_state_idx=i,
 428					target_state_idx=j,
 429					prob=p_reversed,
 430					symbol_idx=tr.symbol_idx
 431				)
 432			)
 433
 434		self.reverse_am.set_transitions(new_transitions)
 435
 436		if self.reverse_am.is_epsilon_machine():
 437			return self.reverse_am
 438
 439		rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 )
 440
 441		self.reverse_am.set_states( rmsp.states )
 442		self.reverse_am.set_transitions( rmsp.transitions )
 443		self.reverse_am.msp = rmsp
 444		self.reverse_am.start_state = 0
 445
 446		self.reverse_am.collapse_to_largest_strongly_connected_subgraph()
 447		self.reverse_am.minimize()
 448
 449		return self.reverse_am
 450
 451	#-------------------------------------------------------#
 452
 453	def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] :
 454
 455		self.to_q_weighted()
 456
 457		n_states  = len( self.states )
 458		n_symbols = len( self.alphabet )
 459
 460		T_x = []
 461
 462		for x in range( n_symbols ) :
 463			T_x.append( [] )
 464			for i in range( n_states ) :
 465				T_x[ x ].append( [ 0 for _ in range( n_states ) ] )
 466
 467		for tr in self.transitions :
 468			T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq
 469
 470		return T_x
 471
 472	def get_T_sympy( self ) :
 473
 474		self.to_q_weighted()
 475
 476		n = len( self.states )
 477		T = sympy.zeros( n, n )
 478
 479		for tr in self.transitions :
 480			T[ tr.origin_state_idx, tr.target_state_idx ] = tr.pq
 481
 482		return T
 483
 484	def get_fractional_stationary_distribution(self) :
 485
 486		T = self.get_T_sympy()
 487
 488		if self.pi_fractional is not None :
 489			return self.pi_fractional
 490
 491		G = self.as_digraph()
 492
 493		if not nx.is_strongly_connected(G):
 494			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
 495
 496		self.pi_fractional = solve_for_pi_fractional( T )
 497
 498		return self.pi_fractional
 499
 500	def get_stationary_distribution(self):
 501
 502		if self.pi is not None :
 503			return self.pi
 504
 505		G = self.as_digraph()
 506		
 507		if not nx.is_strongly_connected(G):
 508			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
 509
 510		T = self.get_transition_matrix()
 511		return solve_for_pi( T )		
 512
 513	#-------------------------------------------------------#
 514	#                 Complexity Measures                   #
 515	#-------------------------------------------------------#
 516	
 517	def C_mu( self ) :
 518
 519		"""The *statistical complexity* (aka *forecasting complexity*) :
 520
 521		.. math::
 522
 523			C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma),
 524
 525		where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
 526
 527		.. note::
 528
 529			**Interpretations**
 530
 531			* The amount of historical information a process stores.
 532			* The amount of structure in a process.
 533
 534		Returns:
 535
 536			float: :math:`C_{\\mu}`.
 537
 538		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 539			Decomposition of Intrinsic Computation*, 2016.
 540			<https://arxiv.org/abs/1309.3792>
 541		"""
 542
 543		m = self.get_complexity_measure_if_exists( "C_mu" )
 544
 545		if m is not None :
 546			return m
 547
 548		pi = self.get_stationary_distribution()
 549
 550		h = 0
 551		for i, pr in enumerate( pi ) :
 552			
 553			if pr < self.EPS :
 554				continue
 555
 556			h += -pr * np.log2( pr )
 557
 558		self.set_complexity_measure( "C_mu", h )
 559
 560		return h
 561
 562	#-------------------------------------------------------#
 563
 564	def h_mu( self ) :
 565
 566		"""The *entropy rate* :
 567
 568		.. math::
 569
 570			h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma),
 571
 572		where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
 573
 574		.. note::
 575
 576			**Interpretations**
 577
 578			* The lower bound on achievable loss in bits. 
 579			* The irreducable randomness in the process.
 580			* The intrinsic Randomness in the process.
 581
 582		Returns:
 583			
 584			float: :math:`h_{\\mu}`.
 585
 586		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 587			Decomposition of Intrinsic Computation*, 2016.
 588			<https://arxiv.org/abs/1309.3792>
 589		"""
 590
 591		m = self.get_complexity_measure_if_exists( "h_mu" )
 592
 593		if m is not None :
 594			return m
 595
 596		T  = self.get_transition_matrix()
 597		pi = self.get_stationary_distribution()
 598
 599		n_states = pi.size
 600
 601		h = 0
 602		for i, pr in enumerate( pi ) :
 603
 604			if pr < self.EPS :
 605				continue
 606
 607			row_entropy = 0
 608			for j in range( len( pi ) ) :
 609
 610				if T[ i, j ]  < self.EPS :
 611					continue
 612
 613				row_entropy -= T[ i, j ] * np.log2( T[ i, j ] )
 614
 615			h += pr * row_entropy
 616
 617		self.set_complexity_measure( "h_mu", h )
 618
 619		return h
 620
 621	#-------------------------------------------------------#
 622
 623	def H_1(self) -> float :
 624
 625		"""The *single symbol uncertainty*:
 626
 627		.. math::
 628
 629			H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)},
 630
 631		where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2.
 632
 633		.. note::
 634
 635			**Interpretations**
 636
 637			* How uncertain you are on average about a single measurement with no context.
 638
 639		Returns:
 640
 641			float: :math:`H(1)`.
 642
 643		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
 644			<https://arxiv.org/abs/1105.2988>
 645		"""
 646
 647		m = self.get_complexity_measure_if_exists("H_1")
 648		if m is not None:
 649			return m
 650
 651		pi  = self.get_stationary_distribution()
 652		T_X = self.get_T_X()  # dict: symbol -> matrix
 653
 654		h = 0.0
 655		for T_x in T_X:
 656			# Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j]
 657			p_sym = 0.0
 658			for i, pr in enumerate(pi):
 659				if pr < self.EPS:
 660					continue
 661				p_sym += pr * T_x[i, :].sum()
 662
 663			if p_sym < self.EPS:
 664				continue
 665			h -= p_sym * np.log2(p_sym)
 666
 667		self.set_complexity_measure("H_1", h)
 668		return h
 669
 670	#-------------------------------------------------------#
 671
 672	def rho_mu(self) -> float :
 673		
 674		"""The *anticipated information* [^James_2018], p.3.:
 675
 676		.. math::
 677
 678			\\rho_{\\mu}= H(1) - h_{\\mu}
 679
 680		Returns:
 681			
 682			float: :math:`\\rho_{\\mu}`
 683
 684		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
 685			<https://arxiv.org/abs/1105.2988>
 686		"""
 687
 688		m = self.get_complexity_measure_if_exists("rho_mu")
 689		
 690		if m is not None:
 691			return m
 692
 693		rho = self.H_1() - self.h_mu()
 694		
 695		self.set_complexity_measure("rho_mu", rho)
 696		
 697		return rho
 698
 699	#-------------------------------------------------------#
 700
 701	def block_convergence( self )  :
 702
 703		"""
 704		Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]:
 705
 706		- $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 
 707
 708		- $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 
 709
 710		- $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 
 711
 712		- $H(L) = H[X_{0:L}]$, 
 713
 714		- $h_{\\mu}(L) = H(L) - H(L-1)$, and 
 715
 716		- $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$.
 717
 718		You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 
 719
 720		<img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;">
 721
 722		Returns:
 723		
 724			ComplexityMeasures: An object containing the estimated measures with the following attributes:
 725			
 726			- E (float): The excess entropy.
 727			- T_inf (float): The transient information ($\\mathbf{T}$).
 728			- S (float): The synchronization information.
 729			- E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$).
 730			- T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$).
 731			- S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$).
 732			- H_L (numpy.ndarray): The block entropy ($H(L)$).
 733			- h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$).
 734			- H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$).
 735			- converged (bool): True if the algorithm converged.
 736
 737		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 738			Decomposition of Intrinsic Computation*, 2016.
 739			<https://arxiv.org/abs/1309.3792>
 740		"""
 741
 742		trs = [ [] for _ in range( len( self.states ) ) ]
 743		for tr in self.transitions :
 744			trs[ tr.origin_state_idx ].append( ( 
 745				tr.symbol_idx, 
 746				float( tr.prob ),
 747				tr.target_state_idx ) )
 748
 749		pi = self.get_stationary_distribution()
 750
 751		state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ]
 752		branches = [(1.0, list(state_dist))]
 753
 754		print( "\nComputing Block Entropy\n" )
 755
 756		C = am_fast.block_entropy_convergence(
 757			h_mu            = self.h_mu(),
 758			n_states        = len( self.states ),
 759			n_symbols       = len( self.alphabet ),
 760			convergence_tol = 1e-6,
 761			precision       = 10,
 762			eps             = 1e-25,
 763			branches        = branches,
 764			trans           = trs,
 765			max_branches    = 30_000_000
 766		)
 767
 768		print( "Done\n" )
 769
 770		self.set_complexity_measure( f"E",       C.E )
 771		self.set_complexity_measure( f"S",       C.S )
 772		self.set_complexity_measure( f"T_inf",   C.T )
 773		self.set_complexity_measure( f"E_L",     C.E_L.tolist() )
 774		self.set_complexity_measure( f"T_L",     C.T_L.tolist() )
 775		self.set_complexity_measure( f"S_L",     C.S_L.tolist() )
 776		self.set_complexity_measure( f"H_L",     C.H_L.tolist() )
 777		self.set_complexity_measure( f"h_mu_L",  C.h_mu_L.tolist() )
 778		self.set_complexity_measure( f"H_sync",  C.H_sync.tolist() )
 779
 780		return C
 781
 782	#-------------------------------------------------------#
 783
 784	def E( self ) -> float :
 785
 786		"""The *excess entropy* [^crutchfield_exact_2016], p.4:
 787
 788		.. math::
 789
 790			\\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}]
 791		
 792		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 793
 794		.. note::
 795
 796			**Interpretations**
 797
 798			* The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016].
 799			* How much information an observer must extract to synchronize to the process.
 800			* Measures how long the process appears more complex than it asymptotically is.
 801			* Vanishes for immediately synchronizable processes.
 802
 803		Returns:
 804		
 805			float: :math:`\\mathbf{E}`
 806
 807		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 808			Decomposition of Intrinsic Computation*, 2016.
 809			<https://arxiv.org/abs/1309.3792>
 810		"""
 811
 812		m = self.get_complexity_measure_if_exists( "E" )
 813
 814		if m is not None :
 815			return m
 816
 817		try : 
 818			msp = self.get_msp()
 819			E, S, T = msp.get_E_S_T()
 820			self.set_complexity_measure( "E", E )
 821			self.set_complexity_measure( "S", S )
 822			self.set_complexity_measure( "T_inf", T )
 823			
 824		except Exception as e :
 825
 826			print( f"MSP failed {e}" )
 827
 828			C = self.block_convergence()	
 829			E = C.E
 830			self.set_complexity_measure( "E", E )
 831
 832		return E
 833
 834	#-------------------------------------------------------#
 835
 836	def S( self ) -> float :
 837
 838		"""The *synchronization* information:
 839
 840		.. math::
 841
 842			\\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L),
 843
 844		where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4.
 845
 846		.. note::
 847
 848			**Interpretations**
 849
 850			* The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016].
 851
 852		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 853
 854		Returns:
 855		
 856			float: :math:`\\mathbf{S}`
 857
 858		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 859			Decomposition of Intrinsic Computation*, 2016.
 860			<https://arxiv.org/abs/1309.3792>
 861		"""
 862
 863		m = self.get_complexity_measure_if_exists( "S" )
 864
 865		if m is not None :
 866			return m
 867
 868		try : 
 869			msp = self.get_msp()
 870			E, S, T = msp.get_E_S_T()
 871			self.set_complexity_measure( "E", E )
 872			self.set_complexity_measure( "S", S )
 873			self.set_complexity_measure( "T_inf", T )
 874
 875		except Exception as e :
 876			print( f"{e} \nFalling back to iterative estimation.")
 877			exit()
 878
 879			C = self.block_convergence()	
 880			S = C.S
 881			self.set_complexity_measure( "S", S )
 882
 883		return S
 884
 885	#-------------------------------------------------------#
 886
 887	def T_inf( self ) -> float :
 888
 889		"""The *transient information*[^crutchfield_exact_2016], p.4:
 890
 891		.. math::
 892
 893			\\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right]
 894
 895		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
 896
 897		.. note::
 898
 899			**Interpretations**
 900
 901			* The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016].
 902
 903		Returns:
 904		
 905			float: :math:`\\mathbf{T}`
 906
 907		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
 908			Decomposition of Intrinsic Computation*, 2016.
 909			<https://arxiv.org/abs/1309.3792>
 910		"""
 911
 912		m = self.get_complexity_measure_if_exists( "T_inf" )
 913
 914		if m is not None :
 915			return m
 916
 917		try : 
 918			msp = self.get_msp()
 919			E, S, T = msp.get_E_S_T()
 920			self.set_complexity_measure( "E", E )
 921			self.set_complexity_measure( "S", S )
 922			self.set_complexity_measure( "T_inf", T )
 923
 924		except Exception as e :
 925			print( f"{e} \nFalling back to iterative estimation.")
 926			C = self.block_convergence()	
 927			T_inf = C.T
 928			self.set_complexity_measure( "T_inf", T_inf )
 929
 930		return T_inf
 931
 932	#-------------------------------------------------------#
 933
 934	def chi( self ) -> float :
 935
 936		"""The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2:
 937
 938		.. math::
 939
 940			\\chi = C_{\\mu} - \\mathbf{E}
 941
 942		:math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`.
 943
 944		.. note::
 945
 946			**Interpretations**
 947
 948			* Difference between internal stored information and apparent information to an observer.
 949			* How muching information is hiding in the system.
 950
 951		Returns:
 952		
 953			float: :math:`\\chi`
 954
 955		[^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009.
 956			<https://arxiv.org/abs/0902.1209>
 957
 958		[^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021.
 959			<https://arxiv.org/abs/0905.4787>
 960		"""
 961
 962		m = self.get_complexity_measure_if_exists( "chi" )
 963
 964		if m is not None :
 965			return m
 966
 967		chi = self.C_mu() - self.E()
 968
 969		if chi < 0 :
 970			
 971			# if chi is 0, accumulated floating point error can result in small negative values
 972			if chi < -1e-5:
 973				warnings.warn(f"Crypticity is negative ({chi:.6e}).")
 974			
 975			chi = np.clamp( chi, 0 )
 976
 977		self.set_complexity_measure( "chi", chi )
 978
 979		return chi
 980
 981	#-------------------------------------------------------#
 982	#                      Properties                       #
 983	#-------------------------------------------------------#
 984
 985	def is_row_stochastic(self) :
 986
 987		"""
 988		Check that all states have outgoing transition probabilities that sum to 1.
 989		"""
 990
 991		sums = np.zeros( len( self.states ) )
 992		for tr in self.transitions :
 993			sums[ tr.origin_state_idx ] += tr.prob
 994		return np.allclose( sums, 1.0 )
 995
 996	#-------------------------------------------------------#
 997
 998	def is_unifilar(self) :
 999
1000		"""
1001		Check that no state emits the same symbol on transitions to different states. 
1002		"""
1003
1004		symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 )
1005
1006		for tr in self.transitions : 
1007		
1008			if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 :
1009				symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx
1010		
1011			elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx :
1012				return False
1013		
1014		return True
1015
1016	#-------------------------------------------------------#
1017
1018	def is_strongly_connected(self) :
1019
1020		"""
1021		Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html).
1022		"""
1023
1024		return nx.is_strongly_connected( self.as_digraph() )
1025
1026	#-------------------------------------------------------#
1027
1028	def is_aperiodic(self) :
1029
1030		"""
1031		Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph."
1032		"""
1033
1034		return nx.is_aperiodic( self.as_digraph() )
1035
1036	#-------------------------------------------------------#
1037
1038	def _is_minimal_as_dfa( self, topological_only : bool, verbose=True ) :
1039
1040		with_probs = not topological_only
1041
1042		# Construct the DFA
1043		dfa = self.as_dfa( with_probs=with_probs )
1044
1045		# Minimize the DFA
1046		#dfa = dfa.minify(retain_names=True)
1047		dfa = am_fast.minify_cpp( dfa, retain_names=True )
1048
1049		# check we have minimal number of states
1050		if len( dfa.states ) != len( self.states ) :
1051			if verbose : 
1052				print( f"Not minimal reduces from {len( self.states )} to {len( dfa.states )} states" )
1053			return False
1054
1055		return True
1056
1057	def is_topological_epsilon_machine( self, verbose=True ) :
1058
1059		"""
1060		Checks if the HMM is a topological $\\epsilon$-machine [^1].
1061
1062		[^1]: Johnson et al, Enumerating Finitary Processes, 2024.
1063			<https://arxiv.org/abs/1011.0036>
1064		"""
1065
1066		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1067			if verbose : 
1068				print( f"Either non unifilar or not strongly connected" )
1069			return False
1070		else :
1071			return self._is_minimal_as_dfa( topological_only=True, verbose=verbose )
1072
1073	def is_epsilon_machine( self, verbose=True ) :
1074
1075		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1076			if verbose : 
1077				print( f"Either non unifilar or not strongly connected" )
1078			return False
1079		else :
1080			return self._is_minimal_as_dfa( topological_only=False, verbose=verbose )
1081
1082	#-------------------------------------------------------#
1083	#                      Properties                       #
1084	#-------------------------------------------------------#
1085
1086	def minimize(self, retain_names: bool = True, verbose=False):
1087
1088		"""
1089		Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM
1090		is unifilar and strongly connected. Converts the HMM to a DFA with symbols
1091		labeled jointly with symbols and probabilities, and uses Myhill-Nerode 
1092		equivalence for minimization. Relies on `automata_lib` and uses
1093		 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states
1094		 final.
1095
1096		Args:
1097			retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`.
1098
1099		Returns:
1100		
1101			automata.fa.dfa.DFA : the resulting DFA.
1102		"""
1103
1104		if self.is_minimal :
1105			return
1106
1107		start = time.perf_counter()
1108
1109		if not self.is_unifilar():
1110			raise ValueError(
1111				"DFA minimization is not valid for non-unifilar HMMs"
1112			)
1113
1114		was_strongly_connected = self.is_strongly_connected()
1115
1116		was_row_stochastic = self.is_row_stochastic()
1117		n_states_before = len(self.states)
1118
1119		dfa = self.as_dfa(with_probs=True)
1120
1121		#min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True)
1122		min_dfa = am_fast.minify_cpp( dfa, retain_names=True )
1123
1124		# Build lookup from original state index -> CausalState object
1125		orig_state   = {i: s for i, s in enumerate(self.states)}
1126		eq_list      = list(min_dfa.states)
1127
1128		start_eq = min_dfa.initial_state
1129		
1130		# Separate the start state, then sort the rest by the 
1131		# smallest original state index inside each equivalence class.
1132		other_eqs = [eq for eq in eq_list if eq != start_eq]
1133		other_eqs.sort(key=lambda eq: min(eq))
1134
1135		# Recombine so start eq comes first, followed by the sorted remaining classes
1136		eq_list = [start_eq] + other_eqs
1137		# ----------------------------------------------------------
1138
1139		# Recompute eq_to_idx with the new ordering
1140		eq_to_idx = {eq: i for i, eq in enumerate(eq_list)}
1141
1142		# new_start is now guaranteed to be 0
1143		new_start = 0
1144
1145		# Map each original state index -> its equivalence class
1146		# Guard: minify() silently drops unreachable states
1147		orig_to_eq = {s: eq for eq in min_dfa.states for s in eq}
1148
1149		# Build lookup from original state index -> its transitions
1150		orig_trs = defaultdict(list)
1151		for t in self.transitions:
1152			orig_trs[t.origin_state_idx].append(t)
1153
1154		new_trs = []
1155		for eq in min_dfa.states:
1156			rep        = next(iter(eq))
1157			origin_idx = eq_to_idx[eq]
1158			for t in orig_trs[rep]:
1159				target_eq  = orig_to_eq[t.target_state_idx]
1160				target_idx = eq_to_idx[target_eq]
1161				new_trs.append(Transition(
1162					origin_state_idx = origin_idx,
1163					target_state_idx = target_idx,
1164					prob             = t.prob,
1165					symbol_idx       = t.symbol_idx,
1166				))
1167
1168		members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list]  # sorted for determinism
1169
1170		# Compute new names
1171		if retain_names:
1172			new_names = [
1173				"{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1
1174				else members[0].name
1175				for members in members_list
1176			]
1177		else:
1178			new_names = [str(j) for j in range(len(eq_list))]
1179
1180		old_name_to_new_name = {
1181			m.name: new_names[j]
1182			for j, members in enumerate(members_list)
1183			for m in members
1184		}
1185
1186		# Build the new states, preserving classes and isomorphs regardless of naming
1187		new_states = []
1188		for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)):
1189			classes   = set().union(*(m.classes for m in members))
1190			isomorphs = {
1191				old_name_to_new_name.get(iso, iso)
1192				for m in members
1193				for iso in m.isomorphs
1194				if old_name_to_new_name.get(iso, iso) != name
1195			}
1196			new_states.append(CausalState(
1197				name      = name,
1198				classes   = classes,
1199				isomorphs = isomorphs,
1200			))
1201
1202		self.set_states(new_states)
1203		self.set_transitions(new_trs)
1204		self.start_state = new_start
1205
1206		if n_states_before == len(new_states) and verbose :
1207			print( f"{n_states_before} state HMM was already minimal.\n" )
1208		elif verbose :
1209			print( f"Minimized from {n_states_before} to {len(new_states)}\n" )
1210
1211		if not ( was_strongly_connected ==  self.is_strongly_connected() ) :
1212			raise RuntimeError(
1213				f"Minimization broke strongly connected"
1214			)
1215
1216		if not ( was_row_stochastic ==  self.is_row_stochastic() ) :
1217			raise RuntimeError(
1218				f"Minimization broke row stochasticity"
1219			)
1220
1221		self.is_minimal = True
1222
1223	#-------------------------------------------------------#
1224	#                      Modifiers                        #
1225	#-------------------------------------------------------#
1226
1227
1228	def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) :
1229
1230		# get equivalent networkx graph
1231		G = self.as_digraph()
1232
1233		# if already strongly connected, nothing to do
1234		if not nx.is_strongly_connected( G ) :
1235
1236			start = time.perf_counter()
1237			subgraph_nodes = list( nx.strongly_connected_components( G ) )
1238
1239			# decompose into strongly connected components and sort by length
1240			# subgraph_nodes = list(nx.strongly_connected_components( G ))
1241			subgraph_nodes.sort(key=len)
1242			component_state_set = subgraph_nodes[-1]
1243
1244			# Take the largest strongly connected component (as list of state names)
1245			component_states = sorted( list( component_state_set ) )
1246
1247			# make temporary copies of the old transitions and states
1248			old_transitions = [
1249				Transition(
1250					origin_state_idx=tr.origin_state_idx,
1251					target_state_idx=tr.target_state_idx,
1252					prob=tr.prob,
1253					symbol_idx=tr.symbol_idx
1254				)
1255
1256				for tr in self.transitions
1257			]
1258
1259			old_states = [
1260				CausalState(
1261					name=s.name,
1262					classes=s.classes,
1263					isomorphs=s.isomorphs
1264				)
1265				for s in self.states
1266			]
1267
1268			self.set_states(
1269				states=[ 
1270					state
1271					for i, state in enumerate( old_states ) if i in component_state_set
1272				]
1273			)
1274
1275			# we will build new transition list based on those belonging to the component
1276			self.set_transitions( transitions= [] )
1277
1278			# for tracking which new transitions leave each state
1279			transitions_from_state = { state : set() for state in component_states }
1280			new_transitions = []
1281
1282			for tr in old_transitions :
1283
1284				origin_state_name = old_states[ tr.origin_state_idx ].name
1285				target_state_name = old_states[ tr.target_state_idx ].name
1286
1287				# skip transitions that connect separate strongly connected components
1288				if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) :
1289					continue
1290
1291				# track transitions (by index in new transitions list) that leave this state
1292				transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) )
1293
1294				my_origin_state_idx = self.state_idx_map[ origin_state_name ]
1295				my_target_state_idx = self.state_idx_map[ target_state_name ]
1296
1297				new_transitions.append( 
1298					Transition(
1299						origin_state_idx=my_origin_state_idx,
1300						target_state_idx=my_target_state_idx,
1301						prob=tr.prob,
1302						symbol_idx=tr.symbol_idx
1303					)  )
1304
1305			self.set_transitions( transitions=new_transitions )
1306
1307			# if we removed an outgoing transition from a state, we need to distribute its probability 
1308			# among the remaining outgoing transitions from the state
1309			for state in component_states :
1310				
1311				# get the set of transitions leaving this state
1312				state_trs = transitions_from_state[ state ]
1313
1314				# sum the probabilities of the outgoing transitions from the state
1315				p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] )
1316
1317				# how much probability is missing
1318				diff = 1.0 - p_sum
1319
1320				# if significant difference
1321				if abs( diff ) > self.EPS : 
1322
1323					# calculate how much of the difference each transition gets
1324					adjustment = diff / len( state_trs )
1325					
1326					# update the transitions
1327					for i in state_trs :
1328
1329						# transition with origional probability
1330						tr = self.transitions[ i ]
1331
1332						# adjusted probability
1333						self.transitions[ i ] = Transition(
1334							origin_state_idx=tr.origin_state_idx, 
1335							target_state_idx=tr.target_state_idx, 
1336							prob=tr.prob + adjustment, 
1337							symbol_idx=tr.symbol_idx )
1338
1339			if rename_states :
1340				self.set_states( [
1341					CausalState( name=f"{i}" )
1342					for i, s in enumerate( self.states )
1343				] )
1344
1345	def to_q_weighted( self, denominator_limit=1000 ) :
1346
1347		"""
1348		Approximates the existing transition probabilities with exact fractions, stores 
1349		the fractional probabilities as Fraction in Transition.pq, and sets the floating
1350		point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 
1351		conversion, the function recurses with `denominator_limit=denominator_limit*10`.
1352
1353		Args:
1354			denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 
1355			the conversion.
1356		"""
1357
1358		if self.is_q_weighted :
1359			return
1360
1361		if not self.is_row_stochastic() :
1362			raise ValueError( "Cannot convert to q-weighted because not row stochastic" )
1363
1364		t_from = [[] for _ in range(len(self.states))]
1365
1366		for i, tr in enumerate( self.transitions ) :
1367			t_from[ tr.origin_state_idx ].append( i )
1368
1369		new_transitions = []
1370		for t_list in t_from :
1371			
1372			if not t_list : 
1373				continue
1374
1375			p_q_sum = Fraction(0,1)
1376			p_qs = []
1377
1378			for t_idx in t_list :
1379			
1380				p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit )
1381				p_q_sum += p_q
1382				p_qs.append( p_q )
1383
1384			if p_q_sum != Fraction(1,1) :
1385				
1386				max_pq_i = np.argmax( p_qs )
1387				max_oq = p_qs[ max_pq_i ]
1388
1389				diff = p_q_sum - Fraction(1,1)
1390
1391				# If recurse with higher resolution
1392				if diff > max_oq :
1393					return self.to_q_weighted( denominator_limit*10 )
1394				else :
1395					p_qs[ max_pq_i ] -= diff
1396
1397			for i, t_idx in enumerate( t_list ) :	
1398				new_transitions.append( 
1399					Transition( 
1400						origin_state_idx=self.transitions[  t_idx ].origin_state_idx,
1401						target_state_idx=self.transitions[  t_idx ].target_state_idx,
1402						prob=float(p_qs[ i ]),
1403						symbol_idx=self.transitions[  t_idx ].symbol_idx,
1404						pq=p_qs[ i ]
1405					)
1406				)
1407
1408		self.set_transitions( new_transitions )
1409		self.is_q_weighted = True
1410
1411	#-------------------------------------------------------#
1412	#                    Data Generation                    #
1413	#-------------------------------------------------------#
1414
1415	def isomorphic_shift(
1416		self,
1417		input_symbol_indices: np.ndarray,
1418		input_state_indices:  np.ndarray,
1419		shift : int = 1
1420	) -> dict[str, np.ndarray]:
1421
1422		"""
1423		Generates a new sequence of of symbols that are permuted with the symbols emitted by
1424		isomorphic states, if they exists.
1425
1426		:math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br>
1427		:math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]`
1428 
1429		:math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br>
1430		:math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}`
1431 
1432		:math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n`
1433 
1434		:math:`\\texttt{output\\_symbol\\_indices}[i]   := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br>
1435		:math:`\\texttt{output\\_state\\_indices}[i]    := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br>
1436		:math:`\\texttt{output\\_state\\_indices}[i+1]  := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)`
1437
1438		Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself.
1439
1440		Args:
1441			input_symbol_indices (np.ndarray): The sequence of generated symbols.
1442			input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end.
1443			shift : int: How much to shift the symbols across the isomorphic states.
1444		"""
1445
1446		if not any( state.isomorphs for state in self.states ):
1447			raise ValueError("HMM has no states with isomorphs")
1448
1449		inputs = np.asarray(input_symbol_indices)
1450		states = np.asarray(input_state_indices)
1451
1452		n_states = len(self.states)
1453
1454		tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32)
1455
1456		for tr in self.transitions:
1457			tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx
1458
1459		# Build isomorph remapping: identity by default, overridden where isomorphs exist
1460		iso_table = np.arange(n_states, dtype=np.int32)
1461
1462		for i, state in enumerate(self.states):
1463			if state.isomorphs:
1464				isomorph       = sorted(state.isomorphs)[0]
1465				iso_table[ i ] = self.state_idx_map[isomorph]
1466
1467		for i, state in enumerate(self.states):
1468			if state.isomorphs:
1469
1470				# extend the isomorph list to include the identity
1471				isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] )
1472
1473				# find the identity index
1474				pos = isormorphs_with_identity.index( i )
1475
1476				# cyclical shift 
1477				iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ]
1478
1479		origins = states[:-1]
1480		targets = states[1:]
1481
1482		out_origins = iso_table[origins]
1483		out_targets = iso_table[targets]
1484
1485		inv_sym = tr_sym_table[ out_origins, out_targets ]
1486		inv_sts = np.empty(states.size, dtype=states.dtype)
1487
1488		inv_sts[:-1] = out_origins
1489		inv_sts[-1]  = out_targets[-1]
1490
1491		return {
1492			"symbol_index": inv_sym.astype(inputs.dtype),
1493			"state_index":  inv_sts,
1494		}
1495
1496	def generate_data(
1497		self,
1498		file_prefix: str,
1499		n_gen: int,
1500		include_states: bool,
1501		isomorphic_shifts : set[int]=None,
1502		random_seed : int=42 ) -> dict[any] : 
1503
1504		trs = [ [] for _ in range( len( self.states ) ) ]
1505		for tr in self.transitions :
1506			trs[ tr.origin_state_idx ].append( ( 
1507				tr.symbol_idx, 
1508				float( tr.prob ),
1509				tr.target_state_idx ) )
1510
1511		data = am_fast.generate_data(
1512			n_gen=n_gen,
1513			start_state=self.start_state,
1514			transitions=trs,
1515			alphabet=sorted(list(self.alphabet)),
1516			include_states=include_states,
1517			random_seed=random_seed
1518		)
1519
1520		if isomorphic_shifts is not None :
1521
1522			if not include_states :
1523				raise ValueError( "Isomorphic inversion requires include_states=True" )
1524
1525			data[ "isomorphic_shifts" ] = {}
1526
1527			for shift in isomorphic_shifts :
1528
1529				try : 
1530
1531					shifted = self.isomorphic_shift(
1532						input_symbol_indices=data[ "symbol_index" ], 
1533						input_state_indices=data[ "state_index" ], 
1534						shift=shift
1535					)
1536
1537					data[ "isomorphic_shifts" ][ shift ] = {
1538						"symbol_index" : shifted[ "symbol_index" ],
1539						"state_index"  : shifted[ "state_index" ]
1540					}
1541
1542				except Exception as e :
1543					print( f"Exception {e}" )
1544
1545		am_fast.save_data(
1546			data=data,
1547			file_prefix=file_prefix,
1548			alphabet=sorted(list(self.alphabet)),
1549			n_states=len( self.states ),
1550			start_state=self.start_state,
1551			random_seed=random_seed,
1552			machine_metadata=self.get_metadata() )
1553
1554		return data
1555
1556	#-------------------------------------------------------#
1557	#                 Basic Visualization                   #
1558	#-------------------------------------------------------#
1559
1560	def draw_block_entropy_curve(self) :
1561		
1562		h_mu = self.h_mu()
1563		E = self.E()
1564
1565		if not "H_L" in self.complexity :
1566			C = self.block_convergence()
1567
1568		H_L = self.complexity[ "H_L" ]
1569		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1570		y = E + L*h_mu
1571
1572		plt.style.use('seaborn-v0_8-darkgrid') 
1573		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1574		colors = plt.cm.tab10.colors 
1575
1576		ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$')
1577		ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$')
1578
1579		ax.fill_between(
1580			L, H_L, y, 
1581			color='gray', 
1582			alpha=0.5, 
1583			label=r'Transient information $\mathbf{T}$'
1584		)
1585
1586		ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15)
1587		ax.set_xlabel('L', fontsize=12, labelpad=10)
1588		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1589
1590		legend = ax.legend(
1591			loc='upper left', 
1592			title_fontsize='11',
1593			fontsize='10',
1594			frameon=True,
1595			facecolor='white',
1596			shadow=True
1597		)
1598
1599		legend.get_title().set_fontweight('bold')
1600		plt.tight_layout() 
1601		plt.show()
1602
1603	def draw_block_measure_curves(self) :
1604		
1605		h_mu = self.h_mu()
1606		E = self.E()
1607
1608		if not "H_L" in self.complexity :
1609			C = self.block_convergence()
1610
1611		H_L = self.complexity[ "H_L" ]
1612		E_L = self.complexity[ "E_L" ]
1613		T_L = self.complexity[ "T_L" ]
1614		S_L = self.complexity[ "S_L" ]
1615		H_sync = self.complexity[ "H_sync" ][1:]
1616		h_mu_L = self.complexity[ "h_mu_L" ]
1617
1618		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1619		y = E + L*h_mu
1620
1621		plt.style.use('seaborn-v0_8-darkgrid') 
1622		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1623		colors = plt.cm.tab10.colors 
1624
1625		ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$')
1626		ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$')
1627		ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$')
1628		ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$')
1629		ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$')
1630		ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$')
1631
1632		ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15)
1633		ax.set_xlabel('L', fontsize=12, labelpad=10)
1634		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1635
1636		legend = ax.legend(
1637			loc='upper left', 
1638			title_fontsize='11',
1639			fontsize='10',
1640			frameon=True,
1641			facecolor='white',
1642			shadow=True
1643		)
1644
1645		legend.get_title().set_fontweight('bold')
1646		plt.tight_layout() 
1647		plt.show()
1648
1649	def draw_graph(
1650		self,
1651		engine     : str  = 'dot',
1652		output_dir : Path = Path('.'),
1653		show       : bool = True,
1654		symbols_only : bool = False
1655	) -> None :
1656
1657		"""
1658		Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it.
1659
1660		Returns:
1661		
1662			networkx.DiGraph : the resulting graph.
1663		"""
1664
1665		G = self.as_digraph()
1666
1667		subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) )
1668		
1669		am_vis.draw_graph( 
1670			self, 
1671			output_dir=output_dir, 
1672			title="am_graph", 
1673			view=show, 
1674			subgraphs=subgraphs, 
1675			engine=engine,
1676			symbols_only=symbols_only )
1677
1678	#-------------------------------------------------------#
1679	#                   Alternate Forms                     #
1680	#-------------------------------------------------------#
1681
1682
1683	def as_digraph( self ) -> nx.DiGraph :
1684
1685		"""
1686		Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights.
1687
1688		Returns:
1689		
1690			networkx.DiGraph : the resulting graph.
1691		"""
1692
1693		G = nx.DiGraph()
1694		G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] )
1695
1696		for tr in self.transitions :
1697			G.add_edge( tr.origin_state_idx, tr.target_state_idx )
1698
1699		return G
1700
1701	def as_dfa( self, with_probs : bool ) :
1702
1703		"""
1704		Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions.
1705
1706		Args:
1707			with_probs (bool): If true the DFA transitions are labeled based on
1708				the symbol of the machines transition concatenated with its
1709				probability, othwise, the only the symbols.
1710
1711		Returns:
1712		
1713			automata.fa.dfa.DFA : the resulting DFA.
1714		"""
1715
1716		precision=8
1717
1718		def edge_label( symb, prob ) :
1719			return f"({symb},{round(prob, precision)})"
1720
1721		# Build states, symbols, and transitions 
1722		dfa_states  = { i for i, _ in enumerate( self.states ) }
1723			
1724		if not with_probs :
1725			dfa_symbols = set( { t.symbol_idx for t in self.transitions } )
1726		else : 
1727			dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } )
1728
1729		dfa_transitions = defaultdict(dict)
1730
1731		if not with_probs :
1732			for t in self.transitions :
1733				dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx
1734		else :
1735			for t in self.transitions :
1736				dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx
1737
1738		# Construct the DFA
1739		return DFA(
1740			states=dfa_states,
1741			input_symbols=dfa_symbols,
1742			transitions=dfa_transitions,
1743			initial_state=self.start_state,
1744			allow_partial=True,
1745			final_states={ 
1746				s for s in dfa_states
1747			}
1748		)

Hidden Markov model implementing epsilon machines, mixed state presentations, complexity measures, and data generation.

Arguments:
  • states (list[CausalState] | None): A list of causal states.
  • transitions (list[Transition] | None): A list of transitions between states.
  • start_state (int): Index of the start state.
  • alphabet (list[str]): List of symbols making up the alphabet.
  • name (str): Name of the model.
  • description (str): Description of the model.
HMM( states: list[amachine.am_causal_state.CausalState] | None = None, transitions: list[amachine.am_transition.Transition] | None = None, start_state: int = 0, alphabet: list[str] | None = None, name: str = '', description: str = '')
54	def __init__( 
55		self,
56		states : list[CausalState] | None = None,
57		transitions : list[Transition] | None = None,
58		start_state : int = 0,
59		alphabet : list[str] | None = None,
60		name : str = "",
61		description : str = "" ) : 
62
63		self.alphabet    : list[str] | None = alphabet or []
64		self.states      : list[CausalState] | None= states or []
65		self.transitions : list[Transition] | None= transitions or []
66
67		self.set_alphabet( self.alphabet )
68		self.set_states( self.states )
69		self.set_transitions( self.transitions )
70
71		self.start_state : int = start_state
72
73		self.name : str = name
74		self.description : str = description
75
76		# To be depreciated
77		self.isoclass : str = None
78
79		# --- derived --------
80
81		self.complexity : dict[str, any] = {}
82
83		self.symbol_idx_map : dict[str,int] = {}
84		self.state_idx_map  : dict[str,int] = {}
85
86		self.pi_fractional = None
87		self.pi : np.ndarray | None = None
88		self.T  : np.ndarray | None = None
89		self.msp : MSP | None = None
90		self.reverse_am : HMM | None = None
91		self.is_q_weighted : bool = False
92		self.is_minimal : bool = False
93
94		# --- const ----------
95
96		self.EPS : float = 1e-12
alphabet: list[str] | None
transitions: list[amachine.am_transition.Transition] | None
start_state: int
name: str
description: str
isoclass: str
complexity: dict[str, any]
symbol_idx_map: dict[str, int]
state_idx_map: dict[str, int]
pi_fractional
pi: numpy.ndarray | None
T: numpy.ndarray | None
msp: amachine.am_msp.MSP | None
reverse_am: HMM | None
is_q_weighted: bool
is_minimal: bool
EPS: float
@override
def get_states(self) -> list[amachine.am_causal_state.CausalState]:
102	@override
103	def get_states(self) -> list[CausalState] : 
104		return self.states
@override
def get_transitions(self) -> list[amachine.am_transition.Transition]:
106	@override
107	def get_transitions(self) -> list[Transition] : 
108		return self.transitions
@override
def get_alphabet(self) -> list[amachine.am_symbol.Symbol]:
110	@override
111	def get_alphabet(self) -> list[Symbol]:
112		return [ Symbol(a) for a in self.alphabet ]
def to_dict(self) -> dict[str, any]:
119	def to_dict(self) -> dict[str,any]:
120
121		"""Create a dict representing the HMM configuration.
122
123		Returns:
124
125			dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass.
126		"""
127
128		return {
129			"name"            : self.name,
130			"description"     : self.description,
131			"states"          : [ asdict(state)      for state      in self.states      ],
132			"transitions"     : [ asdict(transition) for transition in self.transitions ],
133			"alphabet"        : self.alphabet,
134			"isoclass"        : self.isoclass
135		}

Create a dict representing the HMM configuration.

Returns:

dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass.

def from_dict(self, config: dict[str, any]):
137	def from_dict( self, config : dict[str,any] ) :
138
139		"""Configure the HMM configuration from a dictionary.
140
141		Args:
142			config (dict[str,any]): The HMM configuration.
143		"""
144
145		self.name            = config.name 
146		self.description     = config.description 
147		
148		self.set_states( states=[ 
149			CausalState( 
150				name=state[ "name" ],
151				classes=set( state[ "classes" ] )
152			)
153			for state in config.states
154		] )
155
156		self.set_transitions( transitions=[ 
157			Transition( 
158				origin_state_idx=tr[ "origin_state_idx" ],
159				target_state_idx=tr[ "target_state_idx" ],
160				prob=tr[ "prob" ],
161				symbol_idx=tr[ "symbol_idx" ]
162			)
163			for tr in config.transitions
164		] )
165
166		self.set_alphabet( alphabet=config.alphabet ) 

Configure the HMM configuration from a dictionary.

Arguments:
  • config (dict[str,any]): The HMM configuration.
def save_config( self, path: pathlib.Path, with_complexity: bool = False, with_non_trivial_complexity: bool = False):
168	def save_config(
169		self, 
170		path : Path, 
171		with_complexity : bool = False, 
172		with_non_trivial_complexity : bool = False ) :
173
174		config = self.to_dict()
175
176		if with_complexity :
177			
178			complexity = self.get_complexities( 
179				with_non_trivial=with_non_trivial_complexity
180			)
181
182			config[ "complexity" ] = complexity
183
184		config[ "structural_properties" ] = {
185			"unifilar"              : self.is_unifilar(),
186			"row_stochastic"        : self.is_row_stochastic(),
187			"strongly_connected"    : self.is_strongly_connected(),
188			"aperiodic"             : self.is_aperiodic(),
189			"minimal"               : self._is_minimal_as_dfa( topological_only=False ),
190			"topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ),
191			"is_epsilon_machine"    : self.is_epsilon_machine()
192		}
193
194		with open( path / "am_config.json", "w", encoding="utf-8" ) as f :
195			json.dump( config, f, ensure_ascii=False, indent=2, default=list )
def from_file(self, path: pathlib.Path):
197	def from_file( self, path : Path ) :
198		with open( Path / "am_config.json", "r" ) as f:
199			config = json.load(f)
200		self.from_dict()
def clear_cache(self):
206	def clear_cache(self) :
207
208		"""
209		Reset all derived properties so they will be recomputed when requested later.
210		"""
211
212		self.complexity = {}
213		self.T = None
214		self.T_x = None
215		self.pi = None
216		self.pi_fractional = None
217		self.msp = None 
218		self.reverse_am = None
219		self.is_q_weighted = False
220		self.is_minimal = False

Reset all derived properties so they will be recomputed when requested later.

def set_states(self, states: list[amachine.am_causal_state.CausalState]):
222	def set_states( self, states : list[CausalState] ) :
223		self.clear_cache()
224		self.states = states.copy()
225		self.state_idx_map = {}
226		for idx, state in enumerate( self.states ) :
227			self.state_idx_map[ state.name ] = idx
def set_alphabet(self, alphabet: list[str]):
229	def set_alphabet( self, alphabet : list[str] ) :
230
231		self.clear_cache()
232
233		old_alphabet = self.alphabet.copy()
234
235		aSet = set()
236		aSet.update( alphabet )
237
238		self.alphabet = sorted(list(aSet))
239
240		self.symbol_idx_map = {}
241		for idx, symbol in enumerate( self.alphabet ) :
242			self.symbol_idx_map[ symbol ] = idx
243
244		for i, tr in enumerate( self.transitions ) :
245			symbol = old_alphabet[ tr.symbol_idx ]
246			self.transitions[ i ] = Transition(
247				origin_state_idx=tr.origin_state_idx,
248				target_state_idx=tr.target_state_idx,
249				prob=tr.prob,
250				symbol_idx=self.symbol_idx_map[ symbol ]
251			)
def set_transitions(self, transitions: list[amachine.am_transition.Transition]):
253	def set_transitions( self, transitions : list[Transition] ) :
254		self.clear_cache()
255		self.transitions = transitions.copy()
def extend_states(self, states: list[amachine.am_causal_state.CausalState]):
257	def extend_states( self, states : list[CausalState] ) :
258		self.set_states( self.states + states )
def extend_alphabet(self, alphabet: list[str]):
260	def extend_alphabet( self, alphabet : list[str] ) :
261		self.set_alphabet( self.alphabet + alphabet  )
def extend_transitions(self, transitions: list[amachine.am_transition.Transition]):
263	def extend_transitions( self, transitions : list[Transition] ) :
264		self.set_transitions( self.transitions + transitions  )
def get_complexity_measure_if_exists(self, measure):
266	def get_complexity_measure_if_exists(self, measure ) :
267		m = self.complexity.get( measure, None )
268		return m
def set_complexity_measure(self, measure, value):
270	def set_complexity_measure(self, measure, value ) :
271		self.complexity[ measure ] = value
def get_complexities(self, with_non_trivial=False):
277	def get_complexities( 
278		self, 
279		with_non_trivial=False ) :
280
281		trivial = [
282			self.C_mu,
283			self.h_mu,
284			self.H_1,
285			self.rho_mu
286		]
287
288		non_trivial = [
289			self.E, 
290			self.T_inf,
291			self.S,
292			self.chi
293		]
294			
295		complexities = { m.__name__ : m() for m in trivial }
296
297		if with_non_trivial :
298
299			complexities |= { m.__name__ : m() for m in non_trivial }
300
301			for key in [ 'H_L', 'h_mu_L', 'H_sync' ] :
302				if key in self.complexity :
303					complexities[ key ] = self.complexity[ key ]
304
305		return complexities
def get_metadata(self):
309	def get_metadata(self) :
310		return {
311			"name" : self.name,
312			'complexity' : self.complexity,
313			"description" : self.description
314		}
def get_transition_matrix(self):
316	def get_transition_matrix(self) :
317
318		if self.T  is not None :
319			return self.T
320
321		n_states = len( self.states )
322		T = np.zeros((n_states, n_states))
323
324		for tr in self.transitions :    
325			T[ tr.origin_state_idx, tr.target_state_idx  ] = tr.prob
326
327		self.T = T
328
329		return self.T
def get_T_X(self):
333	def get_T_X(self) :
334
335		if self.T_x  is not None :
336			return self.T_x
337
338		n_states  = len( self.states )
339		n_symbols = len( self.alphabet )
340
341		T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ]
342
343		for tr in self.transitions :
344			T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob
345
346		self.T_x = T_x
347		return self.T_x
def get_msp_qw(self, exact_state_cap: int = 1000, verbose: bool = True):
351	def get_msp_qw(
352		self,
353		exact_state_cap: int = 1000,
354		verbose: bool = True,
355	):
356		if self.msp is not None:
357			return self.msp
358
359		try : 
360
361			print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" )
362
363			self.msp = compute_msp_exact(
364				T_x=self.get_Tx_fractional(),
365				pi=self.get_fractional_stationary_distribution(),
366				n_states=len(self.states),
367				alphabet=self.alphabet,
368				exact_state_cap=1000,
369				bool = True
370			)
371
372			return self.msp 
373
374		except RuntimeError as e :
375			warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." )
376
377		return self.get_msp()
def get_msp( self, exact_state_cap: int = 175000, jsd_eps: float = 1e-07, k_ann: int = 50, verbose=True):
379	def get_msp(
380		self,
381		exact_state_cap: int = 175_000,
382		jsd_eps:         float = 1e-7,
383		k_ann:           int   = 50,
384		verbose                = True,
385	) :
386
387		if self.msp is not None:
388			return self.msp
389	 
390		T_x = self.get_T_X()
391		pi  = self.get_stationary_distribution()
392
393		T_stacked      = np.stack(T_x)
394		n_symbols      = len(self.alphabet)
395		n_input_states = T_stacked.shape[1]
396	
397		print( "\nComputing Mixed State Presentation..." )
398
399		self.msp = compute_msp( 
400			T_x=T_x,
401			pi=pi,
402			n_states=len(self.states),
403			alphabet=self.alphabet,
404			exact_state_cap=exact_state_cap,
405			verbose=verbose
406		)
407
408		return self.msp
def get_reverse_am(self):
410	def get_reverse_am(self) :
411
412		if self.reverse_am is not None:
413			return self.reverse_am
414
415		pi = self.get_stationary_distribution()
416		self.reverse_am = copy.deepcopy(self)
417		
418		new_transitions = []
419		for tr in self.transitions:
420			i = tr.target_state_idx
421			j = tr.origin_state_idx
422			
423			p_reversed = (pi[j] * tr.prob) / pi[i]
424			
425			new_transitions.append(
426				Transition(
427					origin_state_idx=i,
428					target_state_idx=j,
429					prob=p_reversed,
430					symbol_idx=tr.symbol_idx
431				)
432			)
433
434		self.reverse_am.set_transitions(new_transitions)
435
436		if self.reverse_am.is_epsilon_machine():
437			return self.reverse_am
438
439		rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 )
440
441		self.reverse_am.set_states( rmsp.states )
442		self.reverse_am.set_transitions( rmsp.transitions )
443		self.reverse_am.msp = rmsp
444		self.reverse_am.start_state = 0
445
446		self.reverse_am.collapse_to_largest_strongly_connected_subgraph()
447		self.reverse_am.minimize()
448
449		return self.reverse_am
def get_Tx_fractional(self) -> list[list[list[fractions.Fraction]]]:
453	def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] :
454
455		self.to_q_weighted()
456
457		n_states  = len( self.states )
458		n_symbols = len( self.alphabet )
459
460		T_x = []
461
462		for x in range( n_symbols ) :
463			T_x.append( [] )
464			for i in range( n_states ) :
465				T_x[ x ].append( [ 0 for _ in range( n_states ) ] )
466
467		for tr in self.transitions :
468			T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq
469
470		return T_x
def get_T_sympy(self):
472	def get_T_sympy( self ) :
473
474		self.to_q_weighted()
475
476		n = len( self.states )
477		T = sympy.zeros( n, n )
478
479		for tr in self.transitions :
480			T[ tr.origin_state_idx, tr.target_state_idx ] = tr.pq
481
482		return T
def get_fractional_stationary_distribution(self):
484	def get_fractional_stationary_distribution(self) :
485
486		T = self.get_T_sympy()
487
488		if self.pi_fractional is not None :
489			return self.pi_fractional
490
491		G = self.as_digraph()
492
493		if not nx.is_strongly_connected(G):
494			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
495
496		self.pi_fractional = solve_for_pi_fractional( T )
497
498		return self.pi_fractional
def get_stationary_distribution(self):
500	def get_stationary_distribution(self):
501
502		if self.pi is not None :
503			return self.pi
504
505		G = self.as_digraph()
506		
507		if not nx.is_strongly_connected(G):
508			raise ValueError( "Single stationary distribution requires strongly connected HMM." )
509
510		T = self.get_transition_matrix()
511		return solve_for_pi( T )		
def C_mu(self):
517	def C_mu( self ) :
518
519		"""The *statistical complexity* (aka *forecasting complexity*) :
520
521		.. math::
522
523			C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma),
524
525		where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
526
527		.. note::
528
529			**Interpretations**
530
531			* The amount of historical information a process stores.
532			* The amount of structure in a process.
533
534		Returns:
535
536			float: :math:`C_{\\mu}`.
537
538		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
539			Decomposition of Intrinsic Computation*, 2016.
540			<https://arxiv.org/abs/1309.3792>
541		"""
542
543		m = self.get_complexity_measure_if_exists( "C_mu" )
544
545		if m is not None :
546			return m
547
548		pi = self.get_stationary_distribution()
549
550		h = 0
551		for i, pr in enumerate( pi ) :
552			
553			if pr < self.EPS :
554				continue
555
556			h += -pr * np.log2( pr )
557
558		self.set_complexity_measure( "C_mu", h )
559
560		return h

The statistical complexity (aka forecasting complexity) :

$$C_{\mu} = - \sum_{\sigma \in \mathcal{S}} \Pr(\sigma) \log_2 \Pr(\sigma),$$

where \( \mathcal{S} \) is the set of states 1, p.2.

Interpretations

  • The amount of historical information a process stores.
  • The amount of structure in a process.
Returns:

float: \( C_{\mu} \).


  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def h_mu(self):
564	def h_mu( self ) :
565
566		"""The *entropy rate* :
567
568		.. math::
569
570			h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma),
571
572		where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2.
573
574		.. note::
575
576			**Interpretations**
577
578			* The lower bound on achievable loss in bits. 
579			* The irreducable randomness in the process.
580			* The intrinsic Randomness in the process.
581
582		Returns:
583			
584			float: :math:`h_{\\mu}`.
585
586		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
587			Decomposition of Intrinsic Computation*, 2016.
588			<https://arxiv.org/abs/1309.3792>
589		"""
590
591		m = self.get_complexity_measure_if_exists( "h_mu" )
592
593		if m is not None :
594			return m
595
596		T  = self.get_transition_matrix()
597		pi = self.get_stationary_distribution()
598
599		n_states = pi.size
600
601		h = 0
602		for i, pr in enumerate( pi ) :
603
604			if pr < self.EPS :
605				continue
606
607			row_entropy = 0
608			for j in range( len( pi ) ) :
609
610				if T[ i, j ]  < self.EPS :
611					continue
612
613				row_entropy -= T[ i, j ] * np.log2( T[ i, j ] )
614
615			h += pr * row_entropy
616
617		self.set_complexity_measure( "h_mu", h )
618
619		return h

The entropy rate :

$$h_{\mu}(\boldsymbol{\mathcal{S}}) = - \sum_{\sigma \in \mathcal{S}} \Pr(\sigma) \sum_{x \in \mathcal{A}} \Pr(x|\sigma) \log_2 \Pr(x|\sigma),$$

where \( \mathcal{A} \) is the alphabet and \( \mathcal{S} \) is the set of states 1, p.2.

Interpretations

  • The lower bound on achievable loss in bits.
  • The irreducable randomness in the process.
  • The intrinsic Randomness in the process.
Returns:

float: \( h_{\mu} \).


  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def H_1(self) -> float:
623	def H_1(self) -> float :
624
625		"""The *single symbol uncertainty*:
626
627		.. math::
628
629			H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)},
630
631		where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2.
632
633		.. note::
634
635			**Interpretations**
636
637			* How uncertain you are on average about a single measurement with no context.
638
639		Returns:
640
641			float: :math:`H(1)`.
642
643		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
644			<https://arxiv.org/abs/1105.2988>
645		"""
646
647		m = self.get_complexity_measure_if_exists("H_1")
648		if m is not None:
649			return m
650
651		pi  = self.get_stationary_distribution()
652		T_X = self.get_T_X()  # dict: symbol -> matrix
653
654		h = 0.0
655		for T_x in T_X:
656			# Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j]
657			p_sym = 0.0
658			for i, pr in enumerate(pi):
659				if pr < self.EPS:
660					continue
661				p_sym += pr * T_x[i, :].sum()
662
663			if p_sym < self.EPS:
664				continue
665			h -= p_sym * np.log2(p_sym)
666
667		self.set_complexity_measure("H_1", h)
668		return h

The single symbol uncertainty:

$$H(1)=-\sum_{x\in\mathcal{A}} \Pr(x) \log_2{\Pr(x)},$$

where \( \mathcal{A} \) is the alphabet 1, p.2.

Interpretations

  • How uncertain you are on average about a single measurement with no context.
Returns:

float: \( H(1) \).


  1. James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. https://arxiv.org/abs/1105.2988 

def rho_mu(self) -> float:
672	def rho_mu(self) -> float :
673		
674		"""The *anticipated information* [^James_2018], p.3.:
675
676		.. math::
677
678			\\rho_{\\mu}= H(1) - h_{\\mu}
679
680		Returns:
681			
682			float: :math:`\\rho_{\\mu}`
683
684		[^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018.
685			<https://arxiv.org/abs/1105.2988>
686		"""
687
688		m = self.get_complexity_measure_if_exists("rho_mu")
689		
690		if m is not None:
691			return m
692
693		rho = self.H_1() - self.h_mu()
694		
695		self.set_complexity_measure("rho_mu", rho)
696		
697		return rho

The anticipated information 1, p.3.:

$$\rho_{\mu}= H(1) - h_{\mu}$$

Returns:

float: \( \rho_{\mu} \)


  1. James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. https://arxiv.org/abs/1105.2988 

def block_convergence(self):
701	def block_convergence( self )  :
702
703		"""
704		Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]:
705
706		- $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 
707
708		- $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 
709
710		- $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 
711
712		- $H(L) = H[X_{0:L}]$, 
713
714		- $h_{\\mu}(L) = H(L) - H(L-1)$, and 
715
716		- $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$.
717
718		You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 
719
720		<img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;">
721
722		Returns:
723		
724			ComplexityMeasures: An object containing the estimated measures with the following attributes:
725			
726			- E (float): The excess entropy.
727			- T_inf (float): The transient information ($\\mathbf{T}$).
728			- S (float): The synchronization information.
729			- E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$).
730			- T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$).
731			- S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$).
732			- H_L (numpy.ndarray): The block entropy ($H(L)$).
733			- h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$).
734			- H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$).
735			- converged (bool): True if the algorithm converged.
736
737		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
738			Decomposition of Intrinsic Computation*, 2016.
739			<https://arxiv.org/abs/1309.3792>
740		"""
741
742		trs = [ [] for _ in range( len( self.states ) ) ]
743		for tr in self.transitions :
744			trs[ tr.origin_state_idx ].append( ( 
745				tr.symbol_idx, 
746				float( tr.prob ),
747				tr.target_state_idx ) )
748
749		pi = self.get_stationary_distribution()
750
751		state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ]
752		branches = [(1.0, list(state_dist))]
753
754		print( "\nComputing Block Entropy\n" )
755
756		C = am_fast.block_entropy_convergence(
757			h_mu            = self.h_mu(),
758			n_states        = len( self.states ),
759			n_symbols       = len( self.alphabet ),
760			convergence_tol = 1e-6,
761			precision       = 10,
762			eps             = 1e-25,
763			branches        = branches,
764			trans           = trs,
765			max_branches    = 30_000_000
766		)
767
768		print( "Done\n" )
769
770		self.set_complexity_measure( f"E",       C.E )
771		self.set_complexity_measure( f"S",       C.S )
772		self.set_complexity_measure( f"T_inf",   C.T )
773		self.set_complexity_measure( f"E_L",     C.E_L.tolist() )
774		self.set_complexity_measure( f"T_L",     C.T_L.tolist() )
775		self.set_complexity_measure( f"S_L",     C.S_L.tolist() )
776		self.set_complexity_measure( f"H_L",     C.H_L.tolist() )
777		self.set_complexity_measure( f"h_mu_L",  C.h_mu_L.tolist() )
778		self.set_complexity_measure( f"H_sync",  C.H_sync.tolist() )
779
780		return C

Run block entropy convergence. Estimates $\mathbf{E}$, $\mathbf{S}$, $\mathbf{T}$, and block measures1:

  • $\mathbf{E}(L) = H(L) - L \cdot h_{\mu}$,

  • $\mathbf{T}(L) = \sum_{l=1}^{L} l \left[ h_{\mu}(l) - h_{\mu} \right]$,

  • $\mathbf{S}(L) = \sum_{l=0}^{L} \mathcal{H}(l)$,

  • $H(L) = H[X_{0:L}]$,

  • $h_{\mu}(L) = H(L) - H(L-1)$, and

  • $\mathcal{H}(L) = -\sum_{w \in \mathcal{A}^L} Pr(w) \sum_{\sigma \in \mathcal{S}} Pr(\sigma|w) \log_2 Pr(\sigma|w)$.

You can plot these curves, and the block entropy curves using, amachine.HMM.draw_block_measure_curves, and amachine.HMM.draw_block_entropy_curve.

block measures plots

Returns:

ComplexityMeasures: An object containing the estimated measures with the following attributes:

  • E (float): The excess entropy.
  • T_inf (float): The transient information ($\mathbf{T}$).
  • S (float): The synchronization information.
  • E_L (numpy.ndarray): The block excess entropy ($\mathbf{E}(L)$).
  • T_L (numpy.ndarray): The block transient information ($\mathbf{T}(L)$).
  • S_L (numpy.ndarray): The block synchronization information ($\mathbf{S}(L)$).
  • H_L (numpy.ndarray): The block entropy ($H(L)$).
  • h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\mu}(L)$).
  • H_sync (numpy.ndarray): The state-block synchronization ($\mathcal{H}(L)$).
  • converged (bool): True if the algorithm converged.

  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def E(self) -> float:
784	def E( self ) -> float :
785
786		"""The *excess entropy* [^crutchfield_exact_2016], p.4:
787
788		.. math::
789
790			\\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}]
791		
792		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
793
794		.. note::
795
796			**Interpretations**
797
798			* The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016].
799			* How much information an observer must extract to synchronize to the process.
800			* Measures how long the process appears more complex than it asymptotically is.
801			* Vanishes for immediately synchronizable processes.
802
803		Returns:
804		
805			float: :math:`\\mathbf{E}`
806
807		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
808			Decomposition of Intrinsic Computation*, 2016.
809			<https://arxiv.org/abs/1309.3792>
810		"""
811
812		m = self.get_complexity_measure_if_exists( "E" )
813
814		if m is not None :
815			return m
816
817		try : 
818			msp = self.get_msp()
819			E, S, T = msp.get_E_S_T()
820			self.set_complexity_measure( "E", E )
821			self.set_complexity_measure( "S", S )
822			self.set_complexity_measure( "T_inf", T )
823			
824		except Exception as e :
825
826			print( f"MSP failed {e}" )
827
828			C = self.block_convergence()	
829			E = C.E
830			self.set_complexity_measure( "E", E )
831
832		return E

The excess entropy 1, p.4:

$$\mathbf{E} \equiv \sum_{L=1}^{\infty} I[X_{-\infty:0}; X_{0:\infty}]$$

Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()

Interpretations

  • The information from the past that reduces uncertainty in the future 1.
  • How much information an observer must extract to synchronize to the process.
  • Measures how long the process appears more complex than it asymptotically is.
  • Vanishes for immediately synchronizable processes.
Returns:

float: \( \mathbf{E} \)


  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def S(self) -> float:
836	def S( self ) -> float :
837
838		"""The *synchronization* information:
839
840		.. math::
841
842			\\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L),
843
844		where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4.
845
846		.. note::
847
848			**Interpretations**
849
850			* The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016].
851
852		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
853
854		Returns:
855		
856			float: :math:`\\mathbf{S}`
857
858		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
859			Decomposition of Intrinsic Computation*, 2016.
860			<https://arxiv.org/abs/1309.3792>
861		"""
862
863		m = self.get_complexity_measure_if_exists( "S" )
864
865		if m is not None :
866			return m
867
868		try : 
869			msp = self.get_msp()
870			E, S, T = msp.get_E_S_T()
871			self.set_complexity_measure( "E", E )
872			self.set_complexity_measure( "S", S )
873			self.set_complexity_measure( "T_inf", T )
874
875		except Exception as e :
876			print( f"{e} \nFalling back to iterative estimation.")
877			exit()
878
879			C = self.block_convergence()	
880			S = C.S
881			self.set_complexity_measure( "S", S )
882
883		return S

The synchronization information:

$$\mathbf{S} \equiv \sum_{L=1}^{\infty} \mathcal{H}(L),$$

where \( \mathcal{H}(L) \) is the average state uncertainty having seen all length-L words 1, p.4.

Interpretations

  • The total amount of state information that an observer must extract to become synchronized 1.

Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()

Returns:

float: \( \mathbf{S} \)


  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def T_inf(self) -> float:
887	def T_inf( self ) -> float :
888
889		"""The *transient information*[^crutchfield_exact_2016], p.4:
890
891		.. math::
892
893			\\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right]
894
895		Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence`
896
897		.. note::
898
899			**Interpretations**
900
901			* The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016].
902
903		Returns:
904		
905			float: :math:`\\mathbf{T}`
906
907		[^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral
908			Decomposition of Intrinsic Computation*, 2016.
909			<https://arxiv.org/abs/1309.3792>
910		"""
911
912		m = self.get_complexity_measure_if_exists( "T_inf" )
913
914		if m is not None :
915			return m
916
917		try : 
918			msp = self.get_msp()
919			E, S, T = msp.get_E_S_T()
920			self.set_complexity_measure( "E", E )
921			self.set_complexity_measure( "S", S )
922			self.set_complexity_measure( "T_inf", T )
923
924		except Exception as e :
925			print( f"{e} \nFalling back to iterative estimation.")
926			C = self.block_convergence()	
927			T_inf = C.T
928			self.set_complexity_measure( "T_inf", T_inf )
929
930		return T_inf

The transient information1, p.4:

$$\mathbf{T} \equiv \sum_{L=1}^{\infty} L \left[ h_{\mu}(L) - h_{\mu} \right]$$

Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()

Interpretations

  • The amount of information one must extract from observations so that the block entropy converges to its linear asymptote1.
Returns:

float: \( \mathbf{T} \)


  1. Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 

def chi(self) -> float:
934	def chi( self ) -> float :
935
936		"""The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2:
937
938		.. math::
939
940			\\chi = C_{\\mu} - \\mathbf{E}
941
942		:math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`.
943
944		.. note::
945
946			**Interpretations**
947
948			* Difference between internal stored information and apparent information to an observer.
949			* How muching information is hiding in the system.
950
951		Returns:
952		
953			float: :math:`\\chi`
954
955		[^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009.
956			<https://arxiv.org/abs/0902.1209>
957
958		[^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021.
959			<https://arxiv.org/abs/0905.4787>
960		"""
961
962		m = self.get_complexity_measure_if_exists( "chi" )
963
964		if m is not None :
965			return m
966
967		chi = self.C_mu() - self.E()
968
969		if chi < 0 :
970			
971			# if chi is 0, accumulated floating point error can result in small negative values
972			if chi < -1e-5:
973				warnings.warn(f"Crypticity is negative ({chi:.6e}).")
974			
975			chi = np.clamp( chi, 0 )
976
977		self.set_complexity_measure( "chi", chi )
978
979		return chi

The foward crypticity12, p.2:

$$\chi = C_{\mu} - \mathbf{E}$$

\( C_{\mu} \) is trivially computed from the stationary distribution in C_mu() and \( \mathbf{E} \) in E().

Interpretations

  • Difference between internal stored information and apparent information to an observer.
  • How muching information is hiding in the system.
Returns:

float: \( \chi \)


  1. Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009. https://arxiv.org/abs/0902.1209 

  2. Mahoney et al., Information Accessibility and Cryptic Processes, 2021. https://arxiv.org/abs/0905.4787 

def is_row_stochastic(self):
985	def is_row_stochastic(self) :
986
987		"""
988		Check that all states have outgoing transition probabilities that sum to 1.
989		"""
990
991		sums = np.zeros( len( self.states ) )
992		for tr in self.transitions :
993			sums[ tr.origin_state_idx ] += tr.prob
994		return np.allclose( sums, 1.0 )

Check that all states have outgoing transition probabilities that sum to 1.

def is_unifilar(self):
 998	def is_unifilar(self) :
 999
1000		"""
1001		Check that no state emits the same symbol on transitions to different states. 
1002		"""
1003
1004		symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 )
1005
1006		for tr in self.transitions : 
1007		
1008			if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 :
1009				symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx
1010		
1011			elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx :
1012				return False
1013		
1014		return True

Check that no state emits the same symbol on transitions to different states.

def is_strongly_connected(self):
1018	def is_strongly_connected(self) :
1019
1020		"""
1021		Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html).
1022		"""
1023
1024		return nx.is_strongly_connected( self.as_digraph() )

Check if every state is reachable from every other state. Relies on nx.is_strongly_connected.

def is_aperiodic(self):
1028	def is_aperiodic(self) :
1029
1030		"""
1031		Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph."
1032		"""
1033
1034		return nx.is_aperiodic( self.as_digraph() )

Checks if machine is periodic. Relies on nx.is_aperiodic, "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph."

def is_topological_epsilon_machine(self, verbose=True):
1057	def is_topological_epsilon_machine( self, verbose=True ) :
1058
1059		"""
1060		Checks if the HMM is a topological $\\epsilon$-machine [^1].
1061
1062		[^1]: Johnson et al, Enumerating Finitary Processes, 2024.
1063			<https://arxiv.org/abs/1011.0036>
1064		"""
1065
1066		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1067			if verbose : 
1068				print( f"Either non unifilar or not strongly connected" )
1069			return False
1070		else :
1071			return self._is_minimal_as_dfa( topological_only=True, verbose=verbose )

Checks if the HMM is a topological $\epsilon$-machine 1.


  1. Johnson et al, Enumerating Finitary Processes, 2024. https://arxiv.org/abs/1011.0036 

def is_epsilon_machine(self, verbose=True):
1073	def is_epsilon_machine( self, verbose=True ) :
1074
1075		if not ( self.is_unifilar() and self.is_strongly_connected() ) :
1076			if verbose : 
1077				print( f"Either non unifilar or not strongly connected" )
1078			return False
1079		else :
1080			return self._is_minimal_as_dfa( topological_only=False, verbose=verbose )
def minimize(self, retain_names: bool = True, verbose=False):
1086	def minimize(self, retain_names: bool = True, verbose=False):
1087
1088		"""
1089		Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM
1090		is unifilar and strongly connected. Converts the HMM to a DFA with symbols
1091		labeled jointly with symbols and probabilities, and uses Myhill-Nerode 
1092		equivalence for minimization. Relies on `automata_lib` and uses
1093		 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states
1094		 final.
1095
1096		Args:
1097			retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`.
1098
1099		Returns:
1100		
1101			automata.fa.dfa.DFA : the resulting DFA.
1102		"""
1103
1104		if self.is_minimal :
1105			return
1106
1107		start = time.perf_counter()
1108
1109		if not self.is_unifilar():
1110			raise ValueError(
1111				"DFA minimization is not valid for non-unifilar HMMs"
1112			)
1113
1114		was_strongly_connected = self.is_strongly_connected()
1115
1116		was_row_stochastic = self.is_row_stochastic()
1117		n_states_before = len(self.states)
1118
1119		dfa = self.as_dfa(with_probs=True)
1120
1121		#min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True)
1122		min_dfa = am_fast.minify_cpp( dfa, retain_names=True )
1123
1124		# Build lookup from original state index -> CausalState object
1125		orig_state   = {i: s for i, s in enumerate(self.states)}
1126		eq_list      = list(min_dfa.states)
1127
1128		start_eq = min_dfa.initial_state
1129		
1130		# Separate the start state, then sort the rest by the 
1131		# smallest original state index inside each equivalence class.
1132		other_eqs = [eq for eq in eq_list if eq != start_eq]
1133		other_eqs.sort(key=lambda eq: min(eq))
1134
1135		# Recombine so start eq comes first, followed by the sorted remaining classes
1136		eq_list = [start_eq] + other_eqs
1137		# ----------------------------------------------------------
1138
1139		# Recompute eq_to_idx with the new ordering
1140		eq_to_idx = {eq: i for i, eq in enumerate(eq_list)}
1141
1142		# new_start is now guaranteed to be 0
1143		new_start = 0
1144
1145		# Map each original state index -> its equivalence class
1146		# Guard: minify() silently drops unreachable states
1147		orig_to_eq = {s: eq for eq in min_dfa.states for s in eq}
1148
1149		# Build lookup from original state index -> its transitions
1150		orig_trs = defaultdict(list)
1151		for t in self.transitions:
1152			orig_trs[t.origin_state_idx].append(t)
1153
1154		new_trs = []
1155		for eq in min_dfa.states:
1156			rep        = next(iter(eq))
1157			origin_idx = eq_to_idx[eq]
1158			for t in orig_trs[rep]:
1159				target_eq  = orig_to_eq[t.target_state_idx]
1160				target_idx = eq_to_idx[target_eq]
1161				new_trs.append(Transition(
1162					origin_state_idx = origin_idx,
1163					target_state_idx = target_idx,
1164					prob             = t.prob,
1165					symbol_idx       = t.symbol_idx,
1166				))
1167
1168		members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list]  # sorted for determinism
1169
1170		# Compute new names
1171		if retain_names:
1172			new_names = [
1173				"{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1
1174				else members[0].name
1175				for members in members_list
1176			]
1177		else:
1178			new_names = [str(j) for j in range(len(eq_list))]
1179
1180		old_name_to_new_name = {
1181			m.name: new_names[j]
1182			for j, members in enumerate(members_list)
1183			for m in members
1184		}
1185
1186		# Build the new states, preserving classes and isomorphs regardless of naming
1187		new_states = []
1188		for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)):
1189			classes   = set().union(*(m.classes for m in members))
1190			isomorphs = {
1191				old_name_to_new_name.get(iso, iso)
1192				for m in members
1193				for iso in m.isomorphs
1194				if old_name_to_new_name.get(iso, iso) != name
1195			}
1196			new_states.append(CausalState(
1197				name      = name,
1198				classes   = classes,
1199				isomorphs = isomorphs,
1200			))
1201
1202		self.set_states(new_states)
1203		self.set_transitions(new_trs)
1204		self.start_state = new_start
1205
1206		if n_states_before == len(new_states) and verbose :
1207			print( f"{n_states_before} state HMM was already minimal.\n" )
1208		elif verbose :
1209			print( f"Minimized from {n_states_before} to {len(new_states)}\n" )
1210
1211		if not ( was_strongly_connected ==  self.is_strongly_connected() ) :
1212			raise RuntimeError(
1213				f"Minimization broke strongly connected"
1214			)
1215
1216		if not ( was_row_stochastic ==  self.is_row_stochastic() ) :
1217			raise RuntimeError(
1218				f"Minimization broke row stochasticity"
1219			)
1220
1221		self.is_minimal = True

Minimizes the HMM, resulting in an \( \epsilon- \)machine if the HMM is unifilar and strongly connected. Converts the HMM to a DFA with symbols labeled jointly with symbols and probabilities, and uses Myhill-Nerode equivalence for minimization. Relies on automata_lib and uses automata.fa.dfa.DFA.minify with allow_partial=True, and all states final.

Arguments:
  • retain_names (bool): If True, the merged states will be named by their union, e.g. {s_0, s_1}, and other states will retain their origion names. Otherwise, they will be relabled { '0', '1', ..., 'n-1' }.
Returns:

automata.fa.dfa.DFA : the resulting DFA.

def collapse_to_largest_strongly_connected_subgraph(self, rename_states=True):
1228	def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) :
1229
1230		# get equivalent networkx graph
1231		G = self.as_digraph()
1232
1233		# if already strongly connected, nothing to do
1234		if not nx.is_strongly_connected( G ) :
1235
1236			start = time.perf_counter()
1237			subgraph_nodes = list( nx.strongly_connected_components( G ) )
1238
1239			# decompose into strongly connected components and sort by length
1240			# subgraph_nodes = list(nx.strongly_connected_components( G ))
1241			subgraph_nodes.sort(key=len)
1242			component_state_set = subgraph_nodes[-1]
1243
1244			# Take the largest strongly connected component (as list of state names)
1245			component_states = sorted( list( component_state_set ) )
1246
1247			# make temporary copies of the old transitions and states
1248			old_transitions = [
1249				Transition(
1250					origin_state_idx=tr.origin_state_idx,
1251					target_state_idx=tr.target_state_idx,
1252					prob=tr.prob,
1253					symbol_idx=tr.symbol_idx
1254				)
1255
1256				for tr in self.transitions
1257			]
1258
1259			old_states = [
1260				CausalState(
1261					name=s.name,
1262					classes=s.classes,
1263					isomorphs=s.isomorphs
1264				)
1265				for s in self.states
1266			]
1267
1268			self.set_states(
1269				states=[ 
1270					state
1271					for i, state in enumerate( old_states ) if i in component_state_set
1272				]
1273			)
1274
1275			# we will build new transition list based on those belonging to the component
1276			self.set_transitions( transitions= [] )
1277
1278			# for tracking which new transitions leave each state
1279			transitions_from_state = { state : set() for state in component_states }
1280			new_transitions = []
1281
1282			for tr in old_transitions :
1283
1284				origin_state_name = old_states[ tr.origin_state_idx ].name
1285				target_state_name = old_states[ tr.target_state_idx ].name
1286
1287				# skip transitions that connect separate strongly connected components
1288				if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) :
1289					continue
1290
1291				# track transitions (by index in new transitions list) that leave this state
1292				transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) )
1293
1294				my_origin_state_idx = self.state_idx_map[ origin_state_name ]
1295				my_target_state_idx = self.state_idx_map[ target_state_name ]
1296
1297				new_transitions.append( 
1298					Transition(
1299						origin_state_idx=my_origin_state_idx,
1300						target_state_idx=my_target_state_idx,
1301						prob=tr.prob,
1302						symbol_idx=tr.symbol_idx
1303					)  )
1304
1305			self.set_transitions( transitions=new_transitions )
1306
1307			# if we removed an outgoing transition from a state, we need to distribute its probability 
1308			# among the remaining outgoing transitions from the state
1309			for state in component_states :
1310				
1311				# get the set of transitions leaving this state
1312				state_trs = transitions_from_state[ state ]
1313
1314				# sum the probabilities of the outgoing transitions from the state
1315				p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] )
1316
1317				# how much probability is missing
1318				diff = 1.0 - p_sum
1319
1320				# if significant difference
1321				if abs( diff ) > self.EPS : 
1322
1323					# calculate how much of the difference each transition gets
1324					adjustment = diff / len( state_trs )
1325					
1326					# update the transitions
1327					for i in state_trs :
1328
1329						# transition with origional probability
1330						tr = self.transitions[ i ]
1331
1332						# adjusted probability
1333						self.transitions[ i ] = Transition(
1334							origin_state_idx=tr.origin_state_idx, 
1335							target_state_idx=tr.target_state_idx, 
1336							prob=tr.prob + adjustment, 
1337							symbol_idx=tr.symbol_idx )
1338
1339			if rename_states :
1340				self.set_states( [
1341					CausalState( name=f"{i}" )
1342					for i, s in enumerate( self.states )
1343				] )
def to_q_weighted(self, denominator_limit=1000):
1345	def to_q_weighted( self, denominator_limit=1000 ) :
1346
1347		"""
1348		Approximates the existing transition probabilities with exact fractions, stores 
1349		the fractional probabilities as Fraction in Transition.pq, and sets the floating
1350		point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 
1351		conversion, the function recurses with `denominator_limit=denominator_limit*10`.
1352
1353		Args:
1354			denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 
1355			the conversion.
1356		"""
1357
1358		if self.is_q_weighted :
1359			return
1360
1361		if not self.is_row_stochastic() :
1362			raise ValueError( "Cannot convert to q-weighted because not row stochastic" )
1363
1364		t_from = [[] for _ in range(len(self.states))]
1365
1366		for i, tr in enumerate( self.transitions ) :
1367			t_from[ tr.origin_state_idx ].append( i )
1368
1369		new_transitions = []
1370		for t_list in t_from :
1371			
1372			if not t_list : 
1373				continue
1374
1375			p_q_sum = Fraction(0,1)
1376			p_qs = []
1377
1378			for t_idx in t_list :
1379			
1380				p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit )
1381				p_q_sum += p_q
1382				p_qs.append( p_q )
1383
1384			if p_q_sum != Fraction(1,1) :
1385				
1386				max_pq_i = np.argmax( p_qs )
1387				max_oq = p_qs[ max_pq_i ]
1388
1389				diff = p_q_sum - Fraction(1,1)
1390
1391				# If recurse with higher resolution
1392				if diff > max_oq :
1393					return self.to_q_weighted( denominator_limit*10 )
1394				else :
1395					p_qs[ max_pq_i ] -= diff
1396
1397			for i, t_idx in enumerate( t_list ) :	
1398				new_transitions.append( 
1399					Transition( 
1400						origin_state_idx=self.transitions[  t_idx ].origin_state_idx,
1401						target_state_idx=self.transitions[  t_idx ].target_state_idx,
1402						prob=float(p_qs[ i ]),
1403						symbol_idx=self.transitions[  t_idx ].symbol_idx,
1404						pq=p_qs[ i ]
1405					)
1406				)
1407
1408		self.set_transitions( new_transitions )
1409		self.is_q_weighted = True

Approximates the existing transition probabilities with exact fractions, stores the fractional probabilities as Fraction in Transition.pq, and sets the floating point probabilty to float(pq). If denominator_limit is too small for a sane conversion, the function recurses with denominator_limit=denominator_limit*10.

Arguments:
  • denominator_limit (int): The initial input to Fraction.limit_denominator() in
  • the conversion.
def isomorphic_shift( self, input_symbol_indices: numpy.ndarray, input_state_indices: numpy.ndarray, shift: int = 1) -> dict[str, numpy.ndarray]:
1415	def isomorphic_shift(
1416		self,
1417		input_symbol_indices: np.ndarray,
1418		input_state_indices:  np.ndarray,
1419		shift : int = 1
1420	) -> dict[str, np.ndarray]:
1421
1422		"""
1423		Generates a new sequence of of symbols that are permuted with the symbols emitted by
1424		isomorphic states, if they exists.
1425
1426		:math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br>
1427		:math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]`
1428 
1429		:math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br>
1430		:math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}`
1431 
1432		:math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n`
1433 
1434		:math:`\\texttt{output\\_symbol\\_indices}[i]   := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br>
1435		:math:`\\texttt{output\\_state\\_indices}[i]    := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br>
1436		:math:`\\texttt{output\\_state\\_indices}[i+1]  := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)`
1437
1438		Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself.
1439
1440		Args:
1441			input_symbol_indices (np.ndarray): The sequence of generated symbols.
1442			input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end.
1443			shift : int: How much to shift the symbols across the isomorphic states.
1444		"""
1445
1446		if not any( state.isomorphs for state in self.states ):
1447			raise ValueError("HMM has no states with isomorphs")
1448
1449		inputs = np.asarray(input_symbol_indices)
1450		states = np.asarray(input_state_indices)
1451
1452		n_states = len(self.states)
1453
1454		tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32)
1455
1456		for tr in self.transitions:
1457			tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx
1458
1459		# Build isomorph remapping: identity by default, overridden where isomorphs exist
1460		iso_table = np.arange(n_states, dtype=np.int32)
1461
1462		for i, state in enumerate(self.states):
1463			if state.isomorphs:
1464				isomorph       = sorted(state.isomorphs)[0]
1465				iso_table[ i ] = self.state_idx_map[isomorph]
1466
1467		for i, state in enumerate(self.states):
1468			if state.isomorphs:
1469
1470				# extend the isomorph list to include the identity
1471				isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] )
1472
1473				# find the identity index
1474				pos = isormorphs_with_identity.index( i )
1475
1476				# cyclical shift 
1477				iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ]
1478
1479		origins = states[:-1]
1480		targets = states[1:]
1481
1482		out_origins = iso_table[origins]
1483		out_targets = iso_table[targets]
1484
1485		inv_sym = tr_sym_table[ out_origins, out_targets ]
1486		inv_sts = np.empty(states.size, dtype=states.dtype)
1487
1488		inv_sts[:-1] = out_origins
1489		inv_sts[-1]  = out_targets[-1]
1490
1491		return {
1492			"symbol_index": inv_sym.astype(inputs.dtype),
1493			"state_index":  inv_sts,
1494		}

Generates a new sequence of of symbols that are permuted with the symbols emitted by isomorphic states, if they exists.

\( \sigma_o = \mathcal{S}\left[\texttt{input_state_indices}[i]\right] \)
\( \sigma_t = \mathcal{S}\left[\texttt{input_state_indices}[i+1]\right] \)

\( \mathcal{I}(\sigma_o) = \{\sigma^0_o,\, \sigma^1_o,\, \dots,\, \sigma^{n-1}_o \} \)
\( \mathcal{I}(\sigma_t) = \{\sigma^0_t,\, \sigma^1_t,\, \dots,\, \sigma^{n-1}_t \} \)

\( k = \bigl(\mathcal{I}(\sigma_o).\texttt{index}(\sigma_o) + \texttt{shift}\bigr) \bmod n \)

\( \texttt{output_symbol_indices}[i] := T(\sigma_o^k,\, \sigma_t^k).\text{symbol_index} \)
\( \texttt{output_state_indices}[i] := \mathcal{S}.\texttt{index}(\sigma_o^k) \)
\( \texttt{output_state_indices}[i+1] := \mathcal{S}.\texttt{index}(\sigma_t^k) \)

Where \( \mathcal{I}(\sigma) \)` is the ordered set of states isomorphic to \( \sigma \) including \( \sigma \) itself.

Arguments:
  • input_symbol_indices (np.ndarray): The sequence of generated symbols.
  • input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end.
  • shift : int: How much to shift the symbols across the isomorphic states.
def generate_data( self, file_prefix: str, n_gen: int, include_states: bool, isomorphic_shifts: set[int] = None, random_seed: int = 42) -> dict[any]:
1496	def generate_data(
1497		self,
1498		file_prefix: str,
1499		n_gen: int,
1500		include_states: bool,
1501		isomorphic_shifts : set[int]=None,
1502		random_seed : int=42 ) -> dict[any] : 
1503
1504		trs = [ [] for _ in range( len( self.states ) ) ]
1505		for tr in self.transitions :
1506			trs[ tr.origin_state_idx ].append( ( 
1507				tr.symbol_idx, 
1508				float( tr.prob ),
1509				tr.target_state_idx ) )
1510
1511		data = am_fast.generate_data(
1512			n_gen=n_gen,
1513			start_state=self.start_state,
1514			transitions=trs,
1515			alphabet=sorted(list(self.alphabet)),
1516			include_states=include_states,
1517			random_seed=random_seed
1518		)
1519
1520		if isomorphic_shifts is not None :
1521
1522			if not include_states :
1523				raise ValueError( "Isomorphic inversion requires include_states=True" )
1524
1525			data[ "isomorphic_shifts" ] = {}
1526
1527			for shift in isomorphic_shifts :
1528
1529				try : 
1530
1531					shifted = self.isomorphic_shift(
1532						input_symbol_indices=data[ "symbol_index" ], 
1533						input_state_indices=data[ "state_index" ], 
1534						shift=shift
1535					)
1536
1537					data[ "isomorphic_shifts" ][ shift ] = {
1538						"symbol_index" : shifted[ "symbol_index" ],
1539						"state_index"  : shifted[ "state_index" ]
1540					}
1541
1542				except Exception as e :
1543					print( f"Exception {e}" )
1544
1545		am_fast.save_data(
1546			data=data,
1547			file_prefix=file_prefix,
1548			alphabet=sorted(list(self.alphabet)),
1549			n_states=len( self.states ),
1550			start_state=self.start_state,
1551			random_seed=random_seed,
1552			machine_metadata=self.get_metadata() )
1553
1554		return data
def draw_block_entropy_curve(self):
1560	def draw_block_entropy_curve(self) :
1561		
1562		h_mu = self.h_mu()
1563		E = self.E()
1564
1565		if not "H_L" in self.complexity :
1566			C = self.block_convergence()
1567
1568		H_L = self.complexity[ "H_L" ]
1569		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1570		y = E + L*h_mu
1571
1572		plt.style.use('seaborn-v0_8-darkgrid') 
1573		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1574		colors = plt.cm.tab10.colors 
1575
1576		ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$')
1577		ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$')
1578
1579		ax.fill_between(
1580			L, H_L, y, 
1581			color='gray', 
1582			alpha=0.5, 
1583			label=r'Transient information $\mathbf{T}$'
1584		)
1585
1586		ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15)
1587		ax.set_xlabel('L', fontsize=12, labelpad=10)
1588		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1589
1590		legend = ax.legend(
1591			loc='upper left', 
1592			title_fontsize='11',
1593			fontsize='10',
1594			frameon=True,
1595			facecolor='white',
1596			shadow=True
1597		)
1598
1599		legend.get_title().set_fontweight('bold')
1600		plt.tight_layout() 
1601		plt.show()
def draw_block_measure_curves(self):
1603	def draw_block_measure_curves(self) :
1604		
1605		h_mu = self.h_mu()
1606		E = self.E()
1607
1608		if not "H_L" in self.complexity :
1609			C = self.block_convergence()
1610
1611		H_L = self.complexity[ "H_L" ]
1612		E_L = self.complexity[ "E_L" ]
1613		T_L = self.complexity[ "T_L" ]
1614		S_L = self.complexity[ "S_L" ]
1615		H_sync = self.complexity[ "H_sync" ][1:]
1616		h_mu_L = self.complexity[ "h_mu_L" ]
1617
1618		L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64')
1619		y = E + L*h_mu
1620
1621		plt.style.use('seaborn-v0_8-darkgrid') 
1622		fig, ax = plt.subplots(figsize=(10, 6), dpi=120)
1623		colors = plt.cm.tab10.colors 
1624
1625		ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$')
1626		ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$')
1627		ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$')
1628		ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$')
1629		ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$')
1630		ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$')
1631
1632		ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15)
1633		ax.set_xlabel('L', fontsize=12, labelpad=10)
1634		ax.set_ylabel('Bits', fontsize=12, labelpad=10)
1635
1636		legend = ax.legend(
1637			loc='upper left', 
1638			title_fontsize='11',
1639			fontsize='10',
1640			frameon=True,
1641			facecolor='white',
1642			shadow=True
1643		)
1644
1645		legend.get_title().set_fontweight('bold')
1646		plt.tight_layout() 
1647		plt.show()
def draw_graph( self, engine: str = 'dot', output_dir: pathlib.Path = PosixPath('.'), show: bool = True, symbols_only: bool = False) -> None:
1649	def draw_graph(
1650		self,
1651		engine     : str  = 'dot',
1652		output_dir : Path = Path('.'),
1653		show       : bool = True,
1654		symbols_only : bool = False
1655	) -> None :
1656
1657		"""
1658		Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it.
1659
1660		Returns:
1661		
1662			networkx.DiGraph : the resulting graph.
1663		"""
1664
1665		G = self.as_digraph()
1666
1667		subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) )
1668		
1669		am_vis.draw_graph( 
1670			self, 
1671			output_dir=output_dir, 
1672			title="am_graph", 
1673			view=show, 
1674			subgraphs=subgraphs, 
1675			engine=engine,
1676			symbols_only=symbols_only )

Draws the machine using pygraphiviz and saves it.

Returns:

networkx.DiGraph : the resulting graph.

def as_digraph(self) -> networkx.classes.digraph.DiGraph:
1683	def as_digraph( self ) -> nx.DiGraph :
1684
1685		"""
1686		Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights.
1687
1688		Returns:
1689		
1690			networkx.DiGraph : the resulting graph.
1691		"""
1692
1693		G = nx.DiGraph()
1694		G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] )
1695
1696		for tr in self.transitions :
1697			G.add_edge( tr.origin_state_idx, tr.target_state_idx )
1698
1699		return G

Builds a networkx.DiGraph constructed from the machine's transitions with no edge symbols or weights.

Returns:

networkx.DiGraph : the resulting graph.

def as_dfa(self, with_probs: bool):
1701	def as_dfa( self, with_probs : bool ) :
1702
1703		"""
1704		Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions.
1705
1706		Args:
1707			with_probs (bool): If true the DFA transitions are labeled based on
1708				the symbol of the machines transition concatenated with its
1709				probability, othwise, the only the symbols.
1710
1711		Returns:
1712		
1713			automata.fa.dfa.DFA : the resulting DFA.
1714		"""
1715
1716		precision=8
1717
1718		def edge_label( symb, prob ) :
1719			return f"({symb},{round(prob, precision)})"
1720
1721		# Build states, symbols, and transitions 
1722		dfa_states  = { i for i, _ in enumerate( self.states ) }
1723			
1724		if not with_probs :
1725			dfa_symbols = set( { t.symbol_idx for t in self.transitions } )
1726		else : 
1727			dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } )
1728
1729		dfa_transitions = defaultdict(dict)
1730
1731		if not with_probs :
1732			for t in self.transitions :
1733				dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx
1734		else :
1735			for t in self.transitions :
1736				dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx
1737
1738		# Construct the DFA
1739		return DFA(
1740			states=dfa_states,
1741			input_symbols=dfa_symbols,
1742			transitions=dfa_transitions,
1743			initial_state=self.start_state,
1744			allow_partial=True,
1745			final_states={ 
1746				s for s in dfa_states
1747			}
1748		)

Builds an automata.fa.dfa.DFA constructed from the machine's transitions.

Arguments:
  • with_probs (bool): If true the DFA transitions are labeled based on the symbol of the machines transition concatenated with its probability, othwise, the only the symbols.
Returns:

automata.fa.dfa.DFA : the resulting DFA.