GitLab Repo

amachine.am_create

  1from collections import defaultdict
  2import copy
  3import random
  4
  5import numpy as np
  6
  7from .am_hmm          import HMM
  8from .am_causal_state import CausalState
  9from .am_transition   import Transition
 10
 11from .am_random import uniform_dist, exp_uniform_blend, resolve_rng
 12
 13def star_join(
 14	exit_symbol : str, 
 15	enter_symbols : list[str],
 16	machines : list[HMM],
 17	mode_residency_factor : float ) -> HMM :
 18
 19	machine = HMM()
 20
 21	isomorphic_groups = defaultdict(list)
 22	for i, m in enumerate( machines ) :
 23		if m.isoclass is not None :
 24			isomorphic_groups[ m.isoclass ].append( i )
 25
 26	# since we are merging multuple machines which might have name collision
 27	# we need to rename the states to ensure uniqueness
 28	def rename_state( base_name : str, g : int ) :
 29		return f"{g}/{base_name}"
 30
 31	def get_gid( idx : int, isoclass : int | None ) :
 32		return idx if isoclass is None else isoclass
 33
 34	machine.set_alphabet( [ exit_symbol ] )
 35
 36	for m in machines :
 37		machine.extend_alphabet( alphabet=m.alphabet )
 38
 39	# create a connector state and connector state class
 40
 41	connector_state = CausalState(
 42		name=f"/c", 
 43		classes=set({"connector"}) 
 44	)
 45	
 46	# initial states before adding each machines states
 47	machine.set_states( [ connector_state ] )
 48	machine.start_state = 0
 49
 50	# number of machines (groups of states)
 51	n_groups = len( machines )
 52
 53	# make sure we have enough symbols (otherwise connector can't be unifilar)
 54	if n_groups > len(enter_symbols) :
 55		raise Exception(
 56			f"Too few enter symbols given number of machines"
 57		)
 58
 59	# for each given machine
 60	for m_idx, m in enumerate( machines ) : 
 61
 62		# default to the index of the machine in the list
 63		m_gid = get_gid( m_idx, m.isoclass )
 64
 65		# give the states from this machine a class name
 66		m_classes = { 
 67			f"m_{m_idx}", 
 68			f"isoclass_{m.isoclass}" 
 69		}
 70
 71		added_states = []
 72		
 73		# create a state and extend our existing machine to include it
 74		for s_idx, state in enumerate( m.states ) :
 75
 76			isomorphs=set()
 77			if m.isoclass is not None and m.isoclass in isomorphic_groups :
 78
 79				for other_idx in isomorphic_groups[ m.isoclass ] :
 80					
 81					if other_idx == m_idx : 
 82						continue
 83					
 84					other_m = machines[ other_idx ]
 85					isomorphs.add(  
 86						rename_state( 
 87							other_m.states[ s_idx ].name, 
 88							get_gid( other_idx, other_m.isoclass ) )
 89					)
 90
 91			added_states.append( 
 92				CausalState( 
 93					name=rename_state(state.name, m_gid),
 94					classes=( m_classes | state.classes ),
 95					isomorphs=isomorphs
 96				) 
 97			)
 98
 99		machine.extend_states( added_states )
