amachine.am_create
"""Constructors for hidden Markov machines: random machines, isomorphic
copies, and star-shaped joins of several machines around a connector state."""

from collections import defaultdict
import copy
import random

import numpy as np

from .am_hmm import HMM
from .am_causal_state import CausalState
from .am_transition import Transition

from .am_random import uniform_dist, exp_uniform_blend, resolve_rng


def star_join(
    exit_symbol: str,
    enter_symbols: list[str],
    machines: list[HMM],
    mode_residency_factor: float,
) -> HMM:
    """Join several machines into one star-shaped HMM around a connector state.

    A connector state named "/c" (class "connector") becomes the start state
    of the result.  Each input machine is copied in with its states renamed to
    "<gid>/<name>", where gid is the machine's isoclass when it has one and
    its index in ``machines`` otherwise.  For machine ``i``:

    * the connector gets a transition to the machine's start state emitting
      ``enter_symbols[i]`` with probability ``1 / len(machines)``;
    * the machine's start state gets a transition back to the connector
      emitting ``exit_symbol`` with probability ``1 - mode_residency_factor``;
    * the within-machine transitions leaving that start state are reset to
      share ``mode_residency_factor`` equally (their original probabilities
      are discarded).

    Args:
        exit_symbol: Symbol emitted when leaving a machine for the connector.
        enter_symbols: One symbol per machine, emitted when entering it.
        machines: Machines to join.
        mode_residency_factor: Probability mass, in [0, 1], kept inside a
            machine at its entry state.

    Returns:
        The joined machine.

    Raises:
        ValueError: If there are fewer enter symbols than machines, or if
            ``mode_residency_factor`` is outside [0, 1].
    """
    n_groups = len(machines)

    # each machine needs its own enter symbol (otherwise the connector
    # can't be unifilar)
    if n_groups > len(enter_symbols):
        raise ValueError("Too few enter symbols given number of machines")

    # a factor outside [0, 1] would yield negative transition probabilities
    if not 0.0 <= mode_residency_factor <= 1.0:
        raise ValueError("mode_residency_factor must be between 0 and 1")

    # since we are merging multiple machines which might have name collisions
    # we prefix every state name with its group id
    # NOTE(review): machines sharing an isoclass share a gid, so their state
    # names must already differ (e.g. by a decorator suffix) - confirm callers.
    def rename_state(base_name: str, g: int | str) -> str:
        return f"{g}/{base_name}"

    def get_gid(idx: int, isoclass: int | str | None) -> int | str:
        return idx if isoclass is None else isoclass

    # indices of the machines belonging to each isoclass
    isomorphic_groups = defaultdict(list)
    for i, m in enumerate(machines):
        if m.isoclass is not None:
            isomorphic_groups[m.isoclass].append(i)

    machine = HMM()
    machine.set_alphabet([exit_symbol])
    for m in machines:
        machine.extend_alphabet(alphabet=m.alphabet)

    # the connector is the only state before the machines are added
    connector_state = CausalState(name="/c", classes={"connector"})
    machine.set_states([connector_state])
    machine.start_state = 0

    for m_idx, m in enumerate(machines):

        m_gid = get_gid(m_idx, m.isoclass)

        # classes attached to every state copied from this machine
        m_classes = {f"m_{m_idx}", f"isoclass_{m.isoclass}"}

        # the other machines in the same isoclass (none when isoclass is None)
        peer_indices = [
            other_idx
            for other_idx in isomorphic_groups.get(m.isoclass, ())
            if other_idx != m_idx
        ]

        added_states = []
        for s_idx, state in enumerate(m.states):
            # the state at the same position in every peer machine
            isomorphs = {
                rename_state(
                    machines[other_idx].states[s_idx].name,
                    get_gid(other_idx, machines[other_idx].isoclass),
                )
                for other_idx in peer_indices
            }
            added_states.append(
                CausalState(
                    name=rename_state(state.name, m_gid),
                    classes=(m_classes | state.classes),
                    isomorphs=isomorphs,
                )
            )

        machine.extend_states(added_states)

        # copy the machine's transitions, remapping state and symbol indices
        # into the joined machine
        added_transitions = []
        for tr in m.transitions:
            origin_state_name = rename_state(m.states[tr.origin_state_idx].name, m_gid)
            target_state_name = rename_state(m.states[tr.target_state_idx].name, m_gid)
            new_symbol_idx = machine.symbol_idx_map[m.alphabet[tr.symbol_idx]]

            added_transitions.append(
                Transition(
                    origin_state_idx=machine.state_idx_map[origin_state_name],
                    target_state_idx=machine.state_idx_map[target_state_name],
                    prob=tr.prob,
                    symbol_idx=new_symbol_idx,
                )
            )

        machine.extend_transitions(added_transitions)

        # the state that is the entry point to this group from the connector
        m_entry_state_name = rename_state(m.states[m.start_state].name, m_gid)
        m_entry_state_idx = machine.state_idx_map[m_entry_state_name]

        # within-group transitions leaving the entry state; their
        # probabilities must make room for the escape transition
        transition_ids_from_m_entry = [
            i
            for i, tr in enumerate(machine.transitions)
            if tr.origin_state_idx == m_entry_state_idx
        ]
        n_from_entry = len(transition_ids_from_m_entry)

        # Pr of staying in this group is split equally over those edges
        for i in transition_ids_from_m_entry:
            old = machine.transitions[i]
            machine.transitions[i] = Transition(
                origin_state_idx=old.origin_state_idx,
                target_state_idx=old.target_state_idx,
                prob=mode_residency_factor / n_from_entry,
                symbol_idx=old.symbol_idx,
            )

        # with no within-group edge to carry the residency mass, escaping is
        # the only option, so it takes the whole probability
        escape_pr = 1.0 - mode_residency_factor if n_from_entry else 1.0

        # from m's entry state back to the connector
        machine.extend_transitions(transitions=[
            Transition(
                origin_state_idx=m_entry_state_idx,
                target_state_idx=machine.start_state,
                prob=escape_pr,
                symbol_idx=machine.symbol_idx_map[exit_symbol],
            )
        ])

        # from the connector to m's entry state
        machine.extend_alphabet(alphabet=[enter_symbols[m_idx]])
        machine.extend_transitions(transitions=[
            Transition(
                origin_state_idx=machine.start_state,
                target_state_idx=m_entry_state_idx,
                prob=(1.0 / n_groups),
                symbol_idx=machine.symbol_idx_map[enter_symbols[m_idx]],
            )
        ])

    return machine


