Coverage for src / kiss / agents / sorcar / sorcar.py: 76%
814 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 22:57 -0700
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 22:57 -0700
1"""Browser-based chatbot for RelentlessAgent-based agents."""
3from __future__ import annotations
5import asyncio
6import base64
7import hashlib
8import json
9import logging
10import os
11import queue
12import shutil
13import socket
14import subprocess
15import sys
16import threading
17import time
18import types
19import webbrowser
20from collections.abc import AsyncGenerator, Callable
21from pathlib import Path
22from typing import Any
24from kiss.agents.sorcar.browser_ui import BaseBrowserPrinter, find_free_port
25from kiss.agents.sorcar.chatbot_ui import _THEME_PRESETS, _build_html
26from kiss.agents.sorcar.code_server import (
27 _capture_untracked,
28 _cleanup_merge_data,
29 _parse_diff_hunks,
30 _prepare_merge_view,
31 _restore_merge_files,
32 _save_untracked_base,
33 _scan_files,
34 _setup_code_server,
35 _snapshot_files,
36)
37from kiss.agents.sorcar.task_history import (
38 _KISS_DIR,
39 SAMPLE_TASKS,
40 _add_task,
41 _append_task_to_md,
42 _init_task_history_md,
43 _load_file_usage,
44 _load_history,
45 _load_last_model,
46 _load_model_usage,
47 _load_proposals,
48 _record_file_usage,
49 _record_model_usage,
50 _save_proposals,
51 _set_latest_chat_events,
52)
53from kiss.core import config as config_module
54from kiss.core.kiss_agent import KISSAgent
55from kiss.core.models.model_info import (
56 _OPENAI_PREFIXES,
57 MODEL_INFO,
58 get_available_models,
59)
60from kiss.core.relentless_agent import RelentlessAgent
# Module-level logger; most handlers in this file log swallowed exceptions
# at DEBUG level with the traceback attached.
logger = logging.getLogger(__name__)

# Model used for the helper prompts in this module (task proposals,
# follow-up suggestions, inline autocomplete).
_FAST_MODEL = "gemini-2.0-flash"
# Model used by _generate_commit_msg.
_COMMIT_MODEL = "gemini-2.0-flash"
# Helper-only models. This set is consulted when restoring the last-used
# model at startup and by the guarded model-usage recording calls.
_INTERNAL_MODELS = frozenset({_FAST_MODEL, _COMMIT_MODEL})
class _StopRequested(BaseException):
    """Signal raised inside the agent thread to abort the running task.

    Derives from ``BaseException`` rather than ``Exception`` so that
    ``except Exception`` handlers do not swallow it.
    """
def _read_active_file(cs_data_dir: str) -> str:
    """Return the path of the file recorded as active in the editor.

    Reads ``active-file.json`` inside *cs_data_dir* and returns its
    ``"path"`` entry when that entry is a string naming an existing
    regular file.

    Args:
        cs_data_dir: Directory that contains ``active-file.json``.

    Returns:
        The recorded path, or ``""`` when the state file is missing,
        unreadable, not valid JSON, not a JSON object, or when its
        ``"path"`` entry is not a string naming an existing file.
    """
    af_path = os.path.join(cs_data_dir, "active-file.json")
    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(af_path, encoding="utf-8") as af:
            payload = json.load(af)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logger.debug("Exception caught", exc_info=True)
        return ""
    # A syntactically valid file may still hold a list, number, etc.
    if not isinstance(payload, dict):
        return ""
    path = payload.get("path", "")
    # os.path.isfile() treats ints as file descriptors and raises
    # TypeError on other non-path types, so insist on a string first.
    if isinstance(path, str) and path and os.path.isfile(path):
        return path
    return ""
def _clean_llm_output(text: str) -> str:
    """Strip outer whitespace, then outer double quotes, then outer single quotes."""
    trimmed = text.strip()
    without_double_quotes = trimmed.strip('"')
    return without_double_quotes.strip("'")
def _generate_commit_msg(diff_text: str, *, detailed: bool = False) -> str:
    """Ask the commit model for a commit message describing *diff_text*.

    Args:
        diff_text: Text substituted for ``{context}`` in the prompt.
        detailed: When true, request a conventional-commit style message
            with an optional bulleted body; otherwise request a 1-2 line
            message.

    Returns:
        The cleaned model output, or ``""`` if the model call raised.
    """
    concise_prompt = (
        "Generate a concise git commit message (1-2 lines) for these changes. "
        "Return ONLY the commit message text, no quotes.\n\n{context}"
    )
    detailed_prompt = (
        "Generate a nicely markdown formatted, informative git commit message for "
        "these changes. Use conventional commit format with a clear subject "
        "line (type: description) and optionally a body with bullet points "
        "for multiple changes. Return ONLY the commit message text, no "
        "quotes or markdown fences.\n\n{context}"
    )
    generator = KISSAgent("Commit Message Generator")
    try:
        return _clean_llm_output(
            generator.run(
                model_name=_COMMIT_MODEL,
                prompt_template=detailed_prompt if detailed else concise_prompt,
                arguments={"context": diff_text},
                is_agentic=False,
            )
        )
    except Exception:
        # Best effort: callers receive an empty message on any failure.
        logger.debug("Exception caught", exc_info=True)
        return ""
def _model_vendor_order(name: str) -> int:
    """Return the vendor sort rank for a model name, based on its prefix.

    Ranks: ``claude-`` 0, any OpenAI prefix 1, ``gemini-`` 2,
    ``minimax-`` 3, ``openrouter/`` 4, anything else 5.
    """
    if name.startswith("claude-"):
        rank = 0
    elif name.startswith(_OPENAI_PREFIXES):
        rank = 1
    elif name.startswith("gemini-"):
        rank = 2
    elif name.startswith("minimax-"):
        rank = 3
    elif name.startswith("openrouter/"):
        rank = 4
    else:
        rank = 5
    return rank