100
101		added_transitions = []
102
103		# add all of the transitions from the machine
104		for tr in m.transitions :
105
106			# get the names of the states for the transition
107			origin_state_name = rename_state( m.states[ tr.origin_state_idx ].name, m_gid )
108			target_state_name = rename_state( m.states[ tr.target_state_idx ].name, m_gid )
109
110			# idx of the symbol remaped to this machines alphabet list
111			new_symbol_idx = machine.symbol_idx_map[ m.alphabet[ tr.symbol_idx ] ]
112
113			# create and add the new transition
114			added_transitions.append( Transition(
115				origin_state_idx=machine.state_idx_map[ origin_state_name ],
116				target_state_idx=machine.state_idx_map[ target_state_name ],
117				prob=tr.prob,
118				symbol_idx=new_symbol_idx
119			) )
120
121		machine.extend_transitions( added_transitions )
122
123		# Add connector transitions, and adjust transition probabilities to sum to 1
124
125		# the name of the state that is the entry point to this group from the connector
126		m_entry_state_name = rename_state( m.states[ m.start_state ].name, m_gid )
127
128		# get the index of the entry state for this machine
129		m_entry_state_idx = machine.state_idx_map[ m_entry_state_name ]
130
131
132		# Get the within group transitions from m's entry state
133		# ( the probabilities will need to be adjusted )
134		transition_ids_from_m_entry = set()
135		for i, tr in enumerate( machine.transitions ) : 
136			if tr.origin_state_idx == m_entry_state_idx :
137				transition_ids_from_m_entry.add( i )
138		
139		n_from_entry = len( transition_ids_from_m_entry )
140
141		# Pr of staying in this group is distributed over the within group outgoing edges from the entry state 
142		for i in transition_ids_from_m_entry :
143
144			machine.transitions[ i ] = Transition(
145				origin_state_idx=machine.transitions[ i ].origin_state_idx,
146				target_state_idx=machine.transitions[ i ].target_state_idx,
147				prob=mode_residency_factor / n_from_entry,
148				symbol_idx=machine.transitions[ i ].symbol_idx
149			)
150
151		# from m's entry state back to connector
152		escape_pr = 1.0 - mode_residency_factor
153
154		machine.extend_transitions( transitions=[
155			Transition(
156				origin_state_idx=m_entry_state_idx,
157				target_state_idx=machine.start_state,
158				prob=escape_pr,
159				symbol_idx=machine.symbol_idx_map[ exit_symbol ]
160			)
161		] )
162
163		# from the connector to m's entry state
164		machine.extend_alphabet( alphabet=[ enter_symbols[ m_idx ] ] )
165		
166		machine.extend_transitions( transitions=[
167			Transition(
168				origin_state_idx=machine.start_state,
169				target_state_idx=m_entry_state_idx,
170				prob=( 1.0 / n_groups ),
171				symbol_idx=machine.symbol_idx_map[ enter_symbols[ m_idx ] ]
172			)
173		] )
174
175	return machine
176
def star(
	exit_symbol          : str,
	enter_symbols        : list[str],
	normal_symbols       : list[str],
	n_modes              : int = 7,
	n_isomorphic         : int = 2,
	randomness           : float = 0.3,
	connectedness        : float = 0.5,
	residency_factor     : float = 0.5,
	n_normal_symbols     : int = 4,
	t_states_per_machine : int = 17 )  -> HMM :
	"""Build a star machine out of `n_modes` groups of isomorphic random machines.

	For every mode one random machine is generated and reduced with
	`collapse_to_largest_strongly_connected_subgraph`. It is then copied
	`n_isomorphic - 1` times with `isomorphic_to`, each copy using its own
	slice of `normal_symbols` and a state-name decorator of k '@' characters
	for the k-th copy. All machines are finally joined with `star_join`.

	Args:
		exit_symbol: passed to `star_join` as the exit symbol.
		enter_symbols: passed to `star_join`; needs one symbol per machine,
			i.e. at least `n_modes * n_isomorphic`.
		normal_symbols: pool the machine alphabets are cut from; needs at
			least `n_normal_symbols * n_isomorphic` entries.
		n_modes: number of isomorphism classes.
		n_isomorphic: number of machines per class, the original included.
		randomness: forwarded to `random_machine`.
		connectedness: forwarded to `random_machine`.
		residency_factor: forwarded to `star_join` as `mode_residency_factor`.
		n_normal_symbols: alphabet size of each machine.
		t_states_per_machine: number of states requested from
			`random_machine` (before the machine is collapsed).

	Returns:
		The joined machine.

	Raises:
		ValueError: if `n_isomorphic` is less than 1 or too few symbols are
			supplied.
	"""

	if n_isomorphic < 1 :
		raise ValueError( "n_isomorphic must be at least 1" )

	if len( normal_symbols ) < n_normal_symbols*n_isomorphic :
		raise ValueError( "Must have at least n_normal_symbols*n_isomorphic normal symbols" )

	if len( enter_symbols ) < n_modes*n_isomorphic :
		raise ValueError( "Must have at least n_modes*n_isomorphic enter symbols" )

	# one disjoint alphabet per member of an isomorphism class; slice 0 is
	# used by the generated machine, slice k by its k-th isomorphic copy
	alphabets = [
		[
			f"{symbol}"
			for symbol in normal_symbols[ k*n_normal_symbols : ( k + 1 )*n_normal_symbols ]
		]
		for k in range( n_isomorphic )
	]

	random_machines = []

	for i in range( n_modes ) :

		m = random_machine(
			n_states=t_states_per_machine,
			symbols=alphabets[ 0 ],
			randomness=randomness,
			connectedness=connectedness )

		m.collapse_to_largest_strongly_connected_subgraph()

		# the generated machine followed by its isomorphic copies; the
		# decorator grows with k so that state names stay distinct per copy
		group = [ m ]
		for k in range( 1, n_isomorphic ) :
			group.append(
				isomorphic_to( m, alphabet=alphabets[ k ], decorator='@'*k )
			)

		for member in group :
			member.isoclass = f"{i}"

		# cross-reference the states that sit at the same position in
		# every other member of the group
		for a_idx, a in enumerate( group ) :
			for b_idx, b in enumerate( group ) :
				if a_idx == b_idx :
					continue
				for j in range( len( m.states ) ) :
					a.states[ j ].add_isomorph( b.states[ j ].name )

		random_machines.extend( group )

	mode_machine = star_join(
		exit_symbol=exit_symbol,
		enter_symbols=enter_symbols,
		machines=random_machines,
		mode_residency_factor=residency_factor
	)

	return mode_machine