def star(
    exit_symbol: str,
    enter_symbols: list[str],
    normal_symbols: list[str],
    n_modes: int = 7,
    n_isomorphic: int = 2,
    randomness: float = 0.3,
    connectedness: float = 0.5,
    residency_factor: float = 0.5,
    n_normal_symbols: int = 4,
    t_states_per_machine: int = 17,
) -> HMM:
    """Build a star machine out of random modes and their isomorphic copies.

    For each of ``n_modes`` modes a random machine is generated over the first
    ``n_normal_symbols`` normal symbols and collapsed to its largest strongly
    connected subgraph.  ``n_isomorphic - 1`` isomorphic copies are then made;
    copy ``k`` uses the ``k``-th run of ``n_normal_symbols`` normal symbols and
    has ``"@" * k`` appended to its state names.  All members of a mode share
    the isoclass ``str(mode_index)``.  The machines are finally combined with
    ``star_join``.

    Args:
        exit_symbol: Symbol emitted when a mode returns to the connector.
        enter_symbols: Symbols for entering the machines; one per machine.
        normal_symbols: Pool of symbols the machines emit internally.
        n_modes: Number of random modes.
        n_isomorphic: Machines per mode (the original plus its copies).
        randomness: Passed to ``random_machine``.
        connectedness: Passed to ``random_machine``.
        residency_factor: Passed to ``star_join`` as ``mode_residency_factor``.
        n_normal_symbols: Alphabet size of each machine.
        t_states_per_machine: State count requested for each random machine
            (before it is collapsed).

    Returns:
        The joined star machine.

    Raises:
        ValueError: If ``n_isomorphic`` is below 1 or too few normal or enter
            symbols are supplied.
    """
    if n_isomorphic < 1:
        raise ValueError("n_isomorphic must be at least 1")

    if len(normal_symbols) < n_normal_symbols * n_isomorphic:
        raise ValueError("Must have at least n_normal_symbols*n_isomorphic normal symbols")

    if len(enter_symbols) < n_modes * n_isomorphic:
        raise ValueError("Must have at least n_modes*n_isomorphic enter symbols")

    # one disjoint alphabet per member of an isomorphism class
    alphabets = [
        [
            f"{symbol}"
            for symbol in normal_symbols[k * n_normal_symbols:(k + 1) * n_normal_symbols]
        ]
        for k in range(n_isomorphic)
    ]

    random_machines = []

    for i in range(n_modes):

        m = random_machine(
            n_states=t_states_per_machine,
            symbols=alphabets[0],
            randomness=randomness,
            connectedness=connectedness,
        )
        m.collapse_to_largest_strongly_connected_subgraph()

        # the original followed by its copies; a distinct decorator per copy
        # keeps state names unique within the shared isoclass
        group = [m]
        for k in range(1, n_isomorphic):
            group.append(isomorphic_to(m, alphabet=alphabets[k], decorator="@" * k))

        for member in group:
            member.isoclass = f"{i}"

        # register every corresponding state pair as mutual isomorphs
        for member in group:
            for other in group:
                if other is member:
                    continue
                for own_state, other_state in zip(member.states, other.states):
                    own_state.add_isomorph(other_state.name)

        random_machines.extend(group)

    return star_join(
        exit_symbol=exit_symbol,
        enter_symbols=enter_symbols,
        machines=random_machines,
        mode_residency_factor=residency_factor,
    )


