Coverage for src / kiss / agents / sorcar / sorcar.py: 76%

814 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 22:57 -0700

1"""Browser-based chatbot for RelentlessAgent-based agents.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import base64 

7import hashlib 

8import json 

9import logging 

10import os 

11import queue 

12import shutil 

13import socket 

14import subprocess 

15import sys 

16import threading 

17import time 

18import types 

19import webbrowser 

20from collections.abc import AsyncGenerator, Callable 

21from pathlib import Path 

22from typing import Any 

23 

24from kiss.agents.sorcar.browser_ui import BaseBrowserPrinter, find_free_port 

25from kiss.agents.sorcar.chatbot_ui import _THEME_PRESETS, _build_html 

26from kiss.agents.sorcar.code_server import ( 

27 _capture_untracked, 

28 _cleanup_merge_data, 

29 _parse_diff_hunks, 

30 _prepare_merge_view, 

31 _restore_merge_files, 

32 _save_untracked_base, 

33 _scan_files, 

34 _setup_code_server, 

35 _snapshot_files, 

36) 

37from kiss.agents.sorcar.task_history import ( 

38 _KISS_DIR, 

39 SAMPLE_TASKS, 

40 _add_task, 

41 _append_task_to_md, 

42 _init_task_history_md, 

43 _load_file_usage, 

44 _load_history, 

45 _load_last_model, 

46 _load_model_usage, 

47 _load_proposals, 

48 _record_file_usage, 

49 _record_model_usage, 

50 _save_proposals, 

51 _set_latest_chat_events, 

52) 

53from kiss.core import config as config_module 

54from kiss.core.kiss_agent import KISSAgent 

55from kiss.core.models.model_info import ( 

56 _OPENAI_PREFIXES, 

57 MODEL_INFO, 

58 get_available_models, 

59) 

60from kiss.core.relentless_agent import RelentlessAgent 

61 

# Module-level logger; handlers in this file report swallowed exceptions
# through it at DEBUG level.
logger = logging.getLogger(__name__)

# Model used for the short helper prompts in this module (task proposals,
# follow-up suggestions, inline autocomplete).
_FAST_MODEL = "gemini-2.0-flash"

# Model used to write commit messages.
_COMMIT_MODEL = "gemini-2.0-flash"

# Helper-only models. run_chatbot consults this set when restoring the
# last-used model and when recording model usage.
_INTERNAL_MODELS = frozenset((_FAST_MODEL, _COMMIT_MODEL))

67 

68 

class _StopRequested(BaseException):
    """Signal that the running agent thread must abort its current task.

    The class derives from ``BaseException`` rather than ``Exception`` so
    that ordinary ``except Exception`` handlers do not swallow it; it has
    to be caught explicitly by name.
    """

71 

72 

def _read_active_file(cs_data_dir: str) -> str:
    """Return the file path recorded in ``active-file.json``, if usable.

    The function is best-effort: any problem reading or interpreting the
    file yields an empty string instead of an exception.

    Args:
        cs_data_dir: Directory expected to contain ``active-file.json``.

    Returns:
        The value stored under the ``"path"`` key when it is a non-empty
        string naming an existing regular file; otherwise ``""``.
    """
    af_path = os.path.join(cs_data_dir, "active-file.json")
    try:
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(af_path, encoding="utf-8") as af:
            payload = json.load(af)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.debug("Exception caught", exc_info=True)
        return ""

    # Valid JSON need not be an object (null, list, number, ...); calling
    # .get() on those would raise AttributeError.
    if not isinstance(payload, dict):
        return ""

    path = payload.get("path", "")
    # Require a real string: os.path.isfile() would interpret an integer
    # as an open file descriptor rather than a path.
    if isinstance(path, str) and path and os.path.isfile(path):
        return path
    return ""

83 

84 

def _clean_llm_output(text: str) -> str:
    """Trim whitespace and wrapping quote characters from a model reply.

    Surrounding whitespace is removed first, then any leading/trailing
    double quotes, then any leading/trailing single quotes. The order is
    significant: double quotes nested inside single quotes are kept.

    Args:
        text: Raw text returned by the model.

    Returns:
        The trimmed text.
    """
    cleaned = text.strip()
    for quote_char in ('"', "'"):
        cleaned = cleaned.strip(quote_char)
    return cleaned

87 

88 

def _generate_commit_msg(diff_text: str, *, detailed: bool = False) -> str:
    """Ask the commit model for a commit message describing *diff_text*.

    Args:
        diff_text: Text substituted for the ``{context}`` placeholder of
            the prompt (the change description the model should summarise).
        detailed: When true, request a conventional-commit style message
            with an optional bulleted body; otherwise request a 1-2 line
            message.

    Returns:
        The model's reply after ``_clean_llm_output``, or ``""`` if the
        model call or the clean-up raised an ``Exception``.
    """
    concise_template = (
        "Generate a concise git commit message (1-2 lines) for these changes. "
        "Return ONLY the commit message text, no quotes.\n\n{context}"
    )
    detailed_template = (
        "Generate a nicely markdown formatted, informative git commit message for "
        "these changes. Use conventional commit format with a clear subject "
        "line (type: description) and optionally a body with bullet points "
        "for multiple changes. Return ONLY the commit message text, no "
        "quotes or markdown fences.\n\n{context}"
    )
    template = detailed_template if detailed else concise_template

    # The agent is constructed outside the try block, so a constructor
    # failure propagates to the caller instead of being swallowed.
    generator = KISSAgent("Commit Message Generator")
    try:
        reply = generator.run(
            model_name=_COMMIT_MODEL,
            prompt_template=template,
            arguments={"context": diff_text},
            is_agentic=False,
        )
        # Cleaning stays inside the try so an unexpected reply type also
        # falls back to the empty message.
        return _clean_llm_output(reply)
    except Exception:
        logger.debug("Exception caught", exc_info=True)
        return ""

115 

116 

def _model_vendor_order(name: str) -> int:
    """Map a model name to a vendor rank derived from its prefix.

    Ranks: ``claude-`` -> 0, any prefix in ``_OPENAI_PREFIXES`` -> 1,
    ``gemini-`` -> 2, ``minimax-`` -> 3, ``openrouter/`` -> 4, and
    everything else -> 5. The first matching rule wins.

    Args:
        name: Model name to classify.

    Returns:
        The integer rank, from 0 to 5.
    """
    if name.startswith("claude-"):
        rank = 0
    elif name.startswith(_OPENAI_PREFIXES):
        rank = 1
    elif name.startswith("gemini-"):
        rank = 2
    elif name.startswith("minimax-"):
        rank = 3
    elif name.startswith("openrouter/"):
        rank = 4
    else:
        rank = 5
    return rank

129 

130 

131def run_chatbot( 

132 agent_factory: Callable[[str], RelentlessAgent], 

133 title: str = "KISS Sorcar", 

134 work_dir: str | None = None, 

135 default_model: str = "claude-opus-4-6", 

136 agent_kwargs: dict[str, Any] | None = None, 

137) -> None: 

138 """Run a browser-based chatbot UI for any RelentlessAgent-based agent. 

139 

140 Args: 

141 agent_factory: Callable that takes a name string and returns a RelentlessAgent instance. 

142 title: Title displayed in the browser tab. 