229
230
def isomorphic_to(
	m : HMM,
	alphabet : list[str],
	decorator : str = '@' ) -> HMM :
	"""Return a machine with the same structure as `m` over another alphabet.

	States are recreated in the same order with `decorator` appended to
	their names and a deep copy of their classes. Transitions are deep
	copied unchanged, so the symbol at position k of `m.alphabet` is
	replaced by `alphabet[ k ]`. The copy starts in the state at the same
	position as `m`'s start state.

	Args:
		m: machine to copy.
		alphabet: replacement symbols; only the first `len( m.alphabet )`
			are used.
		decorator: suffix appended to every state name.

	Returns:
		The new machine.

	Raises:
		ValueError: if `alphabet` has fewer symbols than `m.alphabet`.
	"""

	# make sure there are enough symbols
	if len( alphabet ) < len( m.alphabet ) :
		raise ValueError( "Not enough symbols in the alphabet" )

	# take as many of them as needed
	alphabet_used = alphabet[ 0 : len( m.alphabet ) ]

	states = [
		CausalState(
			name=f"{s.name}{decorator}",
			classes=copy.deepcopy( s.classes )
		)
		for s in m.states
	]

	# keep the start state of the source machine so that the copy is also
	# isomorphic as a rooted machine; fall back to 0 if none has been set
	start_state = m.start_state if m.start_state is not None else 0

	return HMM(
		states=states,
		transitions=copy.deepcopy( m.transitions ),
		start_state=start_state,
		alphabet=alphabet_used
	)