def isomorphic_to(
    m: HMM,
    alphabet: list[str],
    decorator: str = '@',
) -> HMM:
    """Return a copy of ``m`` with the same structure over another alphabet.

    States keep their order and classes but get ``decorator`` appended to
    their names.  Transitions are deep-copied unchanged, so symbol index ``k``
    of ``m`` corresponds to ``alphabet[k]`` in the copy.  The copy starts in
    the state at the same position as ``m``'s start state.

    Args:
        m: Machine to copy.
        alphabet: Replacement symbols; only the first ``len(m.alphabet)`` are
            used.
        decorator: Suffix appended to every state name.

    Returns:
        The isomorphic machine.

    Raises:
        ValueError: If ``alphabet`` has fewer symbols than ``m.alphabet``.
    """
    # make sure there are enough symbols
    if len(alphabet) < len(m.alphabet):
        raise ValueError("Not enough symbols in the alphabet")

    # take as many of them as needed
    alphabet_used = alphabet[0:len(m.alphabet)]

    states = [
        CausalState(
            name=f"{s.name}{decorator}",
            classes=copy.deepcopy(s.classes),
        )
        for s in m.states
    ]

    return HMM(
        states=states,
        transitions=copy.deepcopy(m.transitions),
        # mirror the original's start state so the rooted machines match
        start_state=m.start_state,
        alphabet=alphabet_used,
    )