143 work_dir: Working directory for the agent. Defaults to current directory. 

144 default_model: Default LLM model name for the model selector. 

145 agent_kwargs: Additional keyword arguments passed to agent.run(). 

146 """ 

147 import uvicorn 

148 from starlette.applications import Starlette 

149 from starlette.requests import Request 

150 from starlette.responses import HTMLResponse, JSONResponse, StreamingResponse 

151 from starlette.routing import Route 

152 

153 printer = BaseBrowserPrinter() 

154 running = False 

155 running_lock = threading.Lock() 

156 shutting_down = threading.Event() 

157 merging = False 

158 actual_work_dir = work_dir or os.getcwd() 

159 file_cache: list[str] = _scan_files(actual_work_dir) 

160 agent_thread: threading.Thread | None = None 

161 current_stop_event: threading.Event | None = None 

162 proposed_tasks: list[str] = _load_proposals() 

163 proposed_lock = threading.Lock() 

164 selected_model = _load_last_model() or default_model 

165 last = _load_last_model() 

166 selected_model = last if last and last not in _INTERNAL_MODELS else default_model 

167 

168 _init_task_history_md() 

169 

170 cs_proc: subprocess.Popen[bytes] | None = None 

171 code_server_url = "" 

172 wd_hash = hashlib.md5(actual_work_dir.encode()).hexdigest()[:8] 

173 cs_data_dir = str(_KISS_DIR / f"cs-{wd_hash}") 

174 

175 # If another sorcar instance is already running with this data dir, 

176 # use a unique data dir for this instance to avoid collisions 

177 # (e.g., assistant-port overwrite, shared IPC files). 

178 _existing_port_file = Path(cs_data_dir) / "assistant-port" 

179 if _existing_port_file.exists(): 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 try: 

181 _existing_port = int(_existing_port_file.read_text().strip()) 

182 with socket.create_connection( 

183 ("127.0.0.1", _existing_port), timeout=0.5 

184 ): 

185 # Another instance is reachable; isolate this instance. 

186 cs_data_dir = str( 

187 _KISS_DIR / f"cs-{wd_hash}-{os.getpid()}" 

188 ) 

189 except (ConnectionRefusedError, OSError, ValueError): 

190 pass # Port not reachable; safe to reuse this data dir. 

191 

192 # Restore files from any stale merge state (e.g., previous crash during merge) 

193 _restore_merge_files(cs_data_dir, actual_work_dir) 

194 

195 # Read or assign a code-server port for this work directory. 

196 # Use socket.bind directly (not find_free_port) so test patches 

197 # that override find_free_port for the Starlette port don't collide. 

198 cs_port_file = Path(cs_data_dir) / "cs-port" 

199 cs_port_file.parent.mkdir(parents=True, exist_ok=True) 

200 cs_port = 0 

201 if cs_port_file.exists(): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true

202 try: 

203 cs_port = int(cs_port_file.read_text().strip()) 

204 except (ValueError, OSError): 

205 logger.debug("Exception caught", exc_info=True) 

206 if not cs_port: 206 ↛ 214line 206 didn't jump to line 214 because the condition on line 206 was always true

207 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _s: 

208 _s.bind(("", 0)) 

209 cs_port = int(_s.getsockname()[1]) 

210 try: 

211 cs_port_file.write_text(str(cs_port)) 

212 except OSError: 

213 logger.debug("Exception caught", exc_info=True) 

214 cs_url = f"http://127.0.0.1:{cs_port}" 

215 cs_binary = shutil.which("code-server") 

216 

217 def _code_server_launch_args() -> list[str]: 

218 assert cs_binary is not None 

219 return [ 

220 cs_binary, 

221 "--port", 

222 str(cs_port), 

223 "--auth", 

224 "none", 

225 "--bind-addr", 

226 f"127.0.0.1:{cs_port}", 

227 "--disable-telemetry", 

228 "--user-data-dir", 

229 cs_data_dir, 

230 "--extensions-dir", 

231 str(Path(cs_data_dir) / "extensions"), 

232 "--disable-getting-started-override", 

233 "--disable-workspace-trust", 

234 actual_work_dir, 

235 ] 

236 

237 def _watch_code_server() -> None: 

238 """Monitor code-server subprocess and restart it if it crashes.""" 

239 nonlocal cs_proc, code_server_url 

240 while not shutting_down.is_set(): 

241 shutting_down.wait(5.0) 

242 if shutting_down.is_set(): 

243 break 

244 if cs_proc is None: 

245 continue 

246 ret = cs_proc.poll() 

247 if ret is None: 

248 continue 

249 logger.warning( 

250 "code-server exited with code %d, restarting...", ret 

251 ) 

252 try: 

253 from kiss.agents.sorcar.code_server import _MS_GALLERY 

254 

255 cs_env = {**os.environ, "EXTENSIONS_GALLERY": _MS_GALLERY} 

256 cs_proc = subprocess.Popen( 

257 _code_server_launch_args(), 

258 stdout=subprocess.DEVNULL, 

259 stderr=subprocess.DEVNULL, 

260 env=cs_env, 

261 start_new_session=True, 

262 ) 

263 restarted = False 

264 for _ in range(30): 

265 try: 

266 with socket.create_connection( 

267 ("127.0.0.1", cs_port), timeout=0.5 

268 ): 

269 code_server_url = cs_url 

270 restarted = True 

271 break 

272 except (ConnectionRefusedError, OSError): 

273 logger.debug("Exception caught", exc_info=True) 

274 time.sleep(0.5) 

275 if restarted: 

276 logger.info("code-server restarted at %s", code_server_url) 

277 printer.broadcast({"type": "code_server_restarted"}) 

278 else: 

279 logger.warning("code-server failed to restart") 

280 except Exception: 

281 logger.debug("Exception caught", exc_info=True) 

282 if cs_binary: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 ext_changed = _setup_code_server(cs_data_dir) 

284 port_in_use = False 

285 try: 

286 with socket.create_connection(("127.0.0.1", cs_port), timeout=0.5): 

287 port_in_use = True 

288 except (ConnectionRefusedError, OSError): 

289 logger.debug("Exception caught", exc_info=True) 

290 # If our stored port is not in use, verify it's still bindable; 

291 # if another process grabbed it, pick a fresh port. 

292 if not port_in_use: 

293 try: 

294 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _s: 

295 _s.bind(("127.0.0.1", cs_port)) 

296 except OSError: 

297 cs_port = find_free_port() 

298 cs_url = f"http://127.0.0.1:{cs_port}" 

299 try: 

300 cs_port_file.write_text(str(cs_port)) 

301 except OSError: 

302 logger.debug("Exception caught", exc_info=True) 

303 

304 workdir_file = Path(cs_data_dir) / "workdir" 

305 prev_workdir = "" 

306 try: 

307 prev_workdir = workdir_file.read_text().strip() if workdir_file.exists() else "" 

308 except OSError: 

309 logger.debug("Exception caught", exc_info=True) 

310 workdir_changed = prev_workdir != actual_work_dir 

311 

312 need_restart = port_in_use and (ext_changed or workdir_changed) 

313 if need_restart: 

314 reason = "extension updated" if ext_changed else "work directory changed" 

315 printer.print(f"Restarting code-server ({reason})...") 

316 try: 

317 result = subprocess.run( 

318 ["lsof", "-ti", f":{cs_port}", "-sTCP:LISTEN"], 

319 capture_output=True, 

320 text=True, 

321 ) 

322 for pid_str in result.stdout.strip().split("\n"): 

323 if pid_str.strip(): 

324 os.kill(int(pid_str.strip()), 15) 

325 time.sleep(1.5) 

326 except Exception: 

327 logger.debug("Exception caught", exc_info=True) 

328 port_in_use = False 

329 if port_in_use: 

330 code_server_url = cs_url 

331 printer.print(f"Reusing existing code-server at {code_server_url}") 

332 else: 

333 from kiss.agents.sorcar.code_server import _MS_GALLERY 

334 

335 cs_env = {**os.environ, "EXTENSIONS_GALLERY": _MS_GALLERY} 

336 cs_proc = subprocess.Popen( 

337 [ 

338 cs_binary, 

339 "--port", 

340 str(cs_port), 

341 "--auth", 

342 "none", 

343 "--bind-addr", 

344 f"127.0.0.1:{cs_port}", 

345 "--disable-telemetry", 

346 "--user-data-dir", 

347 cs_data_dir, 

348 "--extensions-dir", 

349 str(Path(cs_data_dir) / "extensions"), 

350 "--disable-getting-started-override", 

351 "--disable-workspace-trust", 

352 actual_work_dir, 

353 ], 

354 stdout=subprocess.DEVNULL, 

355 stderr=subprocess.DEVNULL, 

356 env=cs_env, 

357 start_new_session=True, 

358 ) 

359 for _ in range(30): 

360 try: 

361 with socket.create_connection(("127.0.0.1", cs_port), timeout=0.5): 

362 code_server_url = cs_url 

363 break 

364 except (ConnectionRefusedError, OSError): 

365 logger.debug("Exception caught", exc_info=True) 

366 time.sleep(0.5) 

367 if code_server_url: 

368 printer.print(f"code-server running at {code_server_url}") 

369 else: 

370 printer.print("Warning: code-server failed to start") 

371 if code_server_url: 

372 try: 

373 workdir_file.write_text(actual_work_dir) 

374 except OSError: 

375 logger.debug("Exception caught", exc_info=True) 

376 

377 if cs_binary and code_server_url: 377 ↛ 378line 377 didn't jump to line 378 because the condition on line 377 was never true

378 threading.Thread(target=_watch_code_server, daemon=True).start() 

379 

380 html_page = _build_html(title, code_server_url, actual_work_dir) 

381 shutdown_timer: threading.Timer | None = None 

382 shutdown_lock = threading.Lock() 

383 

384 def refresh_file_cache() -> None: 

385 nonlocal file_cache 

386 file_cache = _scan_files(actual_work_dir) 

387 

388 def refresh_proposed_tasks() -> None: 

389 nonlocal proposed_tasks 

390 history = _load_history() 

391 if not history: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true

392 with proposed_lock: 

393 proposed_tasks = [] 

394 printer.broadcast({"type": "proposed_updated"}) 

395 return 

396 task_list = "\n".join(f"- {e['task']}" for e in history[:20]) 

397 agent = KISSAgent("Task Proposer") 

398 try: 

399 result = agent.run( 

400 model_name=_FAST_MODEL, 

401 prompt_template=( 

402 "Based on these past tasks a developer has worked on, suggest 5 new " 

403 "tasks they might want to do next. Tasks should be natural follow-ups, " 

404 "related improvements, or complementary work.\n\n" 

405 "Past tasks:\n{task_list}\n\n" 

406 "Return ONLY a JSON array of 5 short task description strings. " 

407 'Example: ["Add unit tests for X", "Refactor Y module"]' 

408 ), 

409 arguments={"task_list": task_list}, 

410 is_agentic=False, 

411 ) 

412 start = result.index("[") 

413 end = result.index("]", start) + 1 

414 proposals = json.loads(result[start:end]) 

415 proposals = [str(p) for p in proposals if isinstance(p, str) and p.strip()][:5] 

416 except Exception: 

417 logger.debug("Exception caught", exc_info=True) 

418 proposals = [] 

419 with proposed_lock: 

420 proposed_tasks = proposals 

421 _save_proposals(proposals) 

422 printer.broadcast({"type": "proposed_updated"}) 

423 

424 def generate_followup(task: str, result: str) -> None: 

425 try: 

426 agent = KISSAgent("Followup Proposer") 

427 raw = agent.run( 

428 model_name=_FAST_MODEL, 

429 prompt_template=( 

430 "A developer just completed this task:\n" 

431 "Task: {task}\n" 

432 "Result summary: {result}\n\n" 

433 "Suggest ONE short, concrete follow-up task they " 

434 "might want to do next. Return ONLY the task " 

435 "description as a single plain-text sentence." 

436 ), 

437 arguments={ 

438 "task": task, 

439 "result": result[:500], 

440 }, 

441 is_agentic=False, 

442 ) 

443 suggestion = _clean_llm_output(raw) 

444 if suggestion: 444 ↛ exitline 444 didn't return from function 'generate_followup' because the condition on line 444 was always true

445 printer.broadcast( 

446 { 

447 "type": "followup_suggestion", 

448 "text": suggestion, 

449 } 

450 ) 

451 except Exception: 

452 logger.debug("Exception caught", exc_info=True) 

453 

454 def _watch_theme_file() -> None: 

455 theme_file = _KISS_DIR / "vscode-theme.json" 

456 last_mtime = 0.0 

457 try: 

458 if theme_file.exists(): 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true

459 last_mtime = theme_file.stat().st_mtime 

460 except OSError: 

461 logger.debug("Exception caught", exc_info=True) 

462 while not shutting_down.is_set(): 

463 try: 

464 if theme_file.exists(): 

465 mtime = theme_file.stat().st_mtime 

466 if mtime > last_mtime: 

467 last_mtime = mtime 

468 data = json.loads(theme_file.read_text()) 

469 kind = data.get("kind", "dark") 

470 colors = _THEME_PRESETS.get(kind, _THEME_PRESETS["dark"]) 

471 printer.broadcast({"type": "theme_changed", **colors}) 

472 except (OSError, json.JSONDecodeError): 

473 logger.debug("Exception caught", exc_info=True) 

474 shutting_down.wait(1.0) 

475 

476 threading.Thread(target=_watch_theme_file, daemon=True).start() 

477 

478 def _watch_no_clients() -> None: 

479 """Periodically check if all clients have disconnected and schedule shutdown.""" 

480 no_client_since: float | None = None 

481 while not shutting_down.is_set(): 481 ↛ exitline 481 didn't return from function '_watch_no_clients' because the condition on line 481 was always true

482 shutting_down.wait(5.0) 

483 if shutting_down.is_set(): 

484 break 

485 if not printer.has_clients(): 

486 if no_client_since is None: 

487 no_client_since = time.monotonic() 

488 elif time.monotonic() - no_client_since >= 10.0: 

489 _schedule_shutdown() 

490 else: 

491 no_client_since = None 

492 

493 threading.Thread(target=_watch_no_clients, daemon=True).start() 

494 

495 def run_agent_thread( 

496 task: str, 

497 model_name: str, 

498 stop_ev: threading.Event, 

499 attachments: list | None = None, 

500 ) -> None: 

501 nonlocal running, agent_thread, merging 

502 from kiss.core.models.model import Attachment 

503 

504 # Install per-thread stop event so _check_stop() uses this 

505 # thread's own event instead of the shared printer.stop_event. 

506 printer._thread_local.stop_event = stop_ev 

507 current_thread = threading.current_thread() 

508 

509 parsed_attachments: list[Attachment] | None = None 

510 if attachments: 

511 parsed_attachments = [] 

512 for att in attachments: 

513 data = base64.b64decode(att["data"]) 

514 parsed_attachments.append(Attachment(data=data, mime_type=att["mime_type"])) 

515 

516 pre_hunks: dict[str, list[tuple[int, int, int, int]]] = {} 

517 pre_untracked: set[str] = set() 

518 pre_file_hashes: dict[str, str] = {} 

519 result_text = "" 

520 done_event: dict[str, str] | None = None 

521 try: 

522 _add_task(task) 

523 printer.broadcast({"type": "tasks_updated"}) 

524 pre_hunks = _parse_diff_hunks(actual_work_dir) 

525 pre_untracked = _capture_untracked(actual_work_dir) 

526 pre_file_hashes = _snapshot_files( 

527 actual_work_dir, set(pre_hunks.keys()) | pre_untracked 

528 ) 

529 _save_untracked_base( 

530 actual_work_dir, cs_data_dir, pre_untracked | set(pre_hunks.keys()) 

531 ) 

532 active_file = _read_active_file(cs_data_dir) 

533 printer.start_recording() 

534 printer.broadcast({"type": "clear", "active_file": active_file}) 

535 agent = agent_factory("Chatbot") 

536 extra_kwargs = dict(agent_kwargs or {}) 

537 if active_file: 

538 extra_kwargs["current_editor_file"] = active_file 

539 result = agent.run( 

540 prompt_template=task, 

541 work_dir=actual_work_dir, 

542 printer=printer, 

543 model_name=model_name, 

544 attachments=parsed_attachments, 

545 **extra_kwargs, 

546 ) 

547 result_text = result or "" 

548 done_event = {"type": "task_done"} 

549 except (KeyboardInterrupt, _StopRequested): 

550 logger.debug("Exception caught", exc_info=True) 

551 result_text = "(stopped)" 

552 done_event = {"type": "task_stopped"} 

553 except Exception as e: 

554 logger.debug("Exception caught", exc_info=True) 

555 result_text = f"(error: {e})" 

556 done_event = {"type": "task_error", "text": str(e)} 

557 finally: 

558 printer._thread_local.stop_event = None 

559 chat_events = printer.stop_recording() 

560 _append_task_to_md(task, result_text) 

561 with running_lock: 

562 if agent_thread is not current_thread: 

563 # Stopped externally; stop_agent already broadcast 

564 # task_stopped which is captured in chat_events. 

565 _set_latest_chat_events(chat_events, task=task) 

566 return 

567 running = False 

568 agent_thread = None 

569 # Broadcast AFTER setting running=False so clients can 

570 # immediately submit a new task without getting a 409. 

571 if done_event: 571 ↛ 580line 571 didn't jump to line 580 because the condition on line 571 was always true

572 chat_events.append(done_event) 

573 printer.broadcast(done_event) 

574 if done_event.get("type") == "task_done": 

575 threading.Thread( 

576 target=generate_followup, 

577 args=(task, result_text), 

578 daemon=True, 

579 ).start() 

580 _set_latest_chat_events(chat_events, task=task) 

581 try: 

582 merge_result = _prepare_merge_view( 

583 actual_work_dir, 

584 cs_data_dir, 

585 pre_hunks, 

586 pre_untracked, 

587 pre_file_hashes, 

588 ) 

589 if merge_result.get("status") == "opened": 

590 with running_lock: 

591 merging = True 

592 printer.broadcast({"type": "merge_started"}) 

593 except Exception: 

594 logger.debug("Exception caught", exc_info=True) 

595 refresh_file_cache() 

596 try: 

597 refresh_proposed_tasks() 

598 except Exception: 

599 logger.debug("Exception caught", exc_info=True) 

600 

601 def stop_agent() -> bool: 

602 """Kill the current agent thread and reset state for a new task. 