257
def random_machine(
	n_states : int,
	symbols  : list[str],
	connectedness : float,
	randomness : float,
	random_seed : int | None = None )  -> HMM  :
	"""Generate a random machine with `n_states` states named "0", "1", ...

	Every state gets one outgoing transition plus one more for each of
	`len( symbols ) - 1` independent draws that falls below `connectedness`.
	The count is capped at `n_states` because the targets of one state are
	sampled without repetition. The symbols of one state's transitions are
	also sampled without repetition. The probabilities come from
	`exp_uniform_blend` with `alpha=randomness`.

	Args:
		n_states: number of states to create.
		symbols: alphabet of the machine (copied, not stored by reference).
		connectedness: per-draw probability of adding one more outgoing
			transition to a state.
		randomness: forwarded to `exp_uniform_blend` as `alpha`.
		random_seed: when given, seeds both the Python and the NumPy
			generator used here; otherwise the `random` module and
			`resolve_rng( None )` are used.

	Returns:
		The generated machine, with start state 0.

	Raises:
		ValueError: if states are requested but `symbols` is empty.
	"""

	if random_seed is not None :
		py_rng = random.Random( random_seed )
		np_rng = np.random.default_rng( random_seed )
	else :
		py_rng = random
		np_rng = resolve_rng( None )

	states = [
		CausalState( name=f"{i}" )
		for i in range( n_states )
	]

	n_symbols = len( symbols )

	# every state needs at least one labelled transition
	if states and n_symbols == 0 :
		raise ValueError( "symbols must not be empty" )

	transitions = []

	for state_idx in range( len( states ) ) :

		n_transitions = sum( py_rng.random() < connectedness for _ in range( n_symbols - 1 ) ) + 1

		# targets are distinct, so a state cannot have more outgoing
		# transitions than there are states to reach
		n_transitions = min( n_transitions, n_states )

		transition_to = py_rng.sample( range( n_states ), n_transitions )

		transition_probabilities = exp_uniform_blend(
			n=n_transitions,
			alpha=randomness,
			np_rng=np_rng )

		# distinct symbols per state
		transition_symbols_indices = py_rng.sample( range( n_symbols ), n_transitions )

		for i, p in enumerate( transition_probabilities ) :
			transitions.append(
				Transition(
					origin_state_idx=state_idx,
					target_state_idx=transition_to[ i ],
					prob=p,
					symbol_idx=transition_symbols_indices[ i ]
				)
			)

	return HMM(
		states=states,
		transitions=transitions,
		start_state=0,
		alphabet=symbols.copy()
	)
def star_join( exit_symbol: str, enter_symbols: list[str], machines: list[amachine.am_hmm.HMM], mode_residency_factor: float) -> amachine.am_hmm.HMM:
 14def star_join(
 15	exit_symbol : str, 
 16	enter_symbols : list[str],
 17	machines : list[HMM],
 18	mode_residency_factor : float ) -> HMM :
 19
 20	machine = HMM()
 21
 22	isomorphic_groups = defaultdict(list)
 23	for i, m in enumerate( machines ) :
 24		if m.isoclass is not None :
 25			isomorphic_groups[ m.isoclass ].append( i )
 26
 27	# since we are merging multuple machines which might have name collision
 28	# we need to rename the states to ensure uniqueness
 29	def rename_state( base_name : str, g : int ) :
 30		return f"{g}/{base_name}"
 31
 32	def get_gid( idx : int, isoclass : int | None ) :
 33		return idx if isoclass is None else isoclass
 34
 35	machine.set_alphabet( [ exit_symbol ] )
 36
 37	for m in machines :
 38		machine.extend_alphabet( alphabet=m.alphabet )
 39
 40	# create a connector state and connector state class
 41
 42	connector_state = CausalState(
 43		name=f"/c", 
 44		classes=set({"connector"}) 
 45	)
 46	
 47	# initial states before adding each machines states
 48	machine.set_states( [ connector_state ] )
 49	machine.start_state = 0
 50
 51	# number of machines (groups of states)
 52	n_groups = len( machines )
 53
 54	# make sure we have enough symbols (otherwise connector can't be unifilar)
 55	if n_groups > len(enter_symbols) :
 56		raise Exception(
 57			f"Too few enter symbols given number of machines"
 58		)
 59
 60	# for each given machine
 61	for m_idx, m in enumerate( machines ) : 
 62
 63		# default to the index of the machine in the list
 64		m_gid = get_gid( m_idx, m.isoclass )
 65
 66		# give the states from this machine a class name
 67		m_classes = { 
 68			f"m_{m_idx}", 
 69			f"isoclass_{m.isoclass}" 
 70		}
 71
 72		added_states = []
 73		
 74		# create a state and extend our existing machine to include it
 75		for s_idx, state in enumerate( m.states ) :
 76
 77			isomorphs=set()
 78			if m.isoclass is not None and m.isoclass in isomorphic_groups :
 79
 80				for other_idx in isomorphic_groups[ m.isoclass ] :
 81					
 82					if other_idx == m_idx : 
 83						continue
 84					
 85					other_m = machines[ other_idx ]
 86					isomorphs.add(  
 87						rename_state( 
 88							other_m.states[ s_idx ].name, 
 89							get_gid( other_idx, other_m.isoclass ) )
 90					)
 91
 92			added_states.append( 
 93				CausalState( 
 94					name=rename_state(state.name, m_gid),
 95					classes=( m_classes | state.classes ),
 96					isomorphs=isomorphs
 97				) 
 98			)
 99