131def run_chatbot(
132 agent_factory: Callable[[str], RelentlessAgent],
133 title: str = "KISS Sorcar",
134 work_dir: str | None = None,
135 default_model: str = "claude-opus-4-6",
136 agent_kwargs: dict[str, Any] | None = None,
137) -> None:
138 """Run a browser-based chatbot UI for any RelentlessAgent-based agent.
140 Args:
141 agent_factory: Callable that takes a name string and returns a RelentlessAgent instance.
142 title: Title displayed in the browser tab.
143 work_dir: Working directory for the agent. Defaults to current directory.
144 default_model: Default LLM model name for the model selector.
145 agent_kwargs: Additional keyword arguments passed to agent.run().
146 """
147 import uvicorn
148 from starlette.applications import Starlette
149 from starlette.requests import Request
150 from starlette.responses import HTMLResponse, JSONResponse, StreamingResponse
151 from starlette.routing import Route
153 printer = BaseBrowserPrinter()
154 running = False
155 running_lock = threading.Lock()
156 shutting_down = threading.Event()
157 merging = False
158 actual_work_dir = work_dir or os.getcwd()
159 file_cache: list[str] = _scan_files(actual_work_dir)
160 agent_thread: threading.Thread | None = None
161 current_stop_event: threading.Event | None = None
162 proposed_tasks: list[str] = _load_proposals()
163 proposed_lock = threading.Lock()
164 selected_model = _load_last_model() or default_model
165 last = _load_last_model()
166 selected_model = last if last and last not in _INTERNAL_MODELS else default_model
168 _init_task_history_md()
170 cs_proc: subprocess.Popen[bytes] | None = None
171 code_server_url = ""
172 wd_hash = hashlib.md5(actual_work_dir.encode()).hexdigest()[:8]
173 cs_data_dir = str(_KISS_DIR / f"cs-{wd_hash}")
175 # If another sorcar instance is already running with this data dir,
176 # use a unique data dir for this instance to avoid collisions
177 # (e.g., assistant-port overwrite, shared IPC files).
178 _existing_port_file = Path(cs_data_dir) / "assistant-port"
179 if _existing_port_file.exists(): 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 try:
181 _existing_port = int(_existing_port_file.read_text().strip())
182 with socket.create_connection(
183 ("127.0.0.1", _existing_port), timeout=0.5
184 ):
185 # Another instance is reachable; isolate this instance.
186 cs_data_dir = str(
187 _KISS_DIR / f"cs-{wd_hash}-{os.getpid()}"
188 )
189 except (ConnectionRefusedError, OSError, ValueError):
190 pass # Port not reachable; safe to reuse this data dir.
192 # Restore files from any stale merge state (e.g., previous crash during merge)
193 _restore_merge_files(cs_data_dir, actual_work_dir)
195 # Read or assign a code-server port for this work directory.
196 # Use socket.bind directly (not find_free_port) so test patches
197 # that override find_free_port for the Starlette port don't collide.
198 cs_port_file = Path(cs_data_dir) / "cs-port"
199 cs_port_file.parent.mkdir(parents=True, exist_ok=True)
200 cs_port = 0
201 if cs_port_file.exists(): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 try:
203 cs_port = int(cs_port_file.read_text().strip())
204 except (ValueError, OSError):
205 logger.debug("Exception caught", exc_info=True)
206 if not cs_port: 206 ↛ 214line 206 didn't jump to line 214 because the condition on line 206 was always true
207 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _s:
208 _s.bind(("", 0))
209 cs_port = int(_s.getsockname()[1])
210 try:
211 cs_port_file.write_text(str(cs_port))
212 except OSError:
213 logger.debug("Exception caught", exc_info=True)
214 cs_url = f"http://127.0.0.1:{cs_port}"
215 cs_binary = shutil.which("code-server")
217 def _code_server_launch_args() -> list[str]:
218 assert cs_binary is not None
219 return [
220 cs_binary,
221 "--port",
222 str(cs_port),
223 "--auth",
224 "none",
225 "--bind-addr",
226 f"127.0.0.1:{cs_port}",
227 "--disable-telemetry",
228 "--user-data-dir",
229 cs_data_dir,
230 "--extensions-dir",
231 str(Path(cs_data_dir) / "extensions"),
232 "--disable-getting-started-override",
233 "--disable-workspace-trust",
234 actual_work_dir,
235 ]
237 def _watch_code_server() -> None:
238 """Monitor code-server subprocess and restart it if it crashes."""
239 nonlocal cs_proc, code_server_url
240 while not shutting_down.is_set():
241 shutting_down.wait(5.0)
242 if shutting_down.is_set():
243 break
244 if cs_proc is None:
245 continue
246 ret = cs_proc.poll()
247 if ret is None:
248 continue
249 logger.warning(
250 "code-server exited with code %d, restarting...", ret
251 )
252 try:
253 from kiss.agents.sorcar.code_server import _MS_GALLERY
255 cs_env = {**os.environ, "EXTENSIONS_GALLERY": _MS_GALLERY}
256 cs_proc = subprocess.Popen(
257 _code_server_launch_args(),
258 stdout=subprocess.DEVNULL,
259 stderr=subprocess.DEVNULL,
260 env=cs_env,
261 start_new_session=True,
262 )
263 restarted = False
264 for _ in range(30):
265 try:
266 with socket.create_connection(
267 ("127.0.0.1", cs_port), timeout=0.5
268 ):
269 code_server_url = cs_url
270 restarted = True
271 break
272 except (ConnectionRefusedError, OSError):
273 logger.debug("Exception caught", exc_info=True)
274 time.sleep(0.5)
275 if restarted:
276 logger.info("code-server restarted at %s", code_server_url)
277 printer.broadcast({"type": "code_server_restarted"})
278 else:
279 logger.warning("code-server failed to restart")
280 except Exception:
281 logger.debug("Exception caught", exc_info=True)
282 if cs_binary: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 ext_changed = _setup_code_server(cs_data_dir)
284 port_in_use = False
285 try:
286 with socket.create_connection(("127.0.0.1", cs_port), timeout=0.5):
287 port_in_use = True
288 except (ConnectionRefusedError, OSError):
289 logger.debug("Exception caught", exc_info=True)
290 # If our stored port is not in use, verify it's still bindable;
291 # if another process grabbed it, pick a fresh port.
292 if not port_in_use:
293 try:
294 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _s:
295 _s.bind(("127.0.0.1", cs_port))
296 except OSError:
297 cs_port = find_free_port()
298 cs_url = f"http://127.0.0.1:{cs_port}"
299 try:
300 cs_port_file.write_text(str(cs_port))
301 except OSError:
302 logger.debug("Exception caught", exc_info=True)
304 workdir_file = Path(cs_data_dir) / "workdir"
305 prev_workdir = ""
306 try:
307 prev_workdir = workdir_file.read_text().strip() if workdir_file.exists() else ""
308 except OSError:
309 logger.debug("Exception caught", exc_info=True)
310 workdir_changed = prev_workdir != actual_work_dir
312 need_restart = port_in_use and (ext_changed or workdir_changed)
313 if need_restart:
314 reason = "extension updated" if ext_changed else "work directory changed"
315 printer.print(f"Restarting code-server ({reason})...")
316 try:
317 result = subprocess.run(
318 ["lsof", "-ti", f":{cs_port}", "-sTCP:LISTEN"],
319 capture_output=True,
320 text=True,
321 )
322 for pid_str in result.stdout.strip().split("\n"):
323 if pid_str.strip():
324 os.kill(int(pid_str.strip()), 15)
325 time.sleep(1.5)
326 except Exception:
327 logger.debug("Exception caught", exc_info=True)
328 port_in_use = False
329 if port_in_use:
330 code_server_url = cs_url
331 printer.print(f"Reusing existing code-server at {code_server_url}")
332 else:
333 from kiss.agents.sorcar.code_server import _MS_GALLERY
335 cs_env = {**os.environ, "EXTENSIONS_GALLERY": _MS_GALLERY}
336 cs_proc = subprocess.Popen(
337 [
338 cs_binary,
339 "--port",
340 str(cs_port),
341 "--auth",
342 "none",
343 "--bind-addr",
344 f"127.0.0.1:{cs_port}",
345 "--disable-telemetry",
346 "--user-data-dir",
347 cs_data_dir,
348 "--extensions-dir",
349 str(Path(cs_data_dir) / "extensions"),
350 "--disable-getting-started-override",
351 "--disable-workspace-trust",
352 actual_work_dir,
353 ],
354 stdout=subprocess.DEVNULL,
355 stderr=subprocess.DEVNULL,
356 env=cs_env,
357 start_new_session=True,
358 )
359 for _ in range(30):
360 try:
361 with socket.create_connection(("127.0.0.1", cs_port), timeout=0.5):
362 code_server_url = cs_url
363 break
364 except (ConnectionRefusedError, OSError):
365 logger.debug("Exception caught", exc_info=True)
366 time.sleep(0.5)
367 if code_server_url:
368 printer.print(f"code-server running at {code_server_url}")
369 else:
370 printer.print("Warning: code-server failed to start")
371 if code_server_url:
372 try:
373 workdir_file.write_text(actual_work_dir)
374 except OSError:
375 logger.debug("Exception caught", exc_info=True)
377 if cs_binary and code_server_url: 377 ↛ 378line 377 didn't jump to line 378 because the condition on line 377 was never true
378 threading.Thread(target=_watch_code_server, daemon=True).start()
380 html_page = _build_html(title, code_server_url, actual_work_dir)
381 shutdown_timer: threading.Timer | None = None
382 shutdown_lock = threading.Lock()
384 def refresh_file_cache() -> None:
385 nonlocal file_cache
386 file_cache = _scan_files(actual_work_dir)
388 def refresh_proposed_tasks() -> None:
389 nonlocal proposed_tasks
390 history = _load_history()
391 if not history: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true
392 with proposed_lock:
393 proposed_tasks = []
394 printer.broadcast({"type": "proposed_updated"})
395 return
396 task_list = "\n".join(f"- {e['task']}" for e in history[:20])
397 agent = KISSAgent("Task Proposer")
398 try:
399 result = agent.run(
400 model_name=_FAST_MODEL,
401 prompt_template=(
402 "Based on these past tasks a developer has worked on, suggest 5 new "
403 "tasks they might want to do next. Tasks should be natural follow-ups, "
404 "related improvements, or complementary work.\n\n"
405 "Past tasks:\n{task_list}\n\n"
406 "Return ONLY a JSON array of 5 short task description strings. "
407 'Example: ["Add unit tests for X", "Refactor Y module"]'
408 ),
409 arguments={"task_list": task_list},
410 is_agentic=False,
411 )
412 start = result.index("[")
413 end = result.index("]", start) + 1
414 proposals = json.loads(result[start:end])
415 proposals = [str(p) for p in proposals if isinstance(p, str) and p.strip()][:5]
416 except Exception:
417 logger.debug("Exception caught", exc_info=True)
418 proposals = []
419 with proposed_lock:
420 proposed_tasks = proposals
421 _save_proposals(proposals)
422 printer.broadcast({"type": "proposed_updated"})
424 def generate_followup(task: str, result: str) -> None:
425 try:
426 agent = KISSAgent("Followup Proposer")
427 raw = agent.run(
428 model_name=_FAST_MODEL,
429 prompt_template=(
430 "A developer just completed this task:\n"
431 "Task: {task}\n"
432 "Result summary: {result}\n\n"
433 "Suggest ONE short, concrete follow-up task they "
434 "might want to do next. Return ONLY the task "
435 "description as a single plain-text sentence."
436 ),
437 arguments={
438 "task": task,
439 "result": result[:500],
440 },
441 is_agentic=False,
442 )
443 suggestion = _clean_llm_output(raw)
444 if suggestion: 444 ↛ exitline 444 didn't return from function 'generate_followup' because the condition on line 444 was always true
445 printer.broadcast(
446 {
447 "type": "followup_suggestion",
448 "text": suggestion,
449 }
450 )
451 except Exception:
452 logger.debug("Exception caught", exc_info=True)
def _watch_theme_file() -> None:
    """Poll ``vscode-theme.json`` and broadcast theme changes to clients.

    Checks the file about once per second until ``shutting_down`` is set.
    Whenever its modification time advances, the file is parsed and a
    ``theme_changed`` event carrying the matching preset is broadcast.
    Malformed content falls back to the ``"dark"`` preset instead of
    terminating the watcher thread.
    """
    theme_file = _KISS_DIR / "vscode-theme.json"
    last_mtime = 0.0
    try:
        if theme_file.exists():
            # Start from the current mtime so the existing theme is not
            # re-broadcast on startup.
            last_mtime = theme_file.stat().st_mtime
    except OSError:
        logger.debug("Exception caught", exc_info=True)
    while not shutting_down.is_set():
        try:
            if theme_file.exists():
                mtime = theme_file.stat().st_mtime
                if mtime > last_mtime:
                    last_mtime = mtime
                    data = json.loads(theme_file.read_text())
                    # Valid JSON need not be an object, and "kind" need
                    # not be a (hashable) string; fall back to "dark".
                    kind = data.get("kind", "dark") if isinstance(data, dict) else "dark"
                    if not isinstance(kind, str):
                        kind = "dark"
                    colors = _THEME_PRESETS.get(kind, _THEME_PRESETS["dark"])
                    printer.broadcast({"type": "theme_changed", **colors})
        except (OSError, ValueError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            logger.debug("Exception caught", exc_info=True)
        shutting_down.wait(1.0)
476 threading.Thread(target=_watch_theme_file, daemon=True).start()
478 def _watch_no_clients() -> None:
479 """Periodically check if all clients have disconnected and schedule shutdown."""
480 no_client_since: float | None = None
481 while not shutting_down.is_set(): 481 ↛ exitline 481 didn't return from function '_watch_no_clients' because the condition on line 481 was always true
482 shutting_down.wait(5.0)
483 if shutting_down.is_set():
484 break
485 if not printer.has_clients():
486 if no_client_since is None:
487 no_client_since = time.monotonic()
488 elif time.monotonic() - no_client_since >= 10.0:
489 _schedule_shutdown()
490 else:
491 no_client_since = None
493 threading.Thread(target=_watch_no_clients, daemon=True).start()
495 def run_agent_thread(
496 task: str,
497 model_name: str,
498 stop_ev: threading.Event,
499 attachments: list | None = None,
500 ) -> None:
501 nonlocal running, agent_thread, merging
502 from kiss.core.models.model import Attachment
504 # Install per-thread stop event so _check_stop() uses this
505 # thread's own event instead of the shared printer.stop_event.
506 printer._thread_local.stop_event = stop_ev
507 current_thread = threading.current_thread()
509 parsed_attachments: list[Attachment] | None = None
510 if attachments:
511 parsed_attachments = []
512 for att in attachments:
513 data = base64.b64decode(att["data"])
514 parsed_attachments.append(Attachment(data=data, mime_type=att["mime_type"]))
516 pre_hunks: dict[str, list[tuple[int, int, int, int]]] = {}
517 pre_untracked: set[str] = set()
518 pre_file_hashes: dict[str, str] = {}
519 result_text = ""
520 done_event: dict[str, str] | None = None
521 try:
522 _add_task(task)
523 printer.broadcast({"type": "tasks_updated"})
524 pre_hunks = _parse_diff_hunks(actual_work_dir)
525 pre_untracked = _capture_untracked(actual_work_dir)
526 pre_file_hashes = _snapshot_files(
527 actual_work_dir, set(pre_hunks.keys()) | pre_untracked
528 )
529 _save_untracked_base(
530 actual_work_dir, cs_data_dir, pre_untracked | set(pre_hunks.keys())
531 )
532 active_file = _read_active_file(cs_data_dir)
533 printer.start_recording()
534 printer.broadcast({"type": "clear", "active_file": active_file})
535 agent = agent_factory("Chatbot")
536 extra_kwargs = dict(agent_kwargs or {})
537 if active_file:
538 extra_kwargs["current_editor_file"] = active_file
539 result = agent.run(
540 prompt_template=task,
541 work_dir=actual_work_dir,
542 printer=printer,
543 model_name=model_name,
544 attachments=parsed_attachments,
545 **extra_kwargs,
546 )
547 result_text = result or ""
548 done_event = {"type": "task_done"}
549 except (KeyboardInterrupt, _StopRequested):
550 logger.debug("Exception caught", exc_info=True)
551 result_text = "(stopped)"
552 done_event = {"type": "task_stopped"}
553 except Exception as e:
554 logger.debug("Exception caught", exc_info=True)
555 result_text = f"(error: {e})"
556 done_event = {"type": "task_error", "text": str(e)}
557 finally:
558 printer._thread_local.stop_event = None
559 chat_events = printer.stop_recording()
560 _append_task_to_md(task, result_text)
561 with running_lock:
562 if agent_thread is not current_thread:
563 # Stopped externally; stop_agent already broadcast
564 # task_stopped which is captured in chat_events.
565 _set_latest_chat_events(chat_events, task=task)
566 return
567 running = False
568 agent_thread = None
569 # Broadcast AFTER setting running=False so clients can
570 # immediately submit a new task without getting a 409.
571 if done_event: 571 ↛ 580line 571 didn't jump to line 580 because the condition on line 571 was always true
572 chat_events.append(done_event)
573 printer.broadcast(done_event)
574 if done_event.get("type") == "task_done":
575 threading.Thread(
576 target=generate_followup,
577 args=(task, result_text),
578 daemon=True,
579 ).start()
580 _set_latest_chat_events(chat_events, task=task)
581 try:
582 merge_result = _prepare_merge_view(
583 actual_work_dir,
584 cs_data_dir,
585 pre_hunks,
586 pre_untracked,
587 pre_file_hashes,
588 )
589 if merge_result.get("status") == "opened":
590 with running_lock:
591 merging = True
592 printer.broadcast({"type": "merge_started"})
593 except Exception:
594 logger.debug("Exception caught", exc_info=True)
595 refresh_file_cache()
596 try:
597 refresh_proposed_tasks()
598 except Exception:
599 logger.debug("Exception caught", exc_info=True)
601 def stop_agent() -> bool:
602 """Kill the current agent thread and reset state for a new task.
604 Sets the thread's per-thread stop event so the agent stops at
605 the next printer.print() or token_callback() check. Also injects
606 _StopRequested via PyThreadState_SetAsyncExc as a fallback.
607 """
608 nonlocal running, agent_thread, current_stop_event
609 with running_lock:
610 thread = agent_thread
611 if thread is None or not thread.is_alive():
612 return False
613 running = False
614 agent_thread = None
615 # Set the per-thread stop event so only this thread sees
616 # the stop signal. New threads get their own fresh event.
617 stop_ev = current_stop_event
618 current_stop_event = None
619 if stop_ev is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true
620 stop_ev.set()
621 import ctypes
623 tid = thread.ident
624 if tid is not None: 624 ↛ 629line 624 didn't jump to line 629 because the condition on line 624 was always true
625 ctypes.pythonapi.PyThreadState_SetAsyncExc(
626 ctypes.c_ulong(tid),
627 ctypes.py_object(_StopRequested),
628 )
629 printer.broadcast({"type": "task_stopped"})
630 return True
632 # True when this instance created a PID-specific data dir for isolation.
633 _is_isolated = cs_data_dir.endswith(f"-{os.getpid()}")
635 def _cleanup() -> None:
636 nonlocal merging
637 with running_lock:
638 was_merging = merging
639 merging = False
640 if was_merging: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true
641 _restore_merge_files(cs_data_dir, actual_work_dir)
642 stop_agent()
643 if cs_proc and cs_proc.poll() is None: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true
644 try:
645 os.killpg(cs_proc.pid, 15) # SIGTERM to process group
646 except OSError:
647 cs_proc.terminate()
648 try:
649 cs_proc.wait(timeout=5)
650 except Exception:
651 logger.debug("Exception caught", exc_info=True)
652 try:
653 os.killpg(cs_proc.pid, 9) # SIGKILL to process group
654 except OSError:
655 cs_proc.kill()
656 # Remove PID-specific data dir created for instance isolation.
657 if _is_isolated: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true
658 try:
659 shutil.rmtree(cs_data_dir, ignore_errors=True)
660 except Exception:
661 logger.debug("Exception caught", exc_info=True)
663 def _do_shutdown() -> None:
664 with running_lock:
665 if running or printer.has_clients(): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 return
667 shutting_down.set()
668 _cleanup()
669 server.should_exit = True
671 def _cancel_shutdown() -> None:
672 nonlocal shutdown_timer
673 with shutdown_lock:
674 if shutdown_timer is not None: 674 ↛ exitline 674 didn't jump to the function exit
675 shutdown_timer.cancel()
676 shutdown_timer = None
678 def _schedule_shutdown() -> None:
679 nonlocal shutdown_timer
680 if printer.has_clients(): 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true
681 return
682 with running_lock:
683 if running: 683 ↛ 684line 683 didn't jump to line 684 because the condition on line 683 was never true
684 return
685 with shutdown_lock:
686 if shutdown_timer is not None:
687 shutdown_timer.cancel()
688 shutdown_timer = threading.Timer(10.0, _do_shutdown)
689 shutdown_timer.daemon = True
690 shutdown_timer.start()
692 async def index(request: Request) -> HTMLResponse:
693 return HTMLResponse(html_page)
695 async def events(request: Request) -> StreamingResponse:
696 cq = printer.add_client()
697 _cancel_shutdown()
699 async def generate() -> AsyncGenerator[str]:
700 last_heartbeat = time.monotonic()
701 disconnect_check_counter = 0
702 try:
703 while not shutting_down.is_set(): 703 ↛ 723line 703 didn't jump to line 723 because the condition on line 703 was always true
704 disconnect_check_counter += 1
705 if disconnect_check_counter >= 20:
706 disconnect_check_counter = 0
707 if await request.is_disconnected(): 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true
708 break
709 try:
710 event = cq.get_nowait()
711 except queue.Empty:
712 now = time.monotonic()
713 if now - last_heartbeat >= 5.0:
714 yield ": heartbeat\n\n"
715 last_heartbeat = now
716 await asyncio.sleep(0.05)
717 continue
718 yield f"data: {json.dumps(event)}\n\n"
719 last_heartbeat = time.monotonic()
720 except asyncio.CancelledError:
721 logger.debug("Exception caught", exc_info=True)
722 finally:
723 printer.remove_client(cq)
724 _schedule_shutdown()
726 return StreamingResponse(
727 generate(),
728 media_type="text/event-stream",
729 headers={
730 "Cache-Control": "no-cache",
731 "X-Accel-Buffering": "no",
732 "Connection": "keep-alive",
733 },
734 )
736 async def run_task(request: Request) -> JSONResponse:
737 nonlocal running, agent_thread, selected_model, current_stop_event
738 body = await request.json()
739 task = body.get("task", "").strip()
740 model = body.get("model", "").strip() or selected_model
741 attachments = body.get("attachments")
742 selected_model = model
743 if not task:
744 return JSONResponse({"error": "Empty task"}, status_code=400)
745 _record_model_usage(model)
746 if model not in _INTERNAL_MODELS:
747 _record_model_usage(model)
748 stop_ev = threading.Event()
749 t = threading.Thread(
750 target=run_agent_thread,
751 args=(task, model, stop_ev, attachments),
752 daemon=True,
753 )
754 with running_lock:
755 if merging:
756 return JSONResponse(
757 {"error": "Resolve all diffs in the merge view first"},
758 status_code=409,
759 )
760 if running:
761 return JSONResponse({"error": "Agent is already running"}, status_code=409)
762 current_stop_event = stop_ev
763 running = True
764 agent_thread = t
765 t.start()
766 return JSONResponse({"status": "started"})
768 async def run_selection(request: Request) -> JSONResponse:
769 """Run the agent on text selected in the VS Code editor.
771 Broadcasts an ``external_run`` event so the chatbox UI enters
772 running state and displays the selected text as a user message,
773 then starts the agent thread with the selected text as the task.
774 """
775 nonlocal running, agent_thread, current_stop_event
776 body = await request.json()
777 text = body.get("text", "").strip()
778 if not text:
779 return JSONResponse({"error": "No text selected"}, status_code=400)
780 model = selected_model
781 _record_model_usage(model)
782 if model not in _INTERNAL_MODELS:
783 _record_model_usage(model)
784 stop_ev = threading.Event()
785 t = threading.Thread(
786 target=run_agent_thread,
787 args=(text, model, stop_ev, None),
788 daemon=True,
789 )
790 with running_lock:
791 if merging:
792 return JSONResponse(
793 {"error": "Resolve all diffs in the merge view first"},
794 status_code=409,
795 )
796 if running:
797 return JSONResponse({"error": "Agent is already running"}, status_code=409)
798 current_stop_event = stop_ev
799 running = True
800 agent_thread = t
801 # Broadcast AFTER lock confirms task will start, so clients
802 # never see external_run for a task that returns 409.
803 printer.broadcast({"type": "external_run", "text": text})
804 t.start()
805 return JSONResponse({"status": "started"})
807 async def stop_task(request: Request) -> JSONResponse:
808 if stop_agent():
809 return JSONResponse({"status": "stopping"})
810 return JSONResponse({"error": "No running task"}, status_code=404)
812 async def suggestions(request: Request) -> JSONResponse:
813 query = request.query_params.get("q", "").strip()
814 mode = request.query_params.get("mode", "general")
815 if mode == "files":
816 q = query.lower()
817 usage = _load_file_usage()
818 frequent: list[dict[str, str]] = []
819 rest: list[dict[str, str]] = []
820 for path in file_cache:
821 if not q or q in path.lower():
822 ptype = "dir" if path.endswith("/") else "file"
823 item = {"type": ptype, "text": path}
824 if usage.get(path, 0) > 0:
825 frequent.append(item)
826 else:
827 rest.append(item)
828 frequent.sort(key=lambda m: (m["type"] != "file", -usage.get(m["text"], 0)))
829 rest.sort(key=lambda m: m["type"] != "file")
830 for f in frequent:
831 f["type"] = "frequent_" + f["type"]
832 return JSONResponse((frequent + rest)[:20])
833 if not query:
834 return JSONResponse([])
835 q_lower = query.lower()
836 results = []
837 for entry in _load_history():
838 task = str(entry["task"])
839 if q_lower in task.lower():
840 results.append({"type": "task", "text": task})
841 if len(results) >= 5:
842 break
843 with proposed_lock:
844 for t in proposed_tasks:
845 if q_lower in t.lower():
846 results.append({"type": "suggested", "text": t})
847 words = query.split()
848 last_word = words[-1].lower() if words else q_lower
849 if last_word and len(last_word) >= 2:
850 count = 0
851 for path in file_cache:
852 if last_word in path.lower():
853 results.append({"type": "file", "text": path})
854 count += 1
855 if count >= 8:
856 break
857 return JSONResponse(results)
859 async def tasks(request: Request) -> JSONResponse:
860 history = _load_history()
861 return JSONResponse(
862 [
863 {"task": e["task"], "has_events": bool(e.get("chat_events"))}
864 for e in history
865 ]
866 )
868 async def task_events(request: Request) -> JSONResponse:
869 """Return chat events for a specific task by index."""
870 try:
871 idx = int(request.query_params.get("idx", "0"))
872 except (ValueError, TypeError):
873 return JSONResponse({"error": "Invalid index"}, status_code=400)
874 history = _load_history()
875 if idx < 0 or idx >= len(history):
876 return JSONResponse({"error": "Index out of range"}, status_code=404)
877 events: list[dict[str, object]] = history[idx].get("chat_events", []) # type: ignore[assignment]
878 return JSONResponse(events)
880 async def proposed_tasks_endpoint(request: Request) -> JSONResponse:
881 with proposed_lock:
882 tasks_list = list(proposed_tasks)
883 if not tasks_list: 883 ↛ 884line 883 didn't jump to line 884 because the condition on line 883 was never true
884 tasks_list = [str(t["task"]) for t in SAMPLE_TASKS[:5]]
885 return JSONResponse(tasks_list)
887 def _fast_complete(raw_query: str, query: str) -> str:
888 query_lower = query.lower()
889 for entry in _load_history():
890 task = str(entry.get("task", ""))
891 if task.lower().startswith(query_lower) and len(task) > len(query):
892 return task[len(query):]
893 words = raw_query.split()
894 last_word = words[-1] if words else ""
895 if last_word and len(last_word) >= 2:
896 lw_lower = last_word.lower()
897 for path in file_cache:
898 if path.lower().startswith(lw_lower) and len(path) > len(last_word):
899 return path[len(last_word) :]
900 return ""
async def complete(request: Request) -> JSONResponse:
    """Suggest the remaining text for the partial input in the ``q`` query parameter.

    Inputs shorter than two characters (after stripping) get an empty
    suggestion. ``_fast_complete`` is tried first; only when it finds nothing
    is the ``_FAST_MODEL`` LLM asked, on a worker thread via
    ``asyncio.to_thread``.

    Returns:
        A JSON object ``{"suggestion": <text>}``; the text is empty when no
        completion is found or the LLM path raises.
    """
    raw_query = request.query_params.get("q", "")
    query = raw_query.strip()
    if len(query) < 2:
        return JSONResponse({"suggestion": ""})

    fast = _fast_complete(raw_query, query)
    if fast:
        return JSONResponse({"suggestion": fast})

    def _generate() -> str:
        # Everything that can fail lives inside the try so a bad history entry
        # or a model error yields "" rather than an unhandled exception.
        try:
            history = _load_history()
            # .get() mirrors _fast_complete: entries without "task" are tolerated.
            task_list = "\n".join(f"- {e.get('task', '')}" for e in history[:20])
            agent = KISSAgent("Autocomplete")
            result = agent.run(
                model_name=_FAST_MODEL,
                prompt_template=(
                    "You are an inline autocomplete engine for a coding assistant. "
                    "Given the user's partial input and their past task history, "
                    "predict what they want to type and return ONLY the remaining "
                    "text to complete their input. Do NOT repeat the text they already typed. "
                    "Keep the completion concise and natural. "
                    "If no good completion, return empty string.\n\n"
                    "Past tasks:\n{task_list}\n\n"
                    'Partial input: "{query}"\n\n'
                ),
                arguments={"task_list": task_list, "query": query},
                is_agentic=False,
            )
            s = _clean_llm_output(result)
            # Drop the echoed prefix if the model repeated the input anyway.
            if s.lower().startswith(query.lower()):
                s = s[len(query):]
            return s
        except Exception:
            logger.debug("Exception caught", exc_info=True)
            return ""

    suggestion = await asyncio.to_thread(_generate)
    return JSONResponse({"suggestion": suggestion})
async def models_endpoint(request: Request) -> JSONResponse:
    """List the available function-calling models with prices and usage counts.

    Entries are ordered by ``_model_vendor_order`` of the model name, then by
    descending combined input and output price. The response also carries the
    currently selected model.
    """
    use_counts = _load_model_usage()

    def _sort_key(entry: dict[str, Any]) -> tuple[Any, float]:
        vendor_rank = _model_vendor_order(str(entry["name"]))
        combined_price = float(entry["inp"]) + float(entry["out"])
        return (vendor_rank, -combined_price)

    entries: list[dict[str, Any]] = []
    for model_name in get_available_models():
        meta = MODEL_INFO.get(model_name)
        if not meta or not meta.is_function_calling_supported:
            continue
        entries.append(
            {
                "name": model_name,
                "inp": meta.input_price_per_1M,
                "out": meta.output_price_per_1M,
                "uses": use_counts.get(model_name, 0),
            }
        )
    ordered = sorted(entries, key=_sort_key)
    return JSONResponse({"models": ordered, "selected": selected_model})
async def closing(request: Request) -> JSONResponse:
    """React to the browser tab or window closing.

    Calls ``_schedule_shutdown()`` and then replies with an ``ok`` status.
    """
    _schedule_shutdown()
    ack = {"status": "ok"}
    return JSONResponse(ack)
async def focus_chatbox(request: Request) -> JSONResponse:
    """Broadcast a ``focus_chatbox`` event through the printer and acknowledge."""
    event = {"type": "focus_chatbox"}
    printer.broadcast(event)
    return JSONResponse({"status": "ok"})
async def focus_editor(request: Request) -> JSONResponse:
    """Write ``pending-focus-editor.json`` into the code-server data directory."""
    request_file = os.path.join(cs_data_dir, "pending-focus-editor.json")
    serialized = json.dumps({"focus": True})
    with open(request_file, "w") as handle:
        handle.write(serialized)
    return JSONResponse({"status": "ok"})
async def theme(request: Request) -> JSONResponse:
    """Return the theme preset matching the kind stored in ``vscode-theme.json``.

    The ``"dark"`` preset is used when the file is missing, unreadable, not
    valid JSON, not a JSON object, or names a kind with no preset.
    """
    theme_file = _KISS_DIR / "vscode-theme.json"
    kind = "dark"
    if theme_file.exists():
        try:
            data = json.loads(theme_file.read_text())
        except (ValueError, OSError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logger.debug("Exception caught", exc_info=True)
        else:
            # Valid JSON need not be an object, and "kind" need not be a string;
            # anything else keeps the default instead of raising.
            if isinstance(data, dict):
                stored_kind = data.get("kind", "dark")
                if isinstance(stored_kind, str):
                    kind = stored_kind
    return JSONResponse(_THEME_PRESETS.get(kind, _THEME_PRESETS["dark"]))
async def open_file(request: Request) -> JSONResponse:
    """Record a request to open a file by writing ``pending-open.json``.

    The JSON body's ``path`` is used as-is when it starts with ``/`` and is
    otherwise joined onto the work directory. Replies 400 when the path is
    empty and 404 when the resolved path is not an existing file.
    """
    payload = await request.json()
    requested = payload.get("path", "").strip()
    if not requested:
        return JSONResponse({"error": "No path"}, status_code=400)

    if requested.startswith("/"):
        target = requested
    else:
        target = os.path.join(actual_work_dir, requested)
    if not os.path.isfile(target):
        return JSONResponse({"error": "File not found"}, status_code=404)

    request_file = os.path.join(cs_data_dir, "pending-open.json")
    with open(request_file, "w") as handle:
        handle.write(json.dumps({"path": target}))
    return JSONResponse({"status": "ok"})
1004 async def merge_action(request: Request) -> JSONResponse:
1005 nonlocal merging
1006 body = await request.json()
1007 action = body.get("action", "")
1008 if action == "all-done":
1009 with running_lock:
1010 merging = False
1011 printer.broadcast({"type": "merge_ended"})
1012 _cleanup_merge_data(cs_data_dir)
1013 return JSONResponse({"status": "ok"})
1014 if action not in ("prev", "next", "accept-all", "reject-all", "accept", "reject"):
1015 return JSONResponse({"error": "Invalid action"}, status_code=400)
1016 pending = os.path.join(cs_data_dir, "pending-action.json")
1017 with open(pending, "w") as f:
1018 json.dump({"action": action}, f)
1019 return JSONResponse({"status": "ok"})
async def _thread_json_response(
    fn: Callable[[], dict[str, str]],
    error_status: int = 400,
) -> JSONResponse:
    """Run ``fn`` on a worker thread and wrap the dict it returns as JSON.

    Args:
        fn: Zero-argument callable producing the response payload.
        error_status: HTTP status used when the payload has an ``"error"`` key.

    Returns:
        The payload as a ``JSONResponse``; status 200 unless it reports an error.
    """
    payload = await asyncio.to_thread(fn)
    status = error_status if "error" in payload else 200
    return JSONResponse(payload, status_code=status)
async def commit(request: Request) -> JSONResponse:
    """Stage every change in the work directory and commit it with a generated message.

    The git work runs on a worker thread through ``_thread_json_response``,
    which replies with its default error status when the result carries an
    ``"error"`` key.

    Returns:
        ``{"status": "ok", "message": <commit message>}`` on success, otherwise
        ``{"error": <reason>}``.
    """

    def _do_commit() -> dict[str, str]:
        try:
            staged = subprocess.run(
                ["git", "add", "-A"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            # Without this check a failed staging step would fall through and
            # be reported as the misleading "No changes to commit".
            if staged.returncode != 0:
                return {"error": staged.stderr.strip() or "git add failed"}
            diff_stat = subprocess.run(
                ["git", "diff", "--cached", "--stat"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            if not diff_stat.stdout.strip():
                return {"error": "No changes to commit"}
            diff_detail = subprocess.run(
                ["git", "diff", "--cached"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
            message = _generate_commit_msg(diff_detail.stdout)
            # Committer identity comes from the environment; the author is set
            # with --author on the command line.
            commit_env = {
                **os.environ,
                "GIT_COMMITTER_NAME": "KISS Sorcar",
                "GIT_COMMITTER_EMAIL": "ksen@berkeley.edu",
            }
            result = subprocess.run(
                ["git", "commit", "-m", message, "--author=KISS Sorcar <ksen@berkeley.edu>"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
                env=commit_env,
            )
            if result.returncode != 0:
                return {"error": result.stderr.strip()}
            return {"status": "ok", "message": message}
        except Exception as e:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(e)}

    return await _thread_json_response(_do_commit)
async def push(request: Request) -> JSONResponse:
    """Run ``git push`` in the work directory and report the outcome as JSON."""

    def _do_push() -> dict[str, str]:
        try:
            proc = subprocess.run(
                ["git", "push"],
                capture_output=True,
                text=True,
                cwd=actual_work_dir,
            )
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}
        if proc.returncode == 0:
            return {"status": "ok"}
        # Fall back to a fixed message when git wrote nothing to stderr.
        return {"error": proc.stderr.strip() or "Push failed"}

    return await _thread_json_response(_do_push)
async def record_file_usage_endpoint(
    request: Request,
) -> JSONResponse:
    """Pass the JSON body's non-empty ``path`` to ``_record_file_usage``.

    Always replies with an ``ok`` status, whether or not a path was given.
    """
    payload = await request.json()
    used_path = payload.get("path", "").strip()
    if used_path:
        _record_file_usage(used_path)
    ack = {"status": "ok"}
    return JSONResponse(ack)
async def generate_commit_message(request: Request) -> JSONResponse:
    """Generate a commit message from the current changes.

    The unstaged diff, the staged diff and the untracked file names form the
    context given to ``_generate_commit_msg``. The message is written to
    ``pending-scm-message.json`` and also returned in the response.
    """

    def _git_diff(*extra: str) -> str:
        proc = subprocess.run(
            ["git", "diff", *extra],
            capture_output=True,
            text=True,
            cwd=actual_work_dir,
        )
        return proc.stdout

    def _generate() -> dict[str, str]:
        try:
            diff_text = (_git_diff() + _git_diff("--cached")).strip()
            untracked_files = "\n".join(sorted(_capture_untracked(actual_work_dir)))
            if not (diff_text or untracked_files):
                return {"error": "No changes detected"}

            # Both sections are truncated before being handed to the generator.
            sections = []
            if diff_text:
                sections.append(f"Diff:\n{diff_text[:4000]}")
            if untracked_files:
                sections.append(f"New untracked files:\n{untracked_files[:500]}")
            msg = _generate_commit_msg("\n\n".join(sections), detailed=True)

            request_file = os.path.join(cs_data_dir, "pending-scm-message.json")
            with open(request_file, "w") as handle:
                handle.write(json.dumps({"message": msg}))
            return {"message": msg}
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}

    return await _thread_json_response(_generate)
async def active_file_info(request: Request) -> JSONResponse:
    """Report whether the editor's active file is a runnable prompt.

    Only paths ending in ``.md`` (case-insensitive) are passed to
    ``PromptDetector``; anything else is reported as not a prompt.
    """
    active_path = _read_active_file(cs_data_dir)
    if not (active_path and active_path.lower().endswith(".md")):
        return JSONResponse({"is_prompt": False, "path": active_path})

    from kiss.agents.sorcar.prompt_detector import PromptDetector

    verdict, _score, _reasons = PromptDetector().analyze(active_path)
    info = {
        "is_prompt": verdict,
        "path": active_path,
        "filename": os.path.basename(active_path),
    }
    return JSONResponse(info)
async def get_file_content(request: Request) -> JSONResponse:
    """Return the UTF-8 text of the file named by the ``path`` query parameter.

    Replies 404 when the path is empty or not an existing file, and 500 with
    the exception text when reading fails.
    """
    target = request.query_params.get("path", "").strip()
    if not (target and os.path.isfile(target)):
        return JSONResponse({"error": "File not found"}, status_code=404)
    try:
        text = Path(target).read_text(encoding="utf-8")
        return JSONResponse({"content": text})
    except Exception as exc:
        logger.debug("Exception caught", exc_info=True)
        return JSONResponse({"error": str(exc)}, status_code=500)
async def generate_config_message(request: Request) -> JSONResponse:
    """Ask the fast model for a short status message describing the current setup.

    The JSON body may carry ``model``; otherwise the selected model is used.
    A failing model call is reported as ``{"error": ...}`` with status 500.
    """
    payload = await request.json()
    model = payload.get("model", selected_model)

    def _generate() -> dict[str, str]:
        cfg = config_module.DEFAULT_CONFIG
        history = _load_history()
        agent_cfg = cfg.sorcar.sorcar_agent
        server_state = "running" if code_server_url else "not available"
        lines = [
            f"Work directory: {actual_work_dir}",
            f"Selected model: {model}",
            f"Max steps: {agent_cfg.max_steps}",
            f"Max budget: ${agent_cfg.max_budget:.2f}",
            f"Global max budget: ${cfg.agent.global_max_budget:.2f}",
            f"Headless browser: {agent_cfg.headless}",
            f"Code-server: {server_state}",
            f"Tasks completed: {len(history)}",
        ]
        # Only the first five history entries are mentioned.
        latest: list[str] = []
        if history:
            latest = [str(item["task"]) for item in history[:5]]
        if latest:
            lines.append("Recent tasks: " + "; ".join(latest))
        config_info = "\n".join(lines)

        template = (
            "You are a helpful assistant. Given the following configuration "
            "information about a coding assistant environment, generate a "
            "short, nicely formatted, informative status message that a "
            "developer would find useful. Include key details like the "
            "model, work directory, budget, and recent activity. "
            "Keep it concise (3-5 lines) and use emoji for visual appeal. "
            "Return ONLY the message text, no quotes or markdown fences.\n\n"
            "Configuration:\n{config_info}"
        )
        agent = KISSAgent("Config Message Generator")
        try:
            reply = agent.run(
                model_name=_FAST_MODEL,
                prompt_template=template,
                arguments={"config_info": config_info},
                is_agentic=False,
            )
            return {"message": reply.strip()}
        except Exception as exc:
            logger.debug("Exception caught", exc_info=True)
            return {"error": str(exc)}

    return await _thread_json_response(_generate, error_status=500)
1210 app = Starlette(
1211 routes=[
1212 Route("/", index),
1213 Route("/events", events),
1214 Route("/run", run_task, methods=["POST"]),
1215 Route("/run-selection", run_selection, methods=["POST"]),
1216 Route("/stop", stop_task, methods=["POST"]),
1217 Route("/open-file", open_file, methods=["POST"]),
1218 Route("/closing", closing, methods=["POST"]),
1219 Route("/focus-chatbox", focus_chatbox, methods=["POST"]),
1220 Route("/focus-editor", focus_editor, methods=["POST"]),
1221 Route("/commit", commit, methods=["POST"]),
1222 Route("/push", push, methods=["POST"]),
1223 Route("/merge-action", merge_action, methods=["POST"]),
1224 Route("/record-file-usage", record_file_usage_endpoint, methods=["POST"]),
1225 Route("/generate-commit-message", generate_commit_message, methods=["POST"]),
1226 Route("/generate-config-message", generate_config_message, methods=["POST"]),
1227 Route("/active-file-info", active_file_info),
1228 Route("/get-file-content", get_file_content),
1229 Route("/suggestions", suggestions),
1230 Route("/complete", complete),
1231 Route("/tasks", tasks),
1232 Route("/task-events", task_events),
1233 Route("/proposed_tasks", proposed_tasks_endpoint),
1234 Route("/models", models_endpoint),
1235 Route("/theme", theme),
1236 ]
1237 )
1239 threading.Thread(target=refresh_proposed_tasks, daemon=True).start()
1241 import atexit
1243 atexit.register(_cleanup)
1245 port = find_free_port()
1246 try:
1247 Path(cs_data_dir).mkdir(parents=True, exist_ok=True)
1248 (Path(cs_data_dir) / "assistant-port").write_text(str(port))
1249 except OSError:
1250 logger.debug("Exception caught", exc_info=True)
1251 url = f"http://127.0.0.1:{port}"
1252 print(f"{title} running at {url}", flush=True)
1253 print(f"Work directory: {actual_work_dir}", flush=True)
1254 printer.print(f"{title} running at {url}")
1255 printer.print(f"Work directory: {actual_work_dir}")
def _open_browser() -> None:
    """Open the UI URL in a browser after a two-second delay.

    A ``False`` result from ``webbrowser.open`` is only logged. If the call
    raises, the failure is logged and, on macOS, the ``open`` command is
    tried as a fallback.
    """
    time.sleep(2)
    try:
        launched = webbrowser.open(url)
    except Exception:
        logger.warning("Failed to open browser", exc_info=True)
    else:
        if not launched:
            logger.warning("webbrowser.open() returned False for %s", url)
        return

    # Reached only when webbrowser.open() raised.
    if sys.platform != "darwin":
        return
    try:
        subprocess.Popen(
            ["open", url],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        logger.warning("Fallback 'open' command also failed", exc_info=True)
1274 threading.Thread(target=_open_browser, daemon=True).start()
1275 logging.getLogger("uvicorn.error").setLevel(logging.CRITICAL)
1276 config = uvicorn.Config(
1277 app,
1278 host="127.0.0.1",
1279 port=port,
1280 log_level="warning",
1281 timeout_graceful_shutdown=1,
1282 )
1283 server = uvicorn.Server(config)
1284 _orig_handle_exit = server.handle_exit
def _on_exit(sig: int, frame: types.FrameType | None) -> None:
    """Set ``shutting_down`` and then delegate to the server's original exit handler."""
    shutting_down.set()
    _orig_handle_exit(sig, frame)
1290 server.handle_exit = _on_exit # type: ignore[method-assign]
1291 try:
1292 server.run()
1293 except KeyboardInterrupt:
1294 logger.debug("Exception caught", exc_info=True)
1295 _cleanup()
def main() -> None:
    """Parse the command line and start the chatbot UI.

    The ``KISS_MODE`` environment variable (default ``"assistant"``) selects
    the title suffix and the ``headless`` keyword argument given to the agent.
    """
    import argparse

    from kiss._version import __version__
    from kiss.agents.sorcar.sorcar_agent import SorcarAgent

    cli = argparse.ArgumentParser(description="KISS Assistant")
    cli.add_argument(
        "work_dir",
        nargs="?",
        default=os.getcwd(),
        help="Working directory for the agent",
    )
    cli.add_argument(
        "--model_name",
        default="claude-opus-4-6",
        help="Default LLM model name",
    )
    options = cli.parse_args()
    resolved_dir = str(Path(options.work_dir).resolve())

    assistant_mode = os.environ.get("KISS_MODE", "assistant").lower() == "assistant"
    if assistant_mode:
        label = "Assistant"
    else:
        label = "Coding Assistant"
    run_chatbot(
        agent_factory=SorcarAgent,
        title=f"KISS {label}: {__version__}",
        work_dir=resolved_dir,
        default_model=options.model_name,
        agent_kwargs={"headless": not assistant_mode},
    )
1330if __name__ == "__main__":
1331 main()