603 

604 Sets the thread's per-thread stop event so the agent stops at 

605 the next printer.print() or token_callback() check. Also injects 

606 _StopRequested via PyThreadState_SetAsyncExc as a fallback. 

607 """ 

608 nonlocal running, agent_thread, current_stop_event 

609 with running_lock: 

610 thread = agent_thread 

611 if thread is None or not thread.is_alive(): 

612 return False 

613 running = False 

614 agent_thread = None 

615 # Set the per-thread stop event so only this thread sees 

616 # the stop signal. New threads get their own fresh event. 

617 stop_ev = current_stop_event 

618 current_stop_event = None 

619 if stop_ev is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true

620 stop_ev.set() 

621 import ctypes 

622 

623 tid = thread.ident 

624 if tid is not None: 624 ↛ 629line 624 didn't jump to line 629 because the condition on line 624 was always true

625 ctypes.pythonapi.PyThreadState_SetAsyncExc( 

626 ctypes.c_ulong(tid), 

627 ctypes.py_object(_StopRequested), 

628 ) 

629 printer.broadcast({"type": "task_stopped"}) 

630 return True 

631 

632 # True when this instance created a PID-specific data dir for isolation. 

633 _is_isolated = cs_data_dir.endswith(f"-{os.getpid()}") 

634 

635 def _cleanup() -> None: 

636 nonlocal merging 

637 with running_lock: 

638 was_merging = merging 

639 merging = False 

640 if was_merging: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true

641 _restore_merge_files(cs_data_dir, actual_work_dir) 

642 stop_agent() 

643 if cs_proc and cs_proc.poll() is None: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true

644 try: 

645 os.killpg(cs_proc.pid, 15) # SIGTERM to process group 

646 except OSError: 

647 cs_proc.terminate() 

648 try: 

649 cs_proc.wait(timeout=5) 

650 except Exception: 

651 logger.debug("Exception caught", exc_info=True) 

652 try: 

653 os.killpg(cs_proc.pid, 9) # SIGKILL to process group 

654 except OSError: 

655 cs_proc.kill() 

656 # Remove PID-specific data dir created for instance isolation. 

657 if _is_isolated: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true

658 try: 

659 shutil.rmtree(cs_data_dir, ignore_errors=True) 

660 except Exception: 

661 logger.debug("Exception caught", exc_info=True) 

662 

663 def _do_shutdown() -> None: 

664 with running_lock: 

665 if running or printer.has_clients(): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 return 

667 shutting_down.set() 

668 _cleanup() 

669 server.should_exit = True 

670 

671 def _cancel_shutdown() -> None: 

672 nonlocal shutdown_timer 

673 with shutdown_lock: 

674 if shutdown_timer is not None: 674 ↛ exitline 674 didn't jump to the function exit

675 shutdown_timer.cancel() 

676 shutdown_timer = None 

677 

678 def _schedule_shutdown() -> None: 

679 nonlocal shutdown_timer 

680 if printer.has_clients(): 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true

681 return 

682 with running_lock: 

683 if running: 683 ↛ 684line 683 didn't jump to line 684 because the condition on line 683 was never true

684 return 

685 with shutdown_lock: 

686 if shutdown_timer is not None: 

687 shutdown_timer.cancel() 

688 shutdown_timer = threading.Timer(10.0, _do_shutdown) 

689 shutdown_timer.daemon = True 

690 shutdown_timer.start() 

691 

692 async def index(request: Request) -> HTMLResponse: 

693 return HTMLResponse(html_page) 

694 

695 async def events(request: Request) -> StreamingResponse: 

696 cq = printer.add_client() 

697 _cancel_shutdown() 

698 

699 async def generate() -> AsyncGenerator[str]: 

700 last_heartbeat = time.monotonic() 

701 disconnect_check_counter = 0 

702 try: 

703 while not shutting_down.is_set(): 703 ↛ 723line 703 didn't jump to line 723 because the condition on line 703 was always true

704 disconnect_check_counter += 1 

705 if disconnect_check_counter >= 20: 

706 disconnect_check_counter = 0 

707 if await request.is_disconnected(): 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true

708 break 

709 try: 

710 event = cq.get_nowait() 

711 except queue.Empty: 

712 now = time.monotonic() 

713 if now - last_heartbeat >= 5.0: 

714 yield ": heartbeat\n\n" 

715 last_heartbeat = now 

716 await asyncio.sleep(0.05) 

717 continue 

718 yield f"data: {json.dumps(event)}\n\n" 

719 last_heartbeat = time.monotonic() 

720 except asyncio.CancelledError: 

721 logger.debug("Exception caught", exc_info=True) 

722 finally: 

723 printer.remove_client(cq) 

724 _schedule_shutdown() 

725 

726 return StreamingResponse( 

727 generate(), 

728 media_type="text/event-stream", 

729 headers={ 

730 "Cache-Control": "no-cache", 

731 "X-Accel-Buffering": "no", 

732 "Connection": "keep-alive", 

733 }, 

734 ) 

735 

736 async def run_task(request: Request) -> JSONResponse: 

737 nonlocal running, agent_thread, selected_model, current_stop_event 

738 body = await request.json() 

739 task = body.get("task", "").strip() 

740 model = body.get("model", "").strip() or selected_model 

741 attachments = body.get("attachments") 

742 selected_model = model 

743 if not task: 

744 return JSONResponse({"error": "Empty task"}, status_code=400) 

745 _record_model_usage(model) 

746 if model not in _INTERNAL_MODELS: 

747 _record_model_usage(model) 

748 stop_ev = threading.Event() 

749 t = threading.Thread( 

750 target=run_agent_thread, 

751 args=(task, model, stop_ev, attachments), 

752 daemon=True, 

753 ) 

754 with running_lock: 

755 if merging: 

756 return JSONResponse( 

757 {"error": "Resolve all diffs in the merge view first"}, 

758 status_code=409, 

759 ) 

760 if running: 

761 return JSONResponse({"error": "Agent is already running"}, status_code=409) 

762 current_stop_event = stop_ev 

763 running = True 

764 agent_thread = t 

765 t.start() 

766 return JSONResponse({"status": "started"}) 

767 

768 async def run_selection(request: Request) -> JSONResponse: 

769 """Run the agent on text selected in the VS Code editor. 