100		machine.extend_states( added_states )
101
102		added_transitions = []
103
104		# add all of the transitions from the machine
105		for tr in m.transitions :
106
107			# get the names of the states for the transition
108			origin_state_name = rename_state( m.states[ tr.origin_state_idx ].name, m_gid )
109			target_state_name = rename_state( m.states[ tr.target_state_idx ].name, m_gid )
110
111			# idx of the symbol remaped to this machines alphabet list
112			new_symbol_idx = machine.symbol_idx_map[ m.alphabet[ tr.symbol_idx ] ]
113
114			# create and add the new transition
115			added_transitions.append( Transition(
116				origin_state_idx=machine.state_idx_map[ origin_state_name ],
117				target_state_idx=machine.state_idx_map[ target_state_name ],
118				prob=tr.prob,
119				symbol_idx=new_symbol_idx
120			) )
121
122		machine.extend_transitions( added_transitions )
123
124		# Add connector transitions, and adjust transition probabilities to sum to 1
125
126		# the name of the state that is the entry point to this group from the connector
127		m_entry_state_name = rename_state( m.states[ m.start_state ].name, m_gid )
128
129		# get the index of the entry state for this machine
130		m_entry_state_idx = machine.state_idx_map[ m_entry_state_name ]
131
132
133		# Get the within group transitions from m's entry state
134		# ( the probabilities will need to be adjusted )
135		transition_ids_from_m_entry = set()
136		for i, tr in enumerate( machine.transitions ) : 
137			if tr.origin_state_idx == m_entry_state_idx :
138				transition_ids_from_m_entry.add( i )
139		
140		n_from_entry = len( transition_ids_from_m_entry )
141
142		# Pr of staying in this group is distributed over the within group outgoing edges from the entry state 
143		for i in transition_ids_from_m_entry :
144
145			machine.transitions[ i ] = Transition(
146				origin_state_idx=machine.transitions[ i ].origin_state_idx,
147				target_state_idx=machine.transitions[ i ].target_state_idx,
148				prob=mode_residency_factor / n_from_entry,
149				symbol_idx=machine.transitions[ i ].symbol_idx
150			)
151
152		# from m's entry state back to connector
153		escape_pr = 1.0 - mode_residency_factor
154
155		machine.extend_transitions( transitions=[
156			Transition(
157				origin_state_idx=m_entry_state_idx,
158				target_state_idx=machine.start_state,
159				prob=escape_pr,
160				symbol_idx=machine.symbol_idx_map[ exit_symbol ]
161			)
162		] )
163
164		# from the connector to m's entry state
165		machine.extend_alphabet( alphabet=[ enter_symbols[ m_idx ] ] )
166		
167		machine.extend_transitions( transitions=[
168			Transition(
169				origin_state_idx=machine.start_state,
170				target_state_idx=m_entry_state_idx,
171				prob=( 1.0 / n_groups ),
172				symbol_idx=machine.symbol_idx_map[ enter_symbols[ m_idx ] ]
173			)
174		] )
175
176	return machine
def star( exit_symbol: str, enter_symbols: list[str], normal_symbols: list[str], n_modes: int = 7, n_isomorphic: int = 2, randomness: float = 0.3, connectedness: float = 0.5, residency_factor: float = 0.5, n_normal_symbols: int = 4, t_states_per_machine: int = 17) -> amachine.am_hmm.HMM:
def star(
	exit_symbol          : str,
	enter_symbols        : list[str],
	normal_symbols       : list[str],
	n_modes              : int = 7,
	n_isomorphic         : int = 2,
	randomness           : float = 0.3,
	connectedness        : float = 0.5,
	residency_factor     : float = 0.5,
	n_normal_symbols     : int = 4,
	t_states_per_machine : int = 17 )  -> HMM :
	"""Build a star machine out of `n_modes` groups of isomorphic random machines.

	For every mode one random machine is generated and reduced with
	`collapse_to_largest_strongly_connected_subgraph`. It is then copied
	`n_isomorphic - 1` times with `isomorphic_to`, each copy using its own
	slice of `normal_symbols` and a state-name decorator of k '@' characters
	for the k-th copy. All machines are finally joined with `star_join`.

	Args:
		exit_symbol: passed to `star_join` as the exit symbol.
		enter_symbols: passed to `star_join`; needs one symbol per machine,
			i.e. at least `n_modes * n_isomorphic`.
		normal_symbols: pool the machine alphabets are cut from; needs at
			least `n_normal_symbols * n_isomorphic` entries.
		n_modes: number of isomorphism classes.
		n_isomorphic: number of machines per class, the original included.
		randomness: forwarded to `random_machine`.
		connectedness: forwarded to `random_machine`.
		residency_factor: forwarded to `star_join` as `mode_residency_factor`.
		n_normal_symbols: alphabet size of each machine.
		t_states_per_machine: number of states requested from
			`random_machine` (before the machine is collapsed).

	Returns:
		The joined machine.

	Raises:
		ValueError: if `n_isomorphic` is less than 1 or too few symbols are
			supplied.
	"""

	if n_isomorphic < 1 :
		raise ValueError( "n_isomorphic must be at least 1" )

	if len( normal_symbols ) < n_normal_symbols*n_isomorphic :
		raise ValueError( "Must have at least n_normal_symbols*n_isomorphic normal symbols" )

	if len( enter_symbols ) < n_modes*n_isomorphic :
		raise ValueError( "Must have at least n_modes*n_isomorphic enter symbols" )

	# one disjoint alphabet per member of an isomorphism class; slice 0 is
	# used by the generated machine, slice k by its k-th isomorphic copy
	alphabets = [
		[
			f"{symbol}"
			for symbol in normal_symbols[ k*n_normal_symbols : ( k + 1 )*n_normal_symbols ]
		]
		for k in range( n_isomorphic )
	]

	random_machines = []

	for i in range( n_modes ) :

		m = random_machine(
			n_states=t_states_per_machine,
			symbols=alphabets[ 0 ],
			randomness=randomness,
			connectedness=connectedness )

		m.collapse_to_largest_strongly_connected_subgraph()

		# the generated machine followed by its isomorphic copies; the
		# decorator grows with k so that state names stay distinct per copy
		group = [ m ]
		for k in range( 1, n_isomorphic ) :
			group.append(
				isomorphic_to( m, alphabet=alphabets[ k ], decorator='@'*k )
			)

		for member in group :
			member.isoclass = f"{i}"

		# cross-reference the states that sit at the same position in
		# every other member of the group
		for a_idx, a in enumerate( group ) :
			for b_idx, b in enumerate( group ) :
				if a_idx == b_idx :
					continue
				for j in range( len( m.states ) ) :
					a.states[ j ].add_isomorph( b.states[ j ].name )

		random_machines.extend( group )

	mode_machine = star_join(
		exit_symbol=exit_symbol,
		enter_symbols=enter_symbols,
		machines=random_machines,
		mode_residency_factor=residency_factor
	)

	return mode_machine