def random_machine(
    n_states: int,
    symbols: list[str],
    connectedness: float,
    randomness: float,
    random_seed: int | None = None,
) -> HMM:
    """Generate a random machine with states named "0" .. "n_states-1".

    Every state gets between 1 and ``len(symbols)`` outgoing transitions: one
    guaranteed, plus one for each of ``len(symbols) - 1`` draws that falls
    below ``connectedness``.  The count is capped at ``n_states`` because
    targets are sampled without replacement.  Each transition of a state uses
    a distinct symbol and a distinct target; probabilities come from
    ``exp_uniform_blend`` with ``alpha=randomness``.

    Args:
        n_states: Number of states.
        symbols: Alphabet of the machine (copied into the result).
        connectedness: Per-draw chance of adding an extra outgoing transition.
        randomness: Forwarded to ``exp_uniform_blend`` as ``alpha``.
        random_seed: When given, seeds both generators for reproducibility.

    Returns:
        The generated machine, with start state 0.

    Raises:
        ValueError: If ``symbols`` is empty while ``n_states`` is positive.
    """
    n_symbols = len(symbols)

    # every state needs at least one symbol to emit
    if n_states > 0 and n_symbols == 0:
        raise ValueError("symbols must not be empty")

    if random_seed is not None:
        py_rng = random.Random(random_seed)
        np_rng = np.random.default_rng(random_seed)
    else:
        py_rng = random
        np_rng = resolve_rng(None)

    states = [CausalState(name=f"{i}") for i in range(n_states)]
    transitions = []

    for state_idx in range(n_states):

        n_transitions = sum(
            py_rng.random() < connectedness for _ in range(n_symbols - 1)
        ) + 1
        # targets are drawn without replacement, so there cannot be more
        # transitions than states
        n_transitions = min(n_transitions, n_states)

        transition_to = py_rng.sample(range(n_states), n_transitions)

        transition_probabilities = exp_uniform_blend(
            n=n_transitions,
            alpha=randomness,
            np_rng=np_rng,
        )

        transition_symbols_indices = py_rng.sample(range(n_symbols), n_transitions)

        for i, p in enumerate(transition_probabilities):
            transitions.append(
                Transition(
                    origin_state_idx=state_idx,
                    target_state_idx=transition_to[i],
                    prob=p,
                    symbol_idx=transition_symbols_indices[i],
                )
            )

    return HMM(
        states=states,
        transitions=transitions,
        start_state=0,
        alphabet=symbols.copy(),
    )