770 

771 Broadcasts an ``external_run`` event so the chatbox UI enters 

772 running state and displays the selected text as a user message, 

773 then starts the agent thread with the selected text as the task. 

774 """ 

775 nonlocal running, agent_thread, current_stop_event 

776 body = await request.json() 

777 text = body.get("text", "").strip() 

778 if not text: 

779 return JSONResponse({"error": "No text selected"}, status_code=400) 

780 model = selected_model 

781 _record_model_usage(model) 

782 if model not in _INTERNAL_MODELS: 

783 _record_model_usage(model) 

784 stop_ev = threading.Event() 

785 t = threading.Thread( 

786 target=run_agent_thread, 

787 args=(text, model, stop_ev, None), 

788 daemon=True, 

789 ) 

790 with running_lock: 

791 if merging: 

792 return JSONResponse( 

793 {"error": "Resolve all diffs in the merge view first"}, 

794 status_code=409, 

795 ) 

796 if running: 

797 return JSONResponse({"error": "Agent is already running"}, status_code=409) 

798 current_stop_event = stop_ev 

799 running = True 

800 agent_thread = t 

801 # Broadcast AFTER lock confirms task will start, so clients 

802 # never see external_run for a task that returns 409. 

803 printer.broadcast({"type": "external_run", "text": text}) 

804 t.start() 

805 return JSONResponse({"status": "started"}) 

806 

807 async def stop_task(request: Request) -> JSONResponse: 

808 if stop_agent(): 

809 return JSONResponse({"status": "stopping"}) 

810 return JSONResponse({"error": "No running task"}, status_code=404) 

811 

812 async def suggestions(request: Request) -> JSONResponse: 

813 query = request.query_params.get("q", "").strip() 

814 mode = request.query_params.get("mode", "general") 

815 if mode == "files": 

816 q = query.lower() 

817 usage = _load_file_usage() 

818 frequent: list[dict[str, str]] = [] 

819 rest: list[dict[str, str]] = [] 

820 for path in file_cache: 

821 if not q or q in path.lower(): 

822 ptype = "dir" if path.endswith("/") else "file" 

823 item = {"type": ptype, "text": path} 

824 if usage.get(path, 0) > 0: 

825 frequent.append(item) 

826 else: 

827 rest.append(item) 

828 frequent.sort(key=lambda m: (m["type"] != "file", -usage.get(m["text"], 0))) 

829 rest.sort(key=lambda m: m["type"] != "file") 

830 for f in frequent: 

831 f["type"] = "frequent_" + f["type"] 

832 return JSONResponse((frequent + rest)[:20]) 

833 if not query: 

834 return JSONResponse([]) 

835 q_lower = query.lower() 

836 results = [] 

837 for entry in _load_history(): 

838 task = str(entry["task"]) 

839 if q_lower in task.lower(): 

840 results.append({"type": "task", "text": task}) 

841 if len(results) >= 5: 

842 break 

843 with proposed_lock: 

844 for t in proposed_tasks: 

845 if q_lower in t.lower(): 

846 results.append({"type": "suggested", "text": t}) 

847 words = query.split() 

848 last_word = words[-1].lower() if words else q_lower 

849 if last_word and len(last_word) >= 2: 

850 count = 0 

851 for path in file_cache: 

852 if last_word in path.lower(): 

853 results.append({"type": "file", "text": path}) 

854 count += 1 

855 if count >= 8: 

856 break 

857 return JSONResponse(results) 

858 

859 async def tasks(request: Request) -> JSONResponse: 

860 history = _load_history() 

861 return JSONResponse( 

862 [ 

863 {"task": e["task"], "has_events": bool(e.get("chat_events"))} 

864 for e in history 

865 ] 

866 ) 

867 

868 async def task_events(request: Request) -> JSONResponse: 

869 """Return chat events for a specific task by index.""" 

870 try: 

871 idx = int(request.query_params.get("idx", "0")) 

872 except (ValueError, TypeError): 

873 return JSONResponse({"error": "Invalid index"}, status_code=400) 

874 history = _load_history() 

875 if idx < 0 or idx >= len(history): 

876 return JSONResponse({"error": "Index out of range"}, status_code=404) 

877 events: list[dict[str, object]] = history[idx].get("chat_events", []) # type: ignore[assignment] 

878 return JSONResponse(events) 

879 

async def proposed_tasks_endpoint(request: Request) -> JSONResponse:
    """Return the current proposed tasks, or the first five sample tasks if there are none."""
    # Copy under the lock so the response is built from a stable snapshot.
    with proposed_lock:
        snapshot = list(proposed_tasks)
    if snapshot:
        return JSONResponse(snapshot)
    fallback = [str(sample["task"]) for sample in SAMPLE_TASKS[:5]]
    return JSONResponse(fallback)

886 

def _fast_complete(raw_query: str, query: str) -> str:
    """Return a completion suffix found without calling a model, or "".

    First looks for a history task that starts (case-insensitively) with
    ``query`` and is longer than it; the part after the typed prefix is
    returned.  Otherwise the last whitespace-separated word of
    ``raw_query`` (if at least two characters long) is matched the same way
    against the paths in ``file_cache``.
    """
    needle = query.lower()
    typed_len = len(query)
    for entry in _load_history():
        task = str(entry.get("task", ""))
        if len(task) > typed_len and task.lower().startswith(needle):
            return task[typed_len:]

    tokens = raw_query.split()
    if not tokens:
        return ""
    tail = tokens[-1]
    if len(tail) < 2:
        return ""
    tail_lower = tail.lower()
    for path in file_cache:
        if len(path) > len(tail) and path.lower().startswith(tail_lower):
            return path[len(tail):]
    return ""

901 

async def complete(request: Request) -> JSONResponse:
    """Return an inline completion for the partial input in the ``q`` query parameter.

    The response is always ``{"suggestion": <str>}``.  Inputs shorter than
    two characters after stripping get an empty suggestion.
    ``_fast_complete`` is tried first; only when it returns nothing is the
    fast model queried, on a worker thread via ``asyncio.to_thread``.
    """
    raw_query = request.query_params.get("q", "")
    query = raw_query.strip()
    # len() < 2 already covers the empty string.
    if len(query) < 2:
        return JSONResponse({"suggestion": ""})

    fast = _fast_complete(raw_query, query)
    if fast:
        return JSONResponse({"suggestion": fast})

    def _generate() -> str:
        """Ask the fast model for the remaining text; return "" if the call fails."""
        history = _load_history()
        task_list = "\n".join(f"- {e['task']}" for e in history[:20])
        agent = KISSAgent("Autocomplete")
        try:
            result = agent.run(
                model_name=_FAST_MODEL,
                prompt_template=(
                    "You are an inline autocomplete engine for a coding assistant. "
                    "Given the user's partial input and their past task history, "
                    "predict what they want to type and return ONLY the remaining "
                    "text to complete their input. Do NOT repeat the text they already typed. "
                    # Trailing space added: the literals are concatenated, and
                    # the prompt previously read "natural.If no good ...".
                    "Keep the completion concise and natural. "
                    "If no good completion, return empty string.\n\n"
                    "Past tasks:\n{task_list}\n\n"
                    'Partial input: "{query}"\n\n'
                ),
                arguments={"task_list": task_list, "query": query},
                is_agentic=False,
            )
            s = _clean_llm_output(result)
            # Only the remainder is wanted, so drop the typed prefix if the
            # output starts with it.
            if s.lower().startswith(query.lower()):
                s = s[len(query):]
            return s
        except Exception:
            logger.debug("Exception caught", exc_info=True)
            return ""

    suggestion = await asyncio.to_thread(_generate)
    return JSONResponse({"suggestion": suggestion})

942 

async def models_endpoint(request: Request) -> JSONResponse:
    """List the available function-calling models plus the currently selected one.

    Each model entry carries its name, input/output price per 1M tokens and
    recorded use count.  Entries are ordered by ``_model_vendor_order`` and,
    within the same order value, by descending combined price.
    """
    usage = _load_model_usage()
    candidates = ((name, MODEL_INFO.get(name)) for name in get_available_models())
    models_list: list[dict[str, Any]] = [
        {
            "name": name,
            "inp": info.input_price_per_1M,
            "out": info.output_price_per_1M,
            "uses": usage.get(name, 0),
        }
        for name, info in candidates
        if info and info.is_function_calling_supported
    ]

    def _rank(model: dict[str, Any]) -> tuple[Any, float]:
        combined_price = float(model["inp"]) + float(model["out"])
        return (_model_vendor_order(str(model["name"])), -combined_price)

    models_list.sort(key=_rank)
    return JSONResponse({"models": models_list, "selected": selected_model})

964 

async def closing(request: Request) -> JSONResponse:
    """React to the browser tab/window closing by scheduling a shutdown."""
    _schedule_shutdown()
    ack = {"status": "ok"}
    return JSONResponse(ack)

969 

async def focus_chatbox(request: Request) -> JSONResponse:
    """Broadcast a ``focus_chatbox`` event through the printer and acknowledge."""
    event = {"type": "focus_chatbox"}
    printer.broadcast(event)
    return JSONResponse({"status": "ok"})

973 

async def focus_editor(request: Request) -> JSONResponse:
    """Write ``pending-focus-editor.json`` (``{"focus": true}``) into ``cs_data_dir``.

    NOTE(review): presumably the editor side polls for this file - confirm.
    """
    marker_path = os.path.join(cs_data_dir, "pending-focus-editor.json")
    with open(marker_path, "w") as marker:
        marker.write(json.dumps({"focus": True}))
    return JSONResponse({"status": "ok"})

979 

async def theme(request: Request) -> JSONResponse:
    """Return the theme preset named by the ``kind`` field of ``vscode-theme.json``.

    The file is looked up under ``_KISS_DIR``.  The ``"dark"`` preset is
    returned when the file is missing, unreadable, not valid JSON, not a
    JSON object, when ``kind`` is not a string, or when ``kind`` names no
    known preset.
    """
    theme_file = _KISS_DIR / "vscode-theme.json"
    kind = "dark"
    if theme_file.exists():
        try:
            data = json.loads(theme_file.read_text())
        except (ValueError, OSError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for an undecodable file.
            logger.debug("Exception caught", exc_info=True)
        else:
            # Valid JSON need not be an object (it may be a list, number, ...),
            # and "kind" need not be a string; neither may crash the endpoint.
            if isinstance(data, dict):
                stored_kind = data.get("kind", "dark")
                if isinstance(stored_kind, str):
                    kind = stored_kind
    return JSONResponse(_THEME_PRESETS.get(kind, _THEME_PRESETS["dark"]))

990 

async def open_file(request: Request) -> JSONResponse:
    """Request that a file be opened by writing ``pending-open.json`` into ``cs_data_dir``.

    The JSON body's ``path`` may start with ``/`` (used as is) or is taken
    relative to ``actual_work_dir``.  Responds 400 when no path is given
    and 404 when the resolved path is not an existing file.
    """
    payload = await request.json()
    rel = payload.get("path", "").strip()
    if not rel:
        return JSONResponse({"error": "No path"}, status_code=400)

    if rel.startswith("/"):
        full = rel
    else:
        full = os.path.join(actual_work_dir, rel)
    if not os.path.isfile(full):
        return JSONResponse({"error": "File not found"}, status_code=404)

    target = os.path.join(cs_data_dir, "pending-open.json")
    with open(target, "w") as out:
        json.dump({"path": full}, out)
    return JSONResponse({"status": "ok"})

1003 

1004 async def merge_action(request: Request) -> JSONResponse: 

1005 nonlocal merging 

1006 body = await request.json() 

1007 action = body.get("action", "") 

1008 if action == "all-done": 

1009 with running_lock: 

1010 merging = False 

1011 printer.broadcast({"type": "merge_ended"}) 

1012 _cleanup_merge_data(cs_data_dir) 

1013 return JSONResponse({"status": "ok"}) 

1014 if action not in ("prev", "next", "accept-all", "reject-all", "accept", "reject"): 

1015 return JSONResponse({"error": "Invalid action"}, status_code=400) 

1016 pending = os.path.join(cs_data_dir, "pending-action.json") 

1017 with open(pending, "w") as f: 

1018 json.dump({"action": action}, f) 

1019 return JSONResponse({"status": "ok"}) 

1020 

async def _thread_json_response(
    fn: Callable[[], dict[str, str]],
    error_status: int = 400,
) -> JSONResponse:
    """Run ``fn`` on a worker thread and wrap the dict it returns in a JSON response.

    When the dict contains an ``"error"`` key the response uses
    ``error_status``; otherwise the default status is used.
    """
    payload = await asyncio.to_thread(fn)
    if "error" not in payload:
        return JSONResponse(payload)
    return JSONResponse(payload, status_code=error_status)

async def commit(request: Request) -> JSONResponse:
    """Stage every change in ``actual_work_dir`` and commit it with a generated message.

    The git work runs on a worker thread.  On success the response is
    ``{"status": "ok", "message": <commit message>}``; any failure is
    reported as ``{"error": <text>}`` with the helper's error status.
    """

    def _do_commit() -> dict[str, str]:
        """Run add / diff / commit and return a status dict or an error dict."""
        try:
            staged = subprocess.run(
                ["git", "add", "-A"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            # A failed staging step (e.g. not a git repository) must be
            # reported as such rather than fall through to the misleading
            # "No changes to commit" below.
            if staged.returncode != 0:
                return {"error": staged.stderr.strip() or "git add failed"}
            diff_stat = subprocess.run(
                ["git", "diff", "--cached", "--stat"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            if not diff_stat.stdout.strip():
                return {"error": "No changes to commit"}
            diff_detail = subprocess.run(
                ["git", "diff", "--cached"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            message = _generate_commit_msg(diff_detail.stdout)
            commit_env = {
                **os.environ,
                "GIT_COMMITTER_NAME": "KISS Sorcar",
                "GIT_COMMITTER_EMAIL": "ksen@berkeley.edu",
            }
            result = subprocess.run(
                ["git", "commit", "-m", message, "--author=KISS Sorcar <ksen@berkeley.edu>"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
                env=commit_env,
            )
            if result.returncode != 0:
                # git may explain a failed commit on stdout only; never hand
                # the caller an empty error string.
                failure = result.stderr.strip() or result.stdout.strip() or "Commit failed"
                return {"error": failure}
            return {"status": "ok", "message": message}
        except Exception as e:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(e)}

    return await _thread_json_response(_do_commit)

1068 

async def push(request: Request) -> JSONResponse:
    """Run ``git push`` in ``actual_work_dir`` on a worker thread and report the outcome."""

    def _do_push() -> dict[str, str]:
        """Return ``{"status": "ok"}`` on exit code 0, else an ``error`` dict."""
        try:
            outcome = subprocess.run(
                ["git", "push"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            if outcome.returncode == 0:
                return {"status": "ok"}
            return {"error": outcome.stderr.strip() or "Push failed"}
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}

    return await _thread_json_response(_do_push)

1086 

1087 

async def record_file_usage_endpoint(
    request: Request,
) -> JSONResponse:
    """Record a use of the file named in the JSON body's ``path``; blank paths are ignored."""
    payload = await request.json()
    used_path = payload.get("path", "").strip()
    if used_path:
        _record_file_usage(used_path)
    return JSONResponse({"status": "ok"})

