kernel.sdk.SdkClient Documentation

SdkClient Class

 
class SdkClient(builtins.object)
    SdkClient(autostart: bool = True)
 

 
  Methods defined here:
__del__(self)
Stop the telemetry socket thread when the SdkClient is deleted.
__init__(self, autostart: bool = True)
Initialize self.  See help(type(self)) for accurate signature.
current_flow_usb_state(self) -> str
Get the current state of the flow device.
get_available_modules(self) -> List[int]
Get the available modules on the connected Flow device.
 
Use this method to determine which modules are available for subscription when calling on_moments_by_module.
 
Returns:
    List[int]: A list of integers representing the available modules.
 
Raises:
    ValueError: If the available modules cannot be deserialized from the metadata. This may indicate a version mismatch between the SDK and kortex.
    TimeoutError: If the available modules cannot be received within 1 second. This may indicate the flow device is not connected.
    FlowDisconnected: If the Flow device is not connected.
    FlowNotBooted: If the Flow device is not fully booted.
    FlowFaulted: If the Flow device is in a faulted state.
get_flow_system_state(self) -> str
Get the current system state of the flow device.
initialize_flow(self, study_id: str, api_key: str)
Initialize the flow device. This should be done once after powering on the device.
on_moments(self, moment_id: kernel.sdk.socket.MomentNumber, wavelength: kernel.sdk.socket.Wavelength, callback: Callable[[numpy.ndarray, numpy.ndarray], Any], once: bool = False) -> kernel.sdk.socket.EventFuture
Register a callback handler for moments data.
 
The callback will received a tuple of (timestamp, data) where timestamps ia a 1-D numpy array of timestamps of shape (t) and data is a 5-D numpy array of the moments data with dimensions (time, source_module, source_id, detector_module, detector_id) of shape (t, 48, 3, 48, 6).
Typically you can expect the first invocation to have a shape of (t, 48, 3, 48, 6) where t is the number of moments data available at the time of subscription.
Subsequent invocations will typically have a shape of (1, 48, 3, 48, 6).
 
the moments array data is of type np.float32, where -np.inf indicates no data available for that channel.
 
The callback will be called whenever a new laser pattern is finished, approximately every 200ms. if the previous callback invocation is still running, the next invocation will be skipped.
To prevent data loss, we recommend offloading heavy compute tasks to a separate thread or process, and writing the callback to emit the data to a queue.
 
Arguments:
    moment_id: Moment ID to subscribe to
    wavelength: Wavelength to subscribe to
    callback: Function to call when new moments data is available
    once: Subscribe only once and then unsubscribe
 
Returns:
    EventFuture: An object that can be used to cancel the subscription
on_moments_by_module(self, moment_id: kernel.sdk.socket.MomentNumber, wavelength: kernel.sdk.socket.Wavelength, module: int, callback: Callable[[numpy.ndarray, numpy.ndarray], Any], once: bool = False) -> kernel.sdk.socket.EventFuture
Register a callback handler for moments data for detectors on a single module, averaged across within-module sources.
 
The callback will received a tuple of (timestamp, data) where timestamps ia a 1-D numpy array of timestamps of shape (t) and data is a 2-D numpy array of the moments data with dimensions (time, detector_id) of shape (t, 6).
Typically you can expect the first invocation to have a shape of (t, 6) where t is the number of moments data available at the time of subscription.
Subsequent invocations will typically have a shape of (1, 6).
 
the moments array data is of type np.float32, where 0.0 indicates no data available for that detector.
If all detectors have 0.0, the module is not connected or is experiencing issues.
Confirm the module is connected, and if you are still experiencing issues, contact support.
on_retained_channels(self, callback: Callable[[numpy.ndarray, numpy.ndarray], Any]) -> kernel.sdk.socket.EventFuture
Register a callback handler for retained channels data.
 
The callback will received a tuple of (timestamp, data) where timestamps ia a 1-D numpy array of timestamps of shape (t) and data is a 2-D numpy array of the retained channels data with dimensions (time, module_id) of shape (t, MAX_MODULES).
Typically you can expect the first invocation to have a shape of (t, MAX_MODULES) where t is the number of retained channels data available at the time of subscription.
Subsequent invocations will typically have a shape of (1, MAX_MODULES).
 
the retained channels array data is of type np.float32, where -np.inf indicates no data available for that channel.
start(self, block: bool = True, timeout: float = 10)
Start the telemetry socket thread, optionally blocking until the socket is connected. Subscribes to metadata for state checks on commands
 
This method is called automatically when the SdkClient is created with autostart=True.
stop(self)
Stop the telemetry socket thread.
 
The SdkClient is not usable after this method is called. To use the SdkClient again, create a new instance.
tune(self)
Tune the flow device. This should be done after initializing the device.
turn_lasers_off(self)
Turn off the lasers on the flow device.
turn_lasers_on(self)
Turn on the lasers on the flow device.

Readonly properties defined here:
flow_system_state
usb_state

Data descriptors defined here:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Data and other attributes defined here:
DANUBE_SOURCE_METADATA = 'urn:kernel.com/module/metadata/danube_source_metadata'
FLOW_SYSTEM_STATE_METADATA = 'urn:kernel.com/module/metadata/danube_system_state'
KORTEX_CONFIG_PATH = '/etc/kernel.com/kortex.json'
MAX_MODULES = 48
MOMENTS_FIELD_URN_TEMPLATE = 'urn:kernel.com/field/flow/moments_{moment_id}_{wavelength}/moment'
MOMENTS_MODULE_URN_TEMPLATE = 'urn:kernel.com/module/flow/moments_{moment_id}_{wavelength}'
RETAINED_CHANNELS_FIELD_URN = 'urn:kernel.com/field/flow/retained_channels/retained_channels_percentage'
RETAINED_CHANNELS_MODULE_URN = 'urn:kernel.com/module/flow/retained_channels'
USB_STATE_METADATA = 'urn:kernel.com/module/metadata/usb_status_danube'

run_workflow Function

run_workflow(study_id: str, api_key: str, skip_initialize: bool = False, skip_coupling_check: bool = False, high_priority_modules: Optional[List[int]] = None)
Run an example workflow to record a dataset with the Flow device.
 
This workflow will guide a user through the process of initializing the Flow device, tuning it, and starting data gathering.
It will also check the coupling of the headset and prompt the user to adjust the placement if necessary.
Parameters:
    study_id (str): The study ID for the Flow device.
    api_key (str): The API key for the Flow device.
    skip_initialize (bool): If True, skip the initialization step. Default is False.
    skip_coupling_check (bool): If True, skip the coupling check step. Default is False.
    high_priority_modules (Optional[List[int]]): A list of module IDs to check for high priority coupling. Default is None.
Raises:
    ValueError: If the Flow device is not connected or if the USB state is not valid.
    RuntimeError: If the user chooses to abort the workflow.
    TimeoutError: If the Flow device does not respond within the specified timeout.
 
Note:
    This workflow is provided as an example and may need to be modified for your specific use case.