def
star_join( exit_symbol: str, enter_symbols: list[str], machines: list[amachine.am_hmm.HMM], mode_residency_factor: float) -> amachine.am_hmm.HMM:
def star_join(
    exit_symbol: str,
    enter_symbols: list[str],
    machines: list[HMM],
    mode_residency_factor: float,
) -> HMM:
    """Join several machines into one star-shaped HMM around a connector state.

    A connector state named "/c" (class "connector") becomes the start state
    of the result.  Each input machine is copied in with its states renamed to
    "<gid>/<name>", where gid is the machine's isoclass when it has one and
    its index in ``machines`` otherwise.  For machine ``i``:

    * the connector gets a transition to the machine's start state emitting
      ``enter_symbols[i]`` with probability ``1 / len(machines)``;
    * the machine's start state gets a transition back to the connector
      emitting ``exit_symbol`` with probability ``1 - mode_residency_factor``;
    * the within-machine transitions leaving that start state are reset to
      share ``mode_residency_factor`` equally (their original probabilities
      are discarded).

    Args:
        exit_symbol: Symbol emitted when leaving a machine for the connector.
        enter_symbols: One symbol per machine, emitted when entering it.
        machines: Machines to join.
        mode_residency_factor: Probability mass, in [0, 1], kept inside a
            machine at its entry state.

    Returns:
        The joined machine.

    Raises:
        ValueError: If there are fewer enter symbols than machines, or if
            ``mode_residency_factor`` is outside [0, 1].
    """
    n_groups = len(machines)

    # each machine needs its own enter symbol (otherwise the connector
    # can't be unifilar)
    if n_groups > len(enter_symbols):
        raise ValueError("Too few enter symbols given number of machines")

    # a factor outside [0, 1] would yield negative transition probabilities
    if not 0.0 <= mode_residency_factor <= 1.0:
        raise ValueError("mode_residency_factor must be between 0 and 1")

    # since we are merging multiple machines which might have name collisions
    # we prefix every state name with its group id
    # NOTE(review): machines sharing an isoclass share a gid, so their state
    # names must already differ (e.g. by a decorator suffix) - confirm callers.
    def rename_state(base_name: str, g: int | str) -> str:
        return f"{g}/{base_name}"

    def get_gid(idx: int, isoclass: int | str | None) -> int | str:
        return idx if isoclass is None else isoclass

    # indices of the machines belonging to each isoclass
    isomorphic_groups = defaultdict(list)
    for i, m in enumerate(machines):
        if m.isoclass is not None:
            isomorphic_groups[m.isoclass].append(i)

    machine = HMM()
    machine.set_alphabet([exit_symbol])
    for m in machines:
        machine.extend_alphabet(alphabet=m.alphabet)

    # the connector is the only state before the machines are added
    connector_state = CausalState(name="/c", classes={"connector"})
    machine.set_states([connector_state])
    machine.start_state = 0

    for m_idx, m in enumerate(machines):

        m_gid = get_gid(m_idx, m.isoclass)

        # classes attached to every state copied from this machine
        m_classes = {f"m_{m_idx}", f"isoclass_{m.isoclass}"}

        # the other machines in the same isoclass (none when isoclass is None)
        peer_indices = [
            other_idx
            for other_idx in isomorphic_groups.get(m.isoclass, ())
            if other_idx != m_idx
        ]

        added_states = []
        for s_idx, state in enumerate(m.states):
            # the state at the same position in every peer machine
            isomorphs = {
                rename_state(
                    machines[other_idx].states[s_idx].name,
                    get_gid(other_idx, machines[other_idx].isoclass),
                )
                for other_idx in peer_indices
            }
            added_states.append(
                CausalState(
                    name=rename_state(state.name, m_gid),
                    classes=(m_classes | state.classes),
                    isomorphs=isomorphs,
                )
            )

        machine.extend_states(added_states)

        # copy the machine's transitions, remapping state and symbol indices
        # into the joined machine
        added_transitions = []
        for tr in m.transitions:
            origin_state_name = rename_state(m.states[tr.origin_state_idx].name, m_gid)
            target_state_name = rename_state(m.states[tr.target_state_idx].name, m_gid)
            new_symbol_idx = machine.symbol_idx_map[m.alphabet[tr.symbol_idx]]

            added_transitions.append(
                Transition(
                    origin_state_idx=machine.state_idx_map[origin_state_name],
                    target_state_idx=machine.state_idx_map[target_state_name],
                    prob=tr.prob,
                    symbol_idx=new_symbol_idx,
                )
            )

        machine.extend_transitions(added_transitions)

        # the state that is the entry point to this group from the connector
        m_entry_state_name = rename_state(m.states[m.start_state].name, m_gid)
        m_entry_state_idx = machine.state_idx_map[m_entry_state_name]

        # within-group transitions leaving the entry state; their
        # probabilities must make room for the escape transition
        transition_ids_from_m_entry = [
            i
            for i, tr in enumerate(machine.transitions)
            if tr.origin_state_idx == m_entry_state_idx
        ]
        n_from_entry = len(transition_ids_from_m_entry)

        # Pr of staying in this group is split equally over those edges
        for i in transition_ids_from_m_entry:
            old = machine.transitions[i]
            machine.transitions[i] = Transition(
                origin_state_idx=old.origin_state_idx,
                target_state_idx=old.target_state_idx,
                prob=mode_residency_factor / n_from_entry,
                symbol_idx=old.symbol_idx,
            )

        # with no within-group edge to carry the residency mass, escaping is
        # the only option, so it takes the whole probability
        escape_pr = 1.0 - mode_residency_factor if n_from_entry else 1.0

        # from m's entry state back to the connector
        machine.extend_transitions(transitions=[
            Transition(
                origin_state_idx=m_entry_state_idx,
                target_state_idx=machine.start_state,
                prob=escape_pr,
                symbol_idx=machine.symbol_idx_map[exit_symbol],
            )
        ])

        # from the connector to m's entry state
        machine.extend_alphabet(alphabet=[enter_symbols[m_idx]])
        machine.extend_transitions(transitions=[
            Transition(
                origin_state_idx=machine.start_state,
                target_state_idx=m_entry_state_idx,
                prob=(1.0 / n_groups),
                symbol_idx=machine.symbol_idx_map[enter_symbols[m_idx]],
            )
        ])

    return machine