1096 

async def generate_commit_message(request: Request) -> JSONResponse:
    """Generate a commit message from the current changes and store it for the SCM input.

    The message is built from the unstaged and staged diffs plus the list
    of untracked files, written to ``pending-scm-message.json`` in
    ``cs_data_dir`` and returned as ``{"message": ...}``.
    """

    def _generate() -> dict[str, str]:
        """Collect the change context, generate the message and persist it."""
        try:
            diff_commands = (["git", "diff"], ["git", "diff", "--cached"])
            outputs = [
                subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    cwd=actual_work_dir,
                ).stdout
                for cmd in diff_commands
            ]
            diff_text = "".join(outputs).strip()
            untracked_files = "\n".join(sorted(_capture_untracked(actual_work_dir)))
            if not (diff_text or untracked_files):
                return {"error": "No changes detected"}

            # Both sections are truncated to keep the prompt context bounded.
            sections = []
            if diff_text:
                sections.append(f"Diff:\n{diff_text[:4000]}")
            if untracked_files:
                sections.append(f"New untracked files:\n{untracked_files[:500]}")
            msg = _generate_commit_msg("\n\n".join(sections), detailed=True)

            target = os.path.join(cs_data_dir, "pending-scm-message.json")
            with open(target, "w") as out:
                json.dump({"message": msg}, out)
            return {"message": msg}
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}

    return await _thread_json_response(_generate)