def isomorphic_to( m: amachine.am_hmm.HMM, alphabet: list[str], decorator: str = '@') -> amachine.am_hmm.HMM:
def isomorphic_to(
	m : HMM,
	alphabet : list[str],
	decorator : str = '@' ) -> HMM :
	"""Return a machine with the same structure as `m` over another alphabet.

	States are recreated in the same order with `decorator` appended to
	their names and a deep copy of their classes. Transitions are deep
	copied unchanged, so the symbol at position k of `m.alphabet` is
	replaced by `alphabet[ k ]`. The copy starts in the state at the same
	position as `m`'s start state.

	Args:
		m: machine to copy.
		alphabet: replacement symbols; only the first `len( m.alphabet )`
			are used.
		decorator: suffix appended to every state name.

	Returns:
		The new machine.

	Raises:
		ValueError: if `alphabet` has fewer symbols than `m.alphabet`.
	"""

	# make sure there are enough symbols
	if len( alphabet ) < len( m.alphabet ) :
		raise ValueError( "Not enough symbols in the alphabet" )

	# take as many of them as needed
	alphabet_used = alphabet[ 0 : len( m.alphabet ) ]

	states = [
		CausalState(
			name=f"{s.name}{decorator}",
			classes=copy.deepcopy( s.classes )
		)
		for s in m.states
	]

	# keep the start state of the source machine so that the copy is also
	# isomorphic as a rooted machine; fall back to 0 if none has been set
	start_state = m.start_state if m.start_state is not None else 0

	return HMM(
		states=states,
		transitions=copy.deepcopy( m.transitions ),
		start_state=start_state,
		alphabet=alphabet_used
	)