def
star( exit_symbol: str, enter_symbols: list[str], normal_symbols: list[str], n_modes: int = 7, n_isomorphic: int = 2, randomness: float = 0.3, connectedness: float = 0.5, residency_factor: float = 0.5, n_normal_symbols: int = 4, t_states_per_machine: int = 17) -> amachine.am_hmm.HMM:
def star(
    exit_symbol: str,
    enter_symbols: list[str],
    normal_symbols: list[str],
    n_modes: int = 7,
    n_isomorphic: int = 2,
    randomness: float = 0.3,
    connectedness: float = 0.5,
    residency_factor: float = 0.5,
    n_normal_symbols: int = 4,
    t_states_per_machine: int = 17,
) -> HMM:
    """Build a star machine out of random modes and their isomorphic copies.

    For each of ``n_modes`` modes a random machine is generated over the first
    ``n_normal_symbols`` normal symbols and collapsed to its largest strongly
    connected subgraph.  ``n_isomorphic - 1`` isomorphic copies are then made;
    copy ``k`` uses the ``k``-th run of ``n_normal_symbols`` normal symbols and
    has ``"@" * k`` appended to its state names.  All members of a mode share
    the isoclass ``str(mode_index)``.  The machines are finally combined with
    ``star_join``.

    Args:
        exit_symbol: Symbol emitted when a mode returns to the connector.
        enter_symbols: Symbols for entering the machines; one per machine.
        normal_symbols: Pool of symbols the machines emit internally.
        n_modes: Number of random modes.
        n_isomorphic: Machines per mode (the original plus its copies).
        randomness: Passed to ``random_machine``.
        connectedness: Passed to ``random_machine``.
        residency_factor: Passed to ``star_join`` as ``mode_residency_factor``.
        n_normal_symbols: Alphabet size of each machine.
        t_states_per_machine: State count requested for each random machine
            (before it is collapsed).

    Returns:
        The joined star machine.

    Raises:
        ValueError: If ``n_isomorphic`` is below 1 or too few normal or enter
            symbols are supplied.
    """
    if n_isomorphic < 1:
        raise ValueError("n_isomorphic must be at least 1")

    if len(normal_symbols) < n_normal_symbols * n_isomorphic:
        raise ValueError("Must have at least n_normal_symbols*n_isomorphic normal symbols")

    if len(enter_symbols) < n_modes * n_isomorphic:
        raise ValueError("Must have at least n_modes*n_isomorphic enter symbols")

    # one disjoint alphabet per member of an isomorphism class
    alphabets = [
        [
            f"{symbol}"
            for symbol in normal_symbols[k * n_normal_symbols:(k + 1) * n_normal_symbols]
        ]
        for k in range(n_isomorphic)
    ]

    random_machines = []

    for i in range(n_modes):

        m = random_machine(
            n_states=t_states_per_machine,
            symbols=alphabets[0],
            randomness=randomness,
            connectedness=connectedness,
        )
        m.collapse_to_largest_strongly_connected_subgraph()

        # the original followed by its copies; a distinct decorator per copy
        # keeps state names unique within the shared isoclass
        group = [m]
        for k in range(1, n_isomorphic):
            group.append(isomorphic_to(m, alphabet=alphabets[k], decorator="@" * k))

        for member in group:
            member.isoclass = f"{i}"

        # register every corresponding state pair as mutual isomorphs
        for member in group:
            for other in group:
                if other is member:
                    continue
                for own_state, other_state in zip(member.states, other.states):
                    own_state.add_isomorph(other_state.name)

        random_machines.extend(group)

    return star_join(
        exit_symbol=exit_symbol,
        enter_symbols=enter_symbols,
        machines=random_machines,
        mode_residency_factor=residency_factor,
    )