1133 

async def active_file_info(request: Request) -> JSONResponse:
    """Report whether the active editor file is a runnable prompt.

    Only paths ending in ``.md`` (case-insensitive) are analyzed with
    ``PromptDetector``; anything else is reported as not a prompt.
    """
    fpath = _read_active_file(cs_data_dir)
    is_markdown = bool(fpath) and fpath.lower().endswith(".md")
    if not is_markdown:
        return JSONResponse({"is_prompt": False, "path": fpath})

    from kiss.agents.sorcar.prompt_detector import PromptDetector

    is_prompt, _score, _reasons = PromptDetector().analyze(fpath)
    payload = {
        "is_prompt": is_prompt,
        "path": fpath,
        "filename": os.path.basename(fpath),
    }
    return JSONResponse(payload)

1150 

async def get_file_content(request: Request) -> JSONResponse:
    """Return the UTF-8 text of the file named by the ``path`` query parameter.

    Responds 404 when the path is blank or not an existing file, and 500
    with the exception text when reading fails.
    """
    fpath = request.query_params.get("path", "").strip()
    exists = bool(fpath) and os.path.isfile(fpath)
    if not exists:
        return JSONResponse({"error": "File not found"}, status_code=404)
    try:
        with open(fpath, encoding="utf-8") as handle:
            text = handle.read()
        return JSONResponse({"content": text})
    except Exception as exc:
        logger.debug("Exception caught", exc_info=True)
        return JSONResponse({"error": str(exc)}, status_code=500)

1163 

async def generate_config_message(request: Request) -> JSONResponse:
    """Have the fast model write a short status message about the current setup.

    The JSON body may name a ``model``; otherwise ``selected_model`` is
    reported.  Failures of the model call are returned as ``{"error": ...}``
    with status 500.
    """
    payload = await request.json()
    model = payload.get("model", selected_model)

    def _generate() -> dict[str, str]:
        """Assemble the configuration summary and ask the model to phrase it."""
        cfg = config_module.DEFAULT_CONFIG
        agent_cfg = cfg.sorcar.sorcar_agent
        history = _load_history()
        code_server_state = "running" if code_server_url else "not available"
        info_parts = [
            f"Work directory: {actual_work_dir}",
            f"Selected model: {model}",
            f"Max steps: {agent_cfg.max_steps}",
            f"Max budget: ${agent_cfg.max_budget:.2f}",
            f"Global max budget: ${cfg.agent.global_max_budget:.2f}",
            f"Headless browser: {agent_cfg.headless}",
            f"Code-server: {code_server_state}",
            f"Tasks completed: {len(history)}",
        ]
        recent = [str(entry["task"]) for entry in history[:5]]
        if recent:
            info_parts.append("Recent tasks: " + "; ".join(recent))
        config_info = "\n".join(info_parts)

        agent = KISSAgent("Config Message Generator")
        try:
            result = agent.run(
                model_name=_FAST_MODEL,
                prompt_template=(
                    "You are a helpful assistant. Given the following configuration "
                    "information about a coding assistant environment, generate a "
                    "short, nicely formatted, informative status message that a "
                    "developer would find useful. Include key details like the "
                    "model, work directory, budget, and recent activity. "
                    "Keep it concise (3-5 lines) and use emoji for visual appeal. "
                    "Return ONLY the message text, no quotes or markdown fences.\n\n"
                    "Configuration:\n{config_info}"
                ),
                arguments={"config_info": config_info},
                is_agentic=False,
            )
            return {"message": result.strip()}
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}

    return await _thread_json_response(_generate, error_status=500)