def random_machine( n_states: int, symbols: list[str], connectedness: float, randomness: float, random_seed: int | None = None) -> amachine.am_hmm.HMM:
def random_machine(
	n_states : int,
	symbols  : list[str],
	connectedness : float,
	randomness : float,
	random_seed : int | None = None )  -> HMM  :
	"""Generate a random machine with `n_states` states named "0", "1", ...

	Every state gets one outgoing transition plus one more for each of
	`len( symbols ) - 1` independent draws that falls below `connectedness`.
	The count is capped at `n_states` because the targets of one state are
	sampled without repetition. The symbols of one state's transitions are
	also sampled without repetition. The probabilities come from
	`exp_uniform_blend` with `alpha=randomness`.

	Args:
		n_states: number of states to create.
		symbols: alphabet of the machine (copied, not stored by reference).
		connectedness: per-draw probability of adding one more outgoing
			transition to a state.
		randomness: forwarded to `exp_uniform_blend` as `alpha`.
		random_seed: when given, seeds both the Python and the NumPy
			generator used here; otherwise the `random` module and
			`resolve_rng( None )` are used.

	Returns:
		The generated machine, with start state 0.

	Raises:
		ValueError: if states are requested but `symbols` is empty.
	"""

	if random_seed is not None :
		py_rng = random.Random( random_seed )
		np_rng = np.random.default_rng( random_seed )
	else :
		py_rng = random
		np_rng = resolve_rng( None )

	states = [
		CausalState( name=f"{i}" )
		for i in range( n_states )
	]

	n_symbols = len( symbols )

	# every state needs at least one labelled transition
	if states and n_symbols == 0 :
		raise ValueError( "symbols must not be empty" )

	transitions = []

	for state_idx in range( len( states ) ) :

		n_transitions = sum( py_rng.random() < connectedness for _ in range( n_symbols - 1 ) ) + 1

		# targets are distinct, so a state cannot have more outgoing
		# transitions than there are states to reach
		n_transitions = min( n_transitions, n_states )

		transition_to = py_rng.sample( range( n_states ), n_transitions )

		transition_probabilities = exp_uniform_blend(
			n=n_transitions,
			alpha=randomness,
			np_rng=np_rng )

		# distinct symbols per state
		transition_symbols_indices = py_rng.sample( range( n_symbols ), n_transitions )

		for i, p in enumerate( transition_probabilities ) :
			transitions.append(
				Transition(
					origin_state_idx=state_idx,
					target_state_idx=transition_to[ i ],
					prob=p,
					symbol_idx=transition_symbols_indices[ i ]
				)
			)

	return HMM(
		states=states,
		transitions=transitions,
		start_state=0,
		alphabet=symbols.copy()
	)