def
isomorphic_to( m: amachine.am_hmm.HMM, alphabet: list[str], decorator: str = '@') -> amachine.am_hmm.HMM:
def isomorphic_to(
    m: HMM,
    alphabet: list[str],
    decorator: str = '@',
) -> HMM:
    """Return a copy of ``m`` with the same structure over another alphabet.

    States keep their order and classes but get ``decorator`` appended to
    their names.  Transitions are deep-copied unchanged, so symbol index ``k``
    of ``m`` corresponds to ``alphabet[k]`` in the copy.  The copy starts in
    the state at the same position as ``m``'s start state.

    Args:
        m: Machine to copy.
        alphabet: Replacement symbols; only the first ``len(m.alphabet)`` are
            used.
        decorator: Suffix appended to every state name.

    Returns:
        The isomorphic machine.

    Raises:
        ValueError: If ``alphabet`` has fewer symbols than ``m.alphabet``.
    """
    # make sure there are enough symbols
    if len(alphabet) < len(m.alphabet):
        raise ValueError("Not enough symbols in the alphabet")

    # take as many of them as needed
    alphabet_used = alphabet[0:len(m.alphabet)]

    states = [
        CausalState(
            name=f"{s.name}{decorator}",
            classes=copy.deepcopy(s.classes),
        )
        for s in m.states
    ]

    return HMM(
        states=states,
        transitions=copy.deepcopy(m.transitions),
        # mirror the original's start state so the rooted machines match
        start_state=m.start_state,
        alphabet=alphabet_used,
    )
def
random_machine( n_states: int, symbols: list[str], connectedness: float, randomness: float, random_seed: int | None = None) -> amachine.am_hmm.HMM:
def random_machine(
    n_states: int,
    symbols: list[str],
    connectedness: float,
    randomness: float,
    random_seed: int | None = None,
) -> HMM:
    """Generate a random machine with states named "0" .. "n_states-1".

    Every state gets between 1 and ``len(symbols)`` outgoing transitions: one
    guaranteed, plus one for each of ``len(symbols) - 1`` draws that falls
    below ``connectedness``.  The count is capped at ``n_states`` because
    targets are sampled without replacement.  Each transition of a state uses
    a distinct symbol and a distinct target; probabilities come from
    ``exp_uniform_blend`` with ``alpha=randomness``.

    Args:
        n_states: Number of states.
        symbols: Alphabet of the machine (copied into the result).
        connectedness: Per-draw chance of adding an extra outgoing transition.
        randomness: Forwarded to ``exp_uniform_blend`` as ``alpha``.
        random_seed: When given, seeds both generators for reproducibility.

    Returns:
        The generated machine, with start state 0.

    Raises:
        ValueError: If ``symbols`` is empty while ``n_states`` is positive.
    """
    n_symbols = len(symbols)

    # every state needs at least one symbol to emit
    if n_states > 0 and n_symbols == 0:
        raise ValueError("symbols must not be empty")

    if random_seed is not None:
        py_rng = random.Random(random_seed)
        np_rng = np.random.default_rng(random_seed)
    else:
        py_rng = random
        np_rng = resolve_rng(None)

    states = [CausalState(name=f"{i}") for i in range(n_states)]
    transitions = []

    for state_idx in range(n_states):

        n_transitions = sum(
            py_rng.random() < connectedness for _ in range(n_symbols - 1)
        ) + 1
        # targets are drawn without replacement, so there cannot be more
        # transitions than states
        n_transitions = min(n_transitions, n_states)

        transition_to = py_rng.sample(range(n_states), n_transitions)

        transition_probabilities = exp_uniform_blend(
            n=n_transitions,
            alpha=randomness,
            np_rng=np_rng,
        )

        transition_symbols_indices = py_rng.sample(range(n_symbols), n_transitions)

        for i, p in enumerate(transition_probabilities):
            transitions.append(
                Transition(
                    origin_state_idx=state_idx,
                    target_state_idx=transition_to[i],
                    prob=p,
                    symbol_idx=transition_symbols_indices[i],
                )
            )

    return HMM(
        states=states,
        transitions=transitions,
        start_state=0,
        alphabet=symbols.copy(),
    )