Metadata-Version: 2.4
Name: robogpt_client
Version: 0.1.2
Summary: RoboGPT gRPC Client for agent communication
Home-page: https://github.com/orangewood-co/robogpt
Author: Orangewood Labs
Author-email: support@orangewood.co
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: grpcio>=1.50.0
Requires-Dist: grpcio-tools>=1.50.0
Requires-Dist: protobuf>=4.21.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: black>=22.0; extra == "dev"
Requires-Dist: flake8>=4.0; extra == "dev"
Requires-Dist: mypy>=0.990; extra == "dev"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# RoboGPT Client

`robogpt_client` is a Python package that talks to RoboGPT over gRPC.  
It provides three main clients:

- **Agent client** – chat/streaming, function calls, tool discovery, agent config
- **Vision client** – camera management, detection result streaming, object pose queries
- **Robot control client** – load robots, move, read state, I/O, save poses

---

## Installation

### From source (development)

```bash
git clone git@github.com:orangewood-co/robogpt_client.git
cd robogpt_client
pip install -e .
```

### From PyPI (when published) 

```bash
pip3 install robogpt_client
```

---

## Agent Client

Class: `robogpt_client.agents.agent_client.AgentClient`

Constructor:

```python
AgentClient(server_address: str = "localhost", port: int = 50051)
```

### Methods

#### Streaming / Chat

- `send_response(message: str, sender: str = "Client") -> None`  
  Unidirectional: sends one response message, or a generator of messages, to the server (`ReadResponse` RPC).

- `read_prompt(message_callback: Optional[Callable[[str], None]] = None) -> None`  
  Unidirectional: listen to prompts from the server (`StreamPrompts` RPC).  
  Calls `message_callback(content: str)` for the first received message, then stops.

- `stream_chat(sender: str = "Client", message_callback: Optional[Callable[[str], None]] = None) -> None`  
  Bidirectional: start a chat stream (`Prompts` RPC).  
  - Client sends messages generated from `self.callback_output`.
  - For each server message: prints it and calls `message_callback(content: str)` if provided.
  - Whatever `message_callback` returns is stored back into `self.callback_output`, so the return value feeds the client's next outgoing message despite the `None` annotation.
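
The callback contract described for `stream_chat` can be sketched as follows. The `make_reply_callback` helper and its canned-reply table are hypothetical and not part of `robogpt_client`; the sketch only assumes that the string returned by the callback ends up in `self.callback_output`, as stated above.

```python
from typing import Callable, Dict


def make_reply_callback(
    replies: Dict[str, str], default: str = "ok"
) -> Callable[[str], str]:
    """Build a message_callback that maps a server message to a reply string.

    Hypothetical helper: the lookup table and default reply are illustrative.
    """

    def on_message(content: str) -> str:
        # Normalise the incoming text, then look up a canned reply.
        return replies.get(content.strip().lower(), default)

    return on_message


callback = make_reply_callback({"ready?": "yes"})

# Assumed usage against a running server (not executed here):
#   with AgentClient() as client:
#       client.stream_chat(sender="Client", message_callback=callback)
```

Keeping the reply logic in a plain function makes it possible to test it without a gRPC server.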

#### Function Execution

- `call_function(function_name: str, arguments: Optional[Dict[str, Any]] = None, verbose: bool = True) -> Dict[str, Any]`  
  Calls `CallFunction` on the server.  
  Returns a dict:

  ```python
  {
      "success": bool,
      "output": str,
      "error": str,
  }
  ```
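
Since `call_function` reports failure through the returned dict, callers may want to convert a failed result into an exception. The sketch below is one way to do that; `FunctionCallError` and `unwrap_result` are hypothetical names, and the only assumption is the `success` / `output` / `error` shape shown above.

```python
from typing import Any, Dict


class FunctionCallError(RuntimeError):
    """Raised when a call_function result reports failure (hypothetical helper)."""


def unwrap_result(result: Dict[str, Any]) -> str:
    """Return the output of a successful call, or raise with the server's error."""
    if result.get("success"):
        return result.get("output", "")
    # Fall back to a generic message if the server left "error" empty.
    raise FunctionCallError(result.get("error") or "unknown error")


# Assumed usage (requires a running server):
#   output = unwrap_result(client.call_function("get_joint", {"robot_to_use": 1}))
```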

#### Tool Discovery

- `get_tools(verbose: bool = True) -> Optional[Dict[str, Any]]`  
  Calls `GetTools` RPC, parses `tools_metadata_json` and returns a dict mapping tool name to metadata.

- `save_tools_to_file(filepath: str = "tools_metadata.json") -> bool`  
  Fetches tools with `get_tools()` and writes them to JSON.

- `list_tools() -> Optional[list]`  
  Returns a list of tool names (keys of `get_tools()`), or `None` on error.

- `get_tool_info(tool_name: str) -> Optional[Dict[str, Any]]`  
  Returns the metadata dict for a single tool name, or `None` if missing/error.
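
Because `get_tools()` returns a dict keyed by tool name (or `None` on error), searching for tools is a matter of filtering the keys. A minimal sketch, where `find_tools` is a hypothetical helper and the sample tool names are made up for illustration:

```python
from typing import Any, Dict, List, Optional


def find_tools(tools: Optional[Dict[str, Any]], keyword: str) -> List[str]:
    """Return the sorted tool names that contain `keyword` (case-insensitive)."""
    if not tools:
        # Covers both None (RPC error) and an empty tool set.
        return []
    needle = keyword.lower()
    return sorted(name for name in tools if needle in name.lower())


# Illustrative data only; real names come from client.get_tools().
sample = {"move_to_pose": {}, "get_joint": {}, "Move_Home": {}}
matches = find_tools(sample, "move")
```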

#### Agent Configuration

- `upload_config(config_data: str, verbose: bool = True) -> bool`  
  Uploads configuration text via `UploadAgentConfig` RPC.

- `upload_config_from_file(filepath: str) -> bool`  
  Reads the given file and calls `upload_config`.

#### Connection Management

- `close() -> None`  
  Stop streaming and close the gRPC channel.

- Context manager support:

  ```python
  def __enter__(self) -> "AgentClient"
  def __exit__(self, exc_type, exc_val, exc_tb) -> None
  ```

### Simple example

```python
from robogpt_client.agents.agent_client import AgentClient

with AgentClient() as client:
    result = client.call_function("get_joint", {"robot_to_use": 1})
    print(result)
```

---

## Vision Client

Class: `robogpt_client.vision.vision_client.VisionClient`

Constructor:

```python
VisionClient(server_address: str = "localhost", port: int = 50052)
```

### Methods

#### Camera Management

- `get_camera_list(verbose: bool = True) -> Optional[List[Dict[str, str]]]`  
  Calls `CameraList` RPC.  
  On success returns a list of dicts:

  ```python
  {
      "camera_name": str,
      "camera_type": str,
      "serial_number": str,
  }
  ```

- `initialize_camera(camera_name: str, camera_type: str = "RGB", serial_number: str = "", verbose: bool = True) -> Optional[Dict[str, Any]]`  
  Calls `IntializeCamera` RPC.  
  Returns:

  ```python
  {
      "success": bool,
      "message": str,
      "stream_link": str,
  }
  ```

#### Detection Results Streaming

- `send_detection_results(detections: Iterable[Dict[str, Any]], camera_id: str = "default_camera", verbose: bool = True) -> bool`  
  Client-streaming RPC: `DetectionResults`.  
  Each `detection` dict is packed into a `Struct` and streamed.  
  Returns `True` on successful RPC completion, `False` otherwise.
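
Since each detection is packed into a protobuf `Struct`, its values should stay within JSON-like types (numbers, strings, booleans, lists, nested dicts). The sketch below builds such dicts lazily with a generator. The `label`, `confidence`, and `bbox` keys are assumptions for illustration; the schema the server expects is not documented here. `json.dumps` is used only as a rough proxy for `Struct` compatibility.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Sequence, Tuple

Box = Tuple[str, float, Sequence[float]]


def detections_from_boxes(boxes: Iterable[Box]) -> Iterator[Dict[str, Any]]:
    """Yield one JSON-compatible detection dict per (label, score, bbox) tuple."""
    for label, score, bbox in boxes:
        detection = {
            "label": str(label),
            "confidence": float(score),
            # Convert to a plain list of floats so no tuples or array types leak in.
            "bbox": [float(v) for v in bbox],
        }
        # Raises TypeError early if a value is not JSON-serialisable.
        json.dumps(detection)
        yield detection


# Assumed usage (requires a running server):
#   vc.send_detection_results(detections_from_boxes(boxes), camera_id="default_camera")
```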

#### Object Pose Fetching

- `fetch_object_pose(object_name: str, frame_name: str = "base_link", camera_id: Optional[str] = None, verbose: bool = True) -> Optional[Dict[str, Any]]`  
  Calls `FetchObjectPose` RPC.  
  On success returns:

  ```python
  {
      "success": bool,
      "message": str,
      "pose": {
          "position": List[float],
          "orientation": List[float],
      } or None,
      "additional_info": Dict[str, Any] or None,
  }
  ```
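
The result of `fetch_object_pose` can be `None`, can report `success: False`, or can carry a `pose` of `None`, so it helps to collapse those cases before using the pose. A minimal sketch, assuming only the dict shape shown above; `extract_pose` is a hypothetical helper:

```python
from typing import Any, Dict, List, Optional, Tuple

Pose = Tuple[List[float], List[float]]


def extract_pose(result: Optional[Dict[str, Any]]) -> Optional[Pose]:
    """Return (position, orientation) from a fetch_object_pose result, else None."""
    if not result or not result.get("success"):
        return None
    pose = result.get("pose")
    if pose is None:
        return None
    # Copy into new lists so callers cannot mutate the original result.
    return list(pose["position"]), list(pose["orientation"])


# Assumed usage (requires a running server; "cup" is an illustrative object name):
#   pose = extract_pose(vc.fetch_object_pose("cup", frame_name="base_link"))
```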

#### Camera Control

- `stop_camera(camera_name: str, verbose: bool = True) -> Optional[Dict[str, Any]]`  
  Calls `StopCamera` RPC.  
  Returns:

  ```python
  {
      "success": bool,
      "message": str,
  }
  ```

- `stop_all_cameras(verbose: bool = True) -> Optional[Dict[str, Any]]`  
  Calls `StopAllCameras` RPC with a `KillAllCameras` request.  
  Returns:

  ```python
  {
      "success": bool,
      "message": str,
  }
  ```

- `get_camera_streams(verbose: bool = True) -> Optional[List[Dict[str, Any]]]`  
  Calls `GetCameraStreams` RPC.  
  On success returns a list of dicts:

  ```python
  {
      "camera_name": str,
      "stream_link": str,
      "is_streaming": bool,
  }
  ```
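
As a sketch of consuming the `get_camera_streams` result, the hypothetical helper below keeps only the cameras that are currently streaming and maps each name to its link. It assumes nothing beyond the three keys listed above.

```python
from typing import Any, Dict, List, Optional


def active_stream_links(streams: Optional[List[Dict[str, Any]]]) -> Dict[str, str]:
    """Map camera_name -> stream_link for entries with is_streaming set."""
    if not streams:
        # Covers both None (RPC error) and an empty list.
        return {}
    return {
        entry["camera_name"]: entry["stream_link"]
        for entry in streams
        if entry.get("is_streaming")
    }


# Assumed usage (requires a running server):
#   links = active_stream_links(vc.get_camera_streams(verbose=False))
```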

#### Helper

- `save_camera_list(filepath: str = "cameras.json") -> bool`  
  Calls `get_camera_list(verbose=False)` and saves the result to JSON.

#### Connection Management

- `close() -> None` – close gRPC channel.  
- `__enter__`, `__exit__` – context manager support.

### Simple example

```python
from robogpt_client.vision.vision_client import VisionClient

with VisionClient() as vc:
    cams = vc.get_camera_list()
    print(cams)
```

---

## Robot Control Client

Class: `robogpt_client.robot_control.bot_control_client.BotControlClient`

Constructor:

```python
BotControlClient(server_address: str = "localhost", port: int = 50052)
```

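> Note: The class default for `port` is `50052` (the same default as `VisionClient`), although a comment in the source says `50051`. Pass `port` explicitly if your server listens on a different port.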

### Methods

#### Robot Lifecycle

- `load_robot(robot_names: List[str], robot_ip: List[str], use_simulation: bool = False, robot_prefix: Optional[List[str]] = None) -> Tuple[bool, str]`  
  Calls `LoadRobot` RPC.  
  Returns `(success, message)`.

#### Motion

- `move_to_pose(robot_id: int, pose_name: str, speed: float = 0.5, acceleration: float = 0.5, user_frame: int = 0) -> Tuple[bool, str]`  
  Calls `MoveToPose` RPC to move to a named pose.  
  Returns `(success, message)`.

- `move_to_joint(robot_id: int, joint_angles: List[float], speed: float = 0.5, acceleration: float = 0.5) -> Tuple[bool, str, float]`  
  Calls `MoveToJoint` RPC for a joint-space move.  
  Returns `(success, message, execution_time)`.
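
Both motion calls return a tuple that starts with `(success, message)`, optionally followed by extra values. One way to avoid repeating the same check after every call is a small wrapper like the one below. `ensure_ok` is a hypothetical helper, not part of the client.

```python
from typing import Any, Tuple


def ensure_ok(result: Tuple[Any, ...], action: str) -> Tuple[Any, ...]:
    """Raise if the call failed; otherwise return the values after (success, message)."""
    success, message, *rest = result
    if not success:
        raise RuntimeError(f"{action} failed: {message}")
    return tuple(rest)


# Assumed usage (requires a running server; "home" is an illustrative pose name):
#   ensure_ok(client.move_to_pose(robot_id=1, pose_name="home"), "move_to_pose")
#   (elapsed,) = ensure_ok(
#       client.move_to_joint(robot_id=1, joint_angles=[0.0] * 6), "move_to_joint"
#   )
```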

#### State

- `get_current_tcp(robot_id: int, user_frame: int = 0, tool_number: int = 0) -> Tuple[bool, str, Optional[List[float]]]`  
  Calls `GetCurrentTcp` RPC.  
  On success, the last element is the TCP pose as a list; otherwise it is `None`.

- `get_current_joints(robot_id: int) -> Tuple[bool, str, Optional[List[float]]]`  
  Calls `GetCurrentJoints` RPC.  
  On success, the last element is the list of current joint angles; otherwise it is `None`.

#### I/O

- `set_digital_pin(robot_id: int, pin_number: int, value: bool) -> Tuple[bool, str]`  
  Calls `SetDigitalPin` RPC.  
  Returns `(success, message)`.

- `get_digital_pin(robot_id: int, pin_number: int) -> Tuple[bool, str, bool]`  
  Calls `GetDigitalPin` RPC.  
  Returns `(success, message, value)`.
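
A common use of `set_digital_pin` is a short pulse: drive the pin high, wait, then drive it low. The sketch below shows that pattern under stated assumptions. `pulse_pin` and `RecordingClient` are hypothetical; the stand-in client only mimics the `set_digital_pin(robot_id, pin_number, value) -> (success, message)` signature so the logic can be dry-run without hardware.

```python
import time
from typing import Any, List, Tuple


def pulse_pin(client: Any, robot_id: int, pin_number: int, duration_s: float = 0.1) -> bool:
    """Set a digital pin high for duration_s seconds, then set it low again."""
    ok, _msg = client.set_digital_pin(robot_id, pin_number, True)
    if not ok:
        return False
    try:
        time.sleep(duration_s)
    finally:
        # Always try to release the pin, even if the wait is interrupted.
        ok, _msg = client.set_digital_pin(robot_id, pin_number, False)
    return ok


class RecordingClient:
    """Stand-in for BotControlClient that records calls instead of using gRPC."""

    def __init__(self) -> None:
        self.calls: List[Tuple[int, int, bool]] = []

    def set_digital_pin(self, robot_id: int, pin_number: int, value: bool) -> Tuple[bool, str]:
        self.calls.append((robot_id, pin_number, value))
        return True, "ok"


dry_run = RecordingClient()
pulsed = pulse_pin(dry_run, robot_id=1, pin_number=3, duration_s=0.0)
```

With a real `BotControlClient`, the same function can be called with the client instance in place of `RecordingClient`.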

#### Poses

- `save_pose(robot_id: int, pose_name: str, pose_type: str = "tcp", user_frame: int = 0, tool_number: int = 0) -> Tuple[bool, str]`  
  Calls `SavePose` RPC to store the current pose with a name.  
  Returns `(success, message)`.

#### Connection Management

- `close() -> None`  
  Close gRPC channel.

- `__enter__`, `__exit__` – context manager support.

### Simple example

```python
from robogpt_client.robot_control.bot_control_client import BotControlClient

with BotControlClient() as client:
    ok, msg = client.load_robot(
        robot_names=["robot1"],
        robot_ip=["192.168.1.100"],
        use_simulation=True,
    )
    print("Load:", ok, msg)

    ok, msg, joints = client.get_current_joints(robot_id=1)
    print("Joints:", ok, joints)
```

---

## Examples

```bash
python3 -m robogpt_client.examples.demo_agents
python3 -m robogpt_client.examples.demo_vision
python3 -m robogpt_client.examples.demo_bot_control
```

---

## Support

For issues and questions, open a GitHub issue or contact support@orangewood.co.