1209 

1210 app = Starlette( 

1211 routes=[ 

1212 Route("/", index), 

1213 Route("/events", events), 

1214 Route("/run", run_task, methods=["POST"]), 

1215 Route("/run-selection", run_selection, methods=["POST"]), 

1216 Route("/stop", stop_task, methods=["POST"]), 

1217 Route("/open-file", open_file, methods=["POST"]), 

1218 Route("/closing", closing, methods=["POST"]), 

1219 Route("/focus-chatbox", focus_chatbox, methods=["POST"]), 

1220 Route("/focus-editor", focus_editor, methods=["POST"]), 

1221 Route("/commit", commit, methods=["POST"]), 

1222 Route("/push", push, methods=["POST"]), 

1223 Route("/merge-action", merge_action, methods=["POST"]), 

1224 Route("/record-file-usage", record_file_usage_endpoint, methods=["POST"]), 

1225 Route("/generate-commit-message", generate_commit_message, methods=["POST"]), 

1226 Route("/generate-config-message", generate_config_message, methods=["POST"]), 

1227 Route("/active-file-info", active_file_info), 

1228 Route("/get-file-content", get_file_content), 

1229 Route("/suggestions", suggestions), 

1230 Route("/complete", complete), 

1231 Route("/tasks", tasks), 

1232 Route("/task-events", task_events), 

1233 Route("/proposed_tasks", proposed_tasks_endpoint), 

1234 Route("/models", models_endpoint), 

1235 Route("/theme", theme), 

1236 ] 

1237 ) 

1238 

1239 threading.Thread(target=refresh_proposed_tasks, daemon=True).start() 

1240 

1241 import atexit 

1242 

1243 atexit.register(_cleanup) 

1244 

1245 port = find_free_port() 

1246 try: 

1247 Path(cs_data_dir).mkdir(parents=True, exist_ok=True) 

1248 (Path(cs_data_dir) / "assistant-port").write_text(str(port)) 

1249 except OSError: 

1250 logger.debug("Exception caught", exc_info=True) 

1251 url = f"http://127.0.0.1:{port}" 

1252 print(f"{title} running at {url}", flush=True) 

1253 print(f"Work directory: {actual_work_dir}", flush=True) 

1254 printer.print(f"{title} running at {url}") 

1255 printer.print(f"Work directory: {actual_work_dir}") 

1256 

def _open_browser() -> None:
    """Open ``url`` in the default browser after a two-second delay.

    A false return from ``webbrowser.open`` is only logged.  If the call
    raises, the failure is logged and, on macOS, the ``open`` command is
    tried as a fallback.
    """
    time.sleep(2)
    try:
        opened = webbrowser.open(url)
        if not opened:
            logger.warning("webbrowser.open() returned False for %s", url)
    except Exception:
        logger.warning("Failed to open browser", exc_info=True)
        if sys.platform != "darwin":
            return
        try:
            subprocess.Popen(
                ["open", url],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception:
            logger.warning("Fallback 'open' command also failed", exc_info=True)

1273 

1274 threading.Thread(target=_open_browser, daemon=True).start() 

1275 logging.getLogger("uvicorn.error").setLevel(logging.CRITICAL) 

1276 config = uvicorn.Config( 

1277 app, 

1278 host="127.0.0.1", 

1279 port=port, 

1280 log_level="warning", 

1281 timeout_graceful_shutdown=1, 

1282 ) 

1283 server = uvicorn.Server(config) 

1284 _orig_handle_exit = server.handle_exit 

1285 

def _on_exit(sig: int, frame: types.FrameType | None) -> None:
    """Set the ``shutting_down`` event, then delegate to the server's original exit handler."""
    # The event is set before delegating so it is already visible when the
    # original handler runs.
    shutting_down.set()
    _orig_handle_exit(sig, frame)

1289 

1290 server.handle_exit = _on_exit # type: ignore[method-assign] 

1291 try: 

1292 server.run() 

1293 except KeyboardInterrupt: 

1294 logger.debug("Exception caught", exc_info=True) 

1295 _cleanup() 

1296 

1297 

def main() -> None:
    """Parse the command line and start the chatbot UI.

    The ``KISS_MODE`` environment variable (default ``"assistant"``)
    selects the window title and whether the agent is created with
    ``headless`` set: it is headless in every mode except assistant mode.
    """
    import argparse

    from kiss._version import __version__
    from kiss.agents.sorcar.sorcar_agent import SorcarAgent

    parser = argparse.ArgumentParser(description="KISS Assistant")
    parser.add_argument(
        "work_dir",
        nargs="?",
        default=os.getcwd(),
        help="Working directory for the agent",
    )
    parser.add_argument(
        "--model_name",
        default="claude-opus-4-6",
        help="Default LLM model name",
    )
    args = parser.parse_args()
    work_dir = str(Path(args.work_dir).resolve())

    mode = os.environ.get("KISS_MODE", "assistant").lower()
    is_assistant = mode == "assistant"
    label = "Assistant" if is_assistant else "Coding Assistant"
    run_chatbot(
        agent_factory=SorcarAgent,
        title=f"KISS {label}: {__version__}",
        work_dir=work_dir,
        default_model=args.model_name,
        agent_kwargs={"headless": not is_assistant},
    )

1328 

1329 

1330if __name__ == "__main__": 

1331 main()