- README.MD: add original-source-code and nano-claude-code sections, update overview table (4 subprojects), add v3.0 news entry, expand comparison table with memory/multi-agent/skills dimensions - nano-claude-code v3.0: multi-agent package (multi_agent/), memory package (memory/), skill package (skill/) with built-in /commit and /review skills, context compression (compaction.py), tool registry plugin system, diff view, 17 slash commands, 18 built-in tools, 101 tests (~5000 lines total) - original-source-code/src: add raw TypeScript source tree (1884 files) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
749 lines
28 KiB
Python
Executable File
749 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Nano Claude Code — Minimal Python implementation of Claude Code.
|
||
|
||
Usage:
|
||
python nano_claude.py [options] [prompt]
|
||
|
||
Options:
|
||
-p, --print Non-interactive: run prompt and exit (also --print-output)
|
||
-m, --model MODEL Override model
|
||
--accept-all Never ask permission (dangerous)
|
||
--verbose Show thinking + token counts
|
||
--version Print version and exit
|
||
|
||
Slash commands in REPL:
|
||
/help Show this help
|
||
/clear Clear conversation
|
||
/model [m] Show or set model
|
||
/config Show config / set key=value
|
||
/save [f] Save session to file
|
||
/load [f] Load session from file
|
||
/history Print conversation history
|
||
/context Show context window usage
|
||
/cost Show API cost this session
|
||
/verbose Toggle verbose mode
|
||
/thinking Toggle extended thinking
|
||
/permissions [mode] Set permission mode
|
||
/cwd [path] Show or change working directory
|
||
/memory [query] Show/search persistent memories
|
||
/skills List available skills
|
||
/agents Show sub-agent tasks
|
||
/exit /quit Exit
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
try:
|
||
import readline
|
||
except ImportError:
|
||
readline = None # Windows compatibility
|
||
import atexit
|
||
import argparse
|
||
import textwrap
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Optional, Union
|
||
|
||
# ── Optional rich for markdown rendering ──────────────────────────────────
|
||
try:
|
||
from rich.console import Console
|
||
from rich.markdown import Markdown
|
||
from rich.syntax import Syntax
|
||
from rich.panel import Panel
|
||
from rich import print as rprint
|
||
_RICH = True
|
||
console = Console()
|
||
except ImportError:
|
||
_RICH = False
|
||
console = None
|
||
|
||
# Version string printed by the --version flag.
VERSION = "1.0.0"
|
||
|
||
# ── ANSI helpers (used even with rich for non-markdown output) ─────────────
|
||
# ANSI escape sequences keyed by a short style name; "reset" ends styling.
C = dict(
    cyan="\033[36m",
    green="\033[32m",
    yellow="\033[33m",
    red="\033[31m",
    blue="\033[34m",
    magenta="\033[35m",
    bold="\033[1m",
    dim="\033[2m",
    reset="\033[0m",
)
|
||
|
||
def clr(text: str, *keys: str) -> str:
    """Return ``text`` wrapped in the ANSI styles named by ``keys``.

    Each key is looked up in ``C``; the codes are emitted in the given order,
    followed by ``str(text)`` and the reset code. An unknown key raises
    ``KeyError``.
    """
    prefix = "".join([C[key] for key in keys])
    return f"{prefix}{text!s}{C['reset']}"
|
||
|
||
def info(msg: str):
    """Print ``msg`` to stdout in cyan."""
    colored = clr(msg, "cyan")
    print(colored)
|
||
def ok(msg: str):
    """Print ``msg`` to stdout in green."""
    colored = clr(msg, "green")
    print(colored)
|
||
def warn(msg: str):
    """Print ``msg`` to stdout in yellow, prefixed with ``Warning:``."""
    colored = clr(f"Warning: {msg}", "yellow")
    print(colored)
|
||
def err(msg: str):
    """Print ``msg`` to stderr in red, prefixed with ``Error:``."""
    colored = clr(f"Error: {msg}", "red")
    print(colored, file=sys.stderr)
|
||
|
||
|
||
def render_diff(text: str):
    """Print a diff line by line, colouring each line by its leading marker.

    File headers (``+++`` / ``---``) are bold, additions green, removals red,
    hunk headers (``@@``) cyan; every other line is printed unchanged.
    """
    reset = C["reset"]
    for row in text.splitlines():
        # Header markers must be tested before the single-character ones,
        # since "+++" also starts with "+" and "---" with "-".
        if row.startswith(("+++", "---")):
            style = C["bold"]
        elif row.startswith("+"):
            style = C["green"]
        elif row.startswith("-"):
            style = C["red"]
        elif row.startswith("@@"):
            style = C["cyan"]
        else:
            print(row)
            continue
        print(style + row + reset)
|
||
|
||
def _has_diff(text: str) -> bool:
    """Return True when ``text`` contains both ``--- a/`` and ``+++ b/``."""
    markers = ("--- a/", "+++ b/")
    return all(marker in text for marker in markers)
|
||
|
||
|
||
# ── Conversation rendering ─────────────────────────────────────────────────
|
||
|
||
# Chunks appended by stream_text(); joined and cleared by flush_response().
_accumulated_text: list[str] = [] # buffer text during streaming
|
||
|
||
def stream_text(chunk: str):
    """Echo one streamed text chunk immediately and buffer it.

    The chunk is written without a trailing newline and then appended to
    ``_accumulated_text``.
    """
    print(chunk, flush=True, end="")
    _accumulated_text.append(chunk)
|
||
|
||
def stream_thinking(chunk: str, verbose: bool):
    """Print a thinking chunk dimmed, without a newline; no-op unless ``verbose``."""
    if not verbose:
        return
    dimmed = clr(chunk, "dim")
    print(dimmed, end="", flush=True)
|
||
|
||
def flush_response():
    """Finish a streamed response, re-rendering it as Markdown when possible.

    Joins and clears ``_accumulated_text``. When rich is available and the
    text is non-blank, the cursor is returned to the line start; if the text
    also contains any of ``# * ` _ [`` it is printed again through rich's
    Markdown renderer. In every other case a single newline is printed.
    """
    buffered = "".join(_accumulated_text)
    _accumulated_text.clear()

    if _RICH and buffered.strip():
        # Move to the start of the current line before any re-render.
        print("\r", end="")
        markdown_hints = ("#", "*", "`", "_", "[")
        looks_like_markdown = any(hint in buffered for hint in markdown_hints)
        if looks_like_markdown:
            print()  # end the streamed line first
            console.print(Markdown(buffered))
            return

    print()  # make sure the stream ends with a newline
|
||
|
||
def print_tool_start(name: str, inputs: dict, verbose: bool):
    """Announce a tool invocation.

    Prints the one-line description from ``_tool_desc``; in verbose mode also
    prints the first 200 characters of the JSON-encoded inputs.
    """
    headline = f"\n ⚙ {_tool_desc(name, inputs)}"
    print(clr(headline, "dim", "cyan"), flush=True)
    if not verbose:
        return
    payload = json.dumps(inputs, ensure_ascii=False)[:200]
    print(clr(f" inputs: {payload}", "dim"))
|
||
|
||
def print_tool_end(name: str, result: str, verbose: bool):
    """Summarise the outcome of a tool call.

    A result starting with ``Error`` or ``Denied`` is shown as a failure with
    its first 120 characters. Anything else is shown as a success with its
    line and character counts; for ``Edit``/``Write`` results that contain a
    diff, the text before the first blank line is printed dimmed and the
    remainder is rendered as a coloured diff. In verbose mode the first 500
    characters of the result are printed as well, except for denied calls.
    """
    denied = result.startswith("Denied")
    failed = denied or result.startswith("Error")

    if failed:
        print(clr(f" ✗ {result[:120]}", "dim", "red"), flush=True)
    else:
        line_count = result.count("\n") + 1
        summary = f"→ {line_count} lines ({len(result)} chars)"
        print(clr(f" ✓ {summary}", "dim", "green"), flush=True)
        # Edit/Write results carry "<header>\n\n<diff>"; show both parts.
        if name in ("Edit", "Write") and _has_diff(result):
            header, separator, diff_body = result.partition("\n\n")
            if separator:
                print(clr(f" {header}", "dim"))
                render_diff(diff_body)

    if verbose and not denied:
        preview = result[:500] + ("…" if len(result) > 500 else "")
        indented = preview.replace("\n", "\n" + " ")
        print(clr(f" {indented}", "dim"))
|
||
|
||
def _tool_desc(name: str, inputs: dict) -> str:
    """Build a short one-line description of a tool call for display.

    Known tools show their most relevant input (truncated where noted);
    ``Agent`` and ``SendMessage`` have dedicated formats; any other tool shows
    a list holding at most its first input value.
    """
    # tool name -> (input key to display, max characters or None for no cut)
    single_field = {
        "Read": ("file_path", None),
        "Write": ("file_path", None),
        "Edit": ("file_path", None),
        "Bash": ("command", 80),
        "Glob": ("pattern", None),
        "Grep": ("pattern", None),
        "WebFetch": ("url", 60),
        "WebSearch": ("query", None),
        "CheckAgentResult": ("task_id", None),
    }
    if name in single_field:
        key, limit = single_field[name]
        value = inputs.get(key, "")
        if limit is not None:
            value = value[:limit]
        return f"{name}({value})"

    if name == "Agent":
        agent_type = inputs.get("subagent_type", "")
        agent_name = inputs.get("name", "")
        isolation = inputs.get("isolation", "")
        in_background = not inputs.get("wait", True)
        candidates = (
            (agent_type, agent_type),
            (agent_name, f"name={agent_name}"),
            (isolation, f"isolation={isolation}"),
            (in_background, "background"),
        )
        labels = [label for enabled, label in candidates if enabled]
        suffix = f"({', '.join(labels)})" if labels else ""
        return f"Agent{suffix}: {inputs.get('prompt', '')[:60]}"

    if name == "SendMessage":
        recipient = inputs.get("to", "")
        message = inputs.get("message", "")[:50]
        return f"SendMessage(to={recipient}: {message})"

    if name in ("ListAgentTasks", "ListAgentTypes"):
        return f"{name}()"

    # Unknown tool: show a list with at most the first input value.
    return f"{name}({list(inputs.values())[:1]})"
|
||
|
||
|
||
# ── Permission prompt ──────────────────────────────────────────────────────
|
||
|
||
def ask_permission_interactive(desc: str, config: dict) -> bool:
    """Ask the user whether the operation described by ``desc`` may run.

    Returns True for ``y``/``yes``. Answering ``a`` also returns True and
    switches ``config["permission_mode"]`` to ``accept-all``. Any other
    answer, Ctrl-C or end of input returns False.
    """
    try:
        print()
        question = clr(f" Allow: {desc} [y/N/a(ccept-all)] ", "yellow")
        reply = input(question).strip().lower()
        if reply != "a":
            return reply in ("y", "yes")
        config["permission_mode"] = "accept-all"
        ok(" Permission mode set to accept-all for this session.")
        return True
    except (KeyboardInterrupt, EOFError):
        print()
        return False
|
||
|
||
|
||
# ── Slash commands ─────────────────────────────────────────────────────────
|
||
|
||
def cmd_help(_args: str, _state, _config) -> bool:
    """Print the module docstring (usage and slash commands). Returns True."""
    help_text = __doc__
    print(help_text)
    return True
|
||
|
||
def cmd_clear(_args: str, state, _config) -> bool:
    """Empty the conversation in place and reset the turn counter. Returns True."""
    state.turn_count = 0
    state.messages.clear()
    ok("Conversation cleared.")
    return True
|
||
|
||
def cmd_model(args: str, _state, config) -> bool:
    """Show the current model and provider list, or switch to another model.

    Without arguments, prints the active model, its detected provider and up
    to four models per provider. With an argument, stores it in
    ``config["model"]`` and persists the config. ``provider:model`` is
    accepted as shorthand for ``provider/model``.

    Always returns True.
    """
    from providers import PROVIDERS, detect_provider

    requested = args.strip()
    if not requested:
        model = config["model"]
        pname = detect_provider(model)
        info(f"Current model: {model} (provider: {pname})")
        info("\nAvailable models by provider:")
        for pn, pdata in PROVIDERS.items():
            ms = pdata.get("models", [])
            if ms:
                info(f" {pn:12s} " + ", ".join(ms[:4]) + ("..." if len(ms) > 4 else ""))
        info("\nFormat: 'provider/model' or just model name (auto-detected)")
        info(" e.g. /model gpt-4o")
        info(" e.g. /model ollama/qwen2.5-coder")
        info(" e.g. /model kimi:moonshot-v1-32k")
        return True

    # Accept both "ollama/model" and "ollama:model" syntax. Only a colon with
    # no "/" before it is treated as the provider separator; once a provider
    # prefix is present, a later colon belongs to the model name itself
    # (e.g. "ollama/llama3:8b") and must be left untouched.
    prefix, colon, rest = requested.partition(":")
    if colon and "/" not in prefix:
        requested = f"{prefix}/{rest}"

    config["model"] = requested
    pname = detect_provider(requested)
    ok(f"Model set to {requested} (provider: {pname})")

    from config import save_config
    save_config(config)
    return True
|
||
|
||
def cmd_config(args: str, _state, config) -> bool:
    """Show the configuration, show one key, or set ``key=value``.

    With no arguments the whole config is printed as JSON, omitting every
    entry whose name ends in ``api_key`` so that neither the generic key nor
    per-provider ``<provider>_api_key`` entries are echoed to the terminal.
    ``key=value`` stores the value (``true``/``false`` become bool, all-digit
    strings become int) and persists the config; an empty key is rejected.
    A bare ``key`` prints that single entry.

    Always returns True.
    """
    from config import save_config

    if not args:
        display = {
            k: v for k, v in config.items()
            if not str(k).endswith("api_key")
        }
        print(json.dumps(display, indent=2))
        return True

    if "=" not in args:
        k = args.strip()
        v = config.get(k, "(not set)")
        info(f"{k} = {v}")
        return True

    key, _, val = args.partition("=")
    key, val = key.strip(), val.strip()
    if not key:
        # "=value" would otherwise create an entry under the empty string.
        err("Usage: /config key=value")
        return True

    # Type coercion
    if val.lower() in ("true", "false"):
        val = val.lower() == "true"
    elif val.isdigit():
        val = int(val)

    config[key] = val
    save_config(config)
    ok(f"Set {key} = {val}")
    return True
|
||
|
||
def cmd_save(args: str, state, _config) -> bool:
    """Write the conversation and token counters to a JSON session file.

    ``args`` is the target file name; when empty, a timestamped
    ``session_YYYYmmdd_HHMMSS.json`` name is used. A name containing ``/`` is
    taken as a path as given, anything else is placed in ``SESSIONS_DIR``
    (which is created if missing). Content blocks that are not dicts are
    converted with their ``model_dump()`` method.

    Filesystem errors are reported to the user instead of propagating.
    Always returns True.
    """
    from config import SESSIONS_DIR

    fname = args.strip() or f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    explicit_path = "/" in fname
    path = Path(fname) if explicit_path else SESSIONS_DIR / fname

    def _plain_message(message):
        # String (or otherwise non-list) content is already serialisable.
        content = message.get("content")
        if not isinstance(content, list):
            return message
        blocks = [b if isinstance(b, dict) else b.model_dump() for b in content]
        return {**message, "content": blocks}

    data = {
        "messages": [_plain_message(m) for m in state.messages],
        "turn_count": state.turn_count,
        "total_input_tokens": state.total_input_tokens,
        "total_output_tokens": state.total_output_tokens,
    }

    try:
        if not explicit_path:
            # Only the default location is auto-created; a user-supplied
            # path is written as given so typos do not create directories.
            path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2, default=str))
    except OSError as exc:
        err(f"Could not save session to {path}: {exc}")
        return True

    ok(f"Session saved to {path}")
    return True
|
||
|
||
def cmd_load(args: str, state, _config) -> bool:
    """Restore a conversation from a saved session file, or list sessions.

    Without arguments, lists the ``*.json`` files in ``SESSIONS_DIR``. With a
    file name, loads it (a name containing ``/`` is used as a path as given,
    otherwise it is looked up in ``SESSIONS_DIR``) and replaces the messages,
    turn count and token counters on ``state``; missing fields default to an
    empty list or 0.

    A missing, unreadable or malformed file is reported to the user and
    leaves ``state`` untouched. Always returns True.
    """
    from config import SESSIONS_DIR

    fname = args.strip()
    if not fname:
        # List available sessions
        sessions = sorted(SESSIONS_DIR.glob("*.json"))
        if not sessions:
            info("No saved sessions found.")
        else:
            info("Saved sessions:")
            for s in sessions:
                info(f" {s.name}")
        return True

    path = Path(fname) if "/" in fname else SESSIONS_DIR / fname
    if not path.exists():
        err(f"File not found: {path}")
        return True

    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        err(f"Could not load session from {path}: {exc}")
        return True
    if not isinstance(data, dict):
        err(f"Invalid session file (expected a JSON object): {path}")
        return True

    state.messages = data.get("messages", [])
    state.turn_count = data.get("turn_count", 0)
    state.total_input_tokens = data.get("total_input_tokens", 0)
    state.total_output_tokens = data.get("total_output_tokens", 0)
    ok(f"Session loaded from {path} ({len(state.messages)} messages)")
    return True
|
||
|
||
def cmd_history(_args: str, state, _config) -> bool:
    """Print a truncated transcript of the conversation.

    String content is shown up to 200 characters. For list content, each
    ``text`` block is shown up to 200 characters, each ``tool_use`` block by
    tool name, and each ``tool_result`` block by the first 100 characters of
    its content; other block types are skipped. Blocks may be dicts or
    objects exposing the same fields as attributes.

    Always returns True.
    """
    if not state.messages:
        info("(empty conversation)")
        return True

    for index, message in enumerate(state.messages):
        speaker = message["role"]
        role = clr(speaker.upper(), "bold", "cyan" if speaker == "user" else "green")
        content = message["content"]

        if isinstance(content, str):
            print(f"[{index}] {role}: {content[:200]}")
            continue
        if not isinstance(content, list):
            continue

        for block in content:
            as_dict = isinstance(block, dict)
            kind = block.get("type", "") if as_dict else getattr(block, "type", "")
            if kind == "text":
                text = block.get("text", "") if as_dict else block.text
                print(f"[{index}] {role}: {text[:200]}")
            elif kind == "tool_use":
                tool = block.get("name", "") if as_dict else block.name
                print(f"[{index}] {role}: [tool_use: {tool}]")
            elif kind == "tool_result":
                payload = block.get("content", "") if as_dict else block.content
                print(f"[{index}] {role}: [tool_result: {str(payload)[:100]}]")
    return True
|
||
|
||
def cmd_context(_args: str, state, config) -> bool:
    """Print a rough estimate of how much context the conversation uses.

    Shows the message count, an estimated token count, the active model and
    the configured ``max_tokens``. No provider SDK is needed for this, so
    the command works with any backend.

    Always returns True.
    """
    # Rough token estimate: 4 chars ≈ 1 token
    msg_chars = sum(len(str(m.get("content", ""))) for m in state.messages)
    est_tokens = msg_chars // 4
    info(f"Messages: {len(state.messages)}")
    info(f"Estimated tokens: ~{est_tokens:,}")
    info(f"Model: {config['model']}")
    info(f"Max tokens: {config['max_tokens']:,}")
    return True
|
||
|
||
def cmd_cost(_args: str, state, config) -> bool:
    """Print the session's token totals and estimated cost. Returns True.

    The cost comes from ``calc_cost`` for the configured model and the
    cumulative input/output token counts stored on ``state``.
    """
    from config import calc_cost

    tokens_in = state.total_input_tokens
    tokens_out = state.total_output_tokens
    cost = calc_cost(config["model"], tokens_in, tokens_out)
    info(f"Input tokens: {tokens_in:,}")
    info(f"Output tokens: {tokens_out:,}")
    info(f"Est. cost: ${cost:.4f} USD")
    return True
|
||
|
||
def cmd_verbose(_args: str, _state, config) -> bool:
    """Flip ``config["verbose"]`` (treated as off when unset) and report it."""
    enabled = not config.get("verbose", False)
    config["verbose"] = enabled
    ok("Verbose mode: " + ("ON" if enabled else "OFF"))
    return True
|
||
|
||
def cmd_thinking(_args: str, _state, config) -> bool:
    """Flip ``config["thinking"]`` (treated as off when unset) and report it."""
    enabled = not config.get("thinking", False)
    config["thinking"] = enabled
    ok("Extended thinking: " + ("ON" if enabled else "OFF"))
    return True
|
||
|
||
def cmd_permissions(args: str, _state, config) -> bool:
    """Show the permission mode, or set it to auto, accept-all or manual.

    A valid mode is stored in ``config["permission_mode"]`` and the config is
    persisted; an unknown mode is reported as an error and changes nothing.

    Always returns True.
    """
    from config import save_config

    modes = ["auto", "accept-all", "manual"]
    choice = args.strip()

    if not choice:
        info(f"Permission mode: {config.get('permission_mode','auto')}")
        info(f"Available modes: {', '.join(modes)}")
        return True

    if choice in modes:
        config["permission_mode"] = choice
        save_config(config)
        ok(f"Permission mode set to: {choice}")
    else:
        err(f"Unknown mode: {choice}. Choose: {', '.join(modes)}")
    return True
|
||
|
||
def cmd_cwd(args: str, _state, _config) -> bool:
    """Show the working directory, or change it to the given path.

    A leading ``~`` / ``~user`` is expanded first, because ``os.chdir`` does
    not do so itself. Failures (missing directory, no permission, invalid
    path) are reported to the user rather than raised.

    Always returns True.
    """
    target = args.strip()
    if not target:
        info(f"Working directory: {os.getcwd()}")
        return True

    try:
        os.chdir(os.path.expanduser(target))
    except (OSError, ValueError) as e:
        # ValueError: e.g. a path containing an embedded null byte.
        err(str(e))
    else:
        ok(f"Changed directory to: {os.getcwd()}")
    return True
|
||
|
||
def cmd_exit(_args: str, _state, _config) -> bool:
    """Print a farewell and terminate the process with exit status 0.

    Never actually returns; it always raises ``SystemExit``.
    """
    ok("Goodbye!")
    raise SystemExit(0)
|
||
|
||
def cmd_memory(args: str, _state, _config) -> bool:
    """Search stored memories, or list all of them when no query is given.

    With a query, prints each hit's type, scope, name, description and the
    first 120 characters of its content. Without one, prints every memory
    header returned by ``scan_all_memories``, appending a stale marker when
    ``memory_freshness_text`` returns a truthy value for its mtime.

    Always returns True.
    """
    from memory import search_memory, load_index
    from memory.scan import scan_all_memories, format_memory_manifest, memory_freshness_text

    query = args.strip()
    if query:
        hits = search_memory(query)
        if not hits:
            info(f"No memories matching '{query}'")
            return True
        info(f" {len(hits)} result(s) for '{query}':")
        for m in hits:
            ellipsis = "..." if len(m.content) > 120 else ""
            info(f" [{m.type:9s}|{m.scope:7s}] {m.name}: {m.description}")
            info(f" {m.content[:120]}{ellipsis}")
        return True

    # Show manifest with age/freshness
    headers = scan_all_memories()
    if not headers:
        info("No memories stored. The model saves memories via MemorySave.")
        return True

    info(f" {len(headers)} memory/memories (newest first):")
    for h in headers:
        stale_note = " ⚠ stale" if memory_freshness_text(h.mtime_s) else ""
        tag = f"[{h.type or '?':9s}|{h.scope:7s}]"
        info(f" {tag} {h.filename}{stale_note}")
        if h.description:
            info(f" {h.description}")
    return True
|
||
|
||
def cmd_agents(_args: str, _state, _config) -> bool:
    """List sub-agent tasks with id, status, name, branch and prompt preview.

    Any exception while importing the agent manager or listing tasks is
    reported as the sub-agent system not being initialized.

    Always returns True.
    """
    try:
        from multi_agent.tools import get_agent_manager

        tasks = get_agent_manager().list_tasks()
        if tasks:
            info(f" {len(tasks)} sub-agent task(s):")
            for t in tasks:
                ellipsis = "..." if len(t.prompt) > 50 else ""
                preview = t.prompt[:50] + ellipsis
                wt_info = f" branch:{t.worktree_branch}" if t.worktree_branch else ""
                info(f" {t.id} [{t.status:9s}] name={t.name}{wt_info} {preview}")
        else:
            info("No sub-agent tasks.")
    except Exception:
        info("Sub-agent system not initialized.")
    return True
|
||
|
||
|
||
def _print_background_notifications():
    """Print notifications for newly finished background agent tasks.

    Called before each user prompt so the user sees results without polling.
    A task is announced once, when its status is ``completed``, ``failed`` or
    ``cancelled``; announced task ids are remembered in the ``_seen`` set
    stored as an attribute on this function. If the agent manager cannot be
    obtained the function returns silently.
    """
    try:
        from multi_agent.tools import get_agent_manager
        mgr = get_agent_manager()
    except Exception:
        return

    if not hasattr(_print_background_notifications, "_seen"):
        _print_background_notifications._seen = set()
    seen = _print_background_notifications._seen

    finished_states = ("completed", "failed", "cancelled")
    for task in mgr.list_tasks():
        if task.id in seen or task.status not in finished_states:
            continue
        seen.add(task.id)

        succeeded = task.status == "completed"
        icon = "✓" if succeeded else "✗"
        color = "green" if succeeded else "red"
        branch_info = f" [branch: {task.worktree_branch}]" if task.worktree_branch else ""
        print(clr(
            f"\n {icon} Background agent '{task.name}' {task.status}{branch_info}",
            color, "bold"
        ))
        if task.result:
            preview = task.result[:200] + ("..." if len(task.result) > 200 else "")
            print(clr(f" {preview}", "dim"))
        print()
|
||
|
||
def cmd_skills(_args: str, _state, _config) -> bool:
    """List the skills returned by ``load_skills``.

    Each line shows the name, description, triggers, optional argument hint
    and, for skills whose source is not ``builtin``, the source in brackets.
    A second line shows the first 80 characters of ``when_to_use`` if set.

    Always returns True.
    """
    from skill import load_skills

    skills = load_skills()
    if not skills:
        info("No skills found.")
        return True

    info(f"Available skills ({len(skills)}):")
    for entry in skills:
        name_col = f"{clr(entry.name, 'cyan'):24s}"
        trigger_col = clr(", ".join(entry.triggers), "dim")
        hint = f" args: {entry.argument_hint}" if entry.argument_hint else ""
        origin = "" if entry.source == "builtin" else f"[{entry.source}]"
        print(f" {name_col} {entry.description} {trigger_col}{hint} {clr(origin, 'yellow')}")
        if entry.when_to_use:
            print(f" {clr(entry.when_to_use[:80], 'dim')}")
    return True
|
||
|
||
# Slash-command name (without the leading "/") -> handler(args, state, config).
# "exit" and "quit" share one handler.
COMMANDS = dict(
    help=cmd_help,
    clear=cmd_clear,
    model=cmd_model,
    config=cmd_config,
    save=cmd_save,
    load=cmd_load,
    history=cmd_history,
    context=cmd_context,
    cost=cmd_cost,
    verbose=cmd_verbose,
    thinking=cmd_thinking,
    permissions=cmd_permissions,
    cwd=cmd_cwd,
    skills=cmd_skills,
    memory=cmd_memory,
    agents=cmd_agents,
    exit=cmd_exit,
    quit=cmd_exit,
)
|
||
|
||
|
||
def handle_slash(line: str, state, config) -> Union[bool, tuple]:
    """Dispatch a ``/command [args]`` line.

    Returns False when ``line`` is not a slash command (no leading ``/``, or
    nothing after it). Returns True after running a built-in command from
    ``COMMANDS`` or after reporting an unknown command. If no built-in
    matches but ``find_skill`` recognises the line, returns
    ``(skill, skill_args)`` for the caller to run.
    """
    if not line.startswith("/"):
        return False

    tokens = line[1:].split(None, 1)
    if not tokens:
        return False

    cmd = tokens[0].lower()
    args = tokens[1] if len(tokens) > 1 else ""

    handler = COMMANDS.get(cmd)
    if handler:
        handler(args, state, config)
        return True

    # Fall through to skill lookup
    from skill import find_skill

    skill = find_skill(line)
    if not skill:
        err(f"Unknown command: /{cmd} (type /help for commands)")
        return True

    pieces = line.strip().split(maxsplit=1)
    skill_args = pieces[1] if len(pieces) > 1 else ""
    return (skill, skill_args)
|
||
|
||
|
||
# ── Input history setup ────────────────────────────────────────────────────
|
||
|
||
def setup_readline(history_file: Path):
    """Enable persistent input history and tab completion of slash commands.

    Does nothing when the ``readline`` module is unavailable. Otherwise loads
    ``history_file`` (starting with an empty history if it cannot be read),
    caps the history at 1000 entries, registers an exit hook that writes the
    history back, and installs a completer over the names in ``COMMANDS``.
    """
    if readline is None:
        return

    history_path = str(history_file)
    try:
        readline.read_history_file(history_path)
    except OSError:
        # Covers a missing file on first run (FileNotFoundError) as well as
        # an unreadable one; history is a convenience, not a reason to abort.
        pass
    readline.set_history_length(1000)

    def _save_history():
        # Runs at interpreter exit: a failed write must not raise there.
        try:
            readline.write_history_file(history_path)
        except OSError:
            pass

    atexit.register(_save_history)

    # Tab-complete slash commands
    commands = [f"/{c}" for c in COMMANDS]

    def completer(text: str, state: int):
        # readline calls this with state = 0, 1, 2, ... until None is returned.
        matches = [c for c in commands if c.startswith(text)]
        return matches[state] if state < len(matches) else None

    readline.set_completer(completer)
    readline.parse_and_bind("tab: complete")
|
||
|
||
|
||
# ── Main REPL ──────────────────────────────────────────────────────────────
|
||
|
||
def repl(config: dict, initial_prompt: Optional[str] = None):
    """Run the interactive loop, or a single query when a prompt is given.

    Args:
        config: Mutable settings dict; ``model``, ``verbose`` and
            ``permission_mode`` are read here, and slash commands may change
            it between turns.
        initial_prompt: If given, that one prompt is run and the function
            returns without printing the banner or entering the loop.

    In interactive mode each input line is first offered to ``handle_slash``;
    a matched skill is rendered and run as a query, a handled command skips
    the turn, and anything else is sent to the agent. Ctrl-C during a query
    interrupts only that query; Ctrl-C or EOF at the prompt exits the process.
    """
    from config import HISTORY_FILE
    from context import build_system_prompt
    from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest

    setup_readline(HISTORY_FILE)
    state = AgentState()
    verbose = config.get("verbose", False)

    # Banner
    if not initial_prompt:
        from providers import detect_provider
        model = config["model"]
        pname = detect_provider(model)
        model_clr = clr(model, "cyan", "bold")
        prov_clr = clr(f"({pname})", "dim")
        pmode = clr(config.get("permission_mode", "auto"), "yellow")
        print(clr("╭─ Nano Claude Code ──────────────────────────────╮", "dim"))
        print(clr("│ Model: ", "dim") + model_clr + " " + prov_clr)
        print(clr("│ Permissions: ", "dim") + pmode)
        print(clr("│ /model to switch provider · /help for commands │", "dim"))
        print(clr("╰──────────────────────────────────────────────────╯", "dim"))
        print()

    def run_query(user_input: str):
        """Send one prompt to the agent and render the streamed events."""
        nonlocal verbose
        # Re-read each turn so /verbose takes effect immediately.
        verbose = config.get("verbose", False)

        # A query interrupted with Ctrl-C never reaches flush_response(), so
        # its buffered chunks would otherwise be re-rendered together with
        # this response. Start every query with an empty buffer.
        _accumulated_text.clear()

        # Rebuild system prompt each turn (picks up cwd changes, etc.)
        system_prompt = build_system_prompt()

        print(clr("\n╭─ Claude ", "dim") + clr("●", "green") + clr(" ─────────────────────────", "dim"))
        print(clr("│ ", "dim"), end="", flush=True)

        thinking_started = False

        for event in run(user_input, state, config, system_prompt):
            if isinstance(event, TextChunk):
                stream_text(event.text)

            elif isinstance(event, ThinkingChunk):
                if verbose:
                    if not thinking_started:
                        print(clr("\n [thinking]", "dim"))
                        thinking_started = True
                    stream_thinking(event.text, verbose)

            elif isinstance(event, ToolStart):
                # Finish the text streamed so far before the tool banner.
                flush_response()
                print_tool_start(event.name, event.inputs, verbose)

            elif isinstance(event, PermissionRequest):
                # The decision is written back onto the event object.
                event.granted = ask_permission_interactive(event.description, config)

            elif isinstance(event, ToolEnd):
                print_tool_end(event.name, event.result, verbose)
                # Print prefix for next text
                print(clr("│ ", "dim"), end="", flush=True)

            elif isinstance(event, TurnDone):
                if verbose:
                    print(clr(
                        f"\n [tokens: +{event.input_tokens} in / "
                        f"+{event.output_tokens} out]", "dim"
                    ))

        flush_response()
        print(clr("╰──────────────────────────────────────────────", "dim"))
        print()

    # ── Main loop ──
    if initial_prompt:
        try:
            run_query(initial_prompt)
        except KeyboardInterrupt:
            print()
        return

    while True:
        # Show notifications for background agents that finished
        _print_background_notifications()
        try:
            cwd_short = Path.cwd().name
            prompt = clr(f"\n[{cwd_short}] ", "dim") + clr("❯ ", "cyan", "bold")
            user_input = input(prompt).strip()
        except (EOFError, KeyboardInterrupt):
            print()
            ok("Goodbye!")
            sys.exit(0)

        if not user_input:
            continue

        result = handle_slash(user_input, state, config)
        if isinstance(result, tuple):
            # handle_slash matched a skill: render its prompt and run it.
            skill, skill_args = result
            info(f"Running skill: {skill.name}" + (f" [{skill.context}]" if skill.context == "fork" else ""))
            try:
                from skill import substitute_arguments
                rendered = substitute_arguments(skill.prompt, skill_args, skill.arguments)
                run_query(f"[Skill: {skill.name}]\n\n{rendered}")
            except KeyboardInterrupt:
                print(clr("\n (interrupted)", "yellow"))
            continue
        if result:
            continue

        try:
            run_query(user_input)
        except KeyboardInterrupt:
            print(clr("\n (interrupted)", "yellow"))
            # Keep conversation history up to the interruption
|
||
|
||
|
||
# ── Entry point ────────────────────────────────────────────────────────────
|
||
|
||
def main():
    """Parse command-line options, load the config and start the REPL.

    ``--version`` and ``-h/--help`` print and exit before the config is
    loaded. ``-m/--model``, ``--accept-all``, ``--verbose`` and ``--thinking``
    override the loaded config for this run. If no API key is found for the
    active provider and that provider declares an ``api_key_env``, a warning
    is printed but startup continues. ``-p/--print`` without a prompt exits
    with status 1.
    """
    parser = argparse.ArgumentParser(
        prog="nano_claude",
        description="Nano Claude Code — minimal Python Claude Code implementation",
        add_help=False,
    )
    parser.add_argument("prompt", nargs="*", help="Initial prompt (non-interactive)")
    parser.add_argument("-p", "--print", "--print-output",
                        dest="print_mode", action="store_true",
                        help="Non-interactive mode: run prompt and exit")
    parser.add_argument("-m", "--model", help="Override model")
    parser.add_argument("--accept-all", action="store_true",
                        help="Never ask permission (accept all operations)")
    parser.add_argument("--verbose", action="store_true",
                        help="Show thinking + token counts")
    parser.add_argument("--thinking", action="store_true",
                        help="Enable extended thinking")
    parser.add_argument("--version", action="store_true", help="Print version")
    parser.add_argument("-h", "--help", action="store_true", help="Show help")

    args = parser.parse_args()

    if args.version:
        print(f"nano claude code v{VERSION}")
        sys.exit(0)

    if args.help:
        print(__doc__)
        sys.exit(0)

    from config import load_config, save_config, has_api_key
    from providers import detect_provider, PROVIDERS

    config = load_config()

    # Apply CLI overrides first (so key check uses the right provider)
    if args.model:
        model = args.model
        # "provider:model" is shorthand for "provider/model". A colon that
        # comes after an existing "provider/" prefix is part of the model
        # name (e.g. "ollama/llama3:8b") and is left untouched.
        prefix, colon, rest = model.partition(":")
        if colon and "/" not in prefix:
            model = f"{prefix}/{rest}"
        config["model"] = model
    if args.accept_all:
        config["permission_mode"] = "accept-all"
    if args.verbose:
        config["verbose"] = True
    if args.thinking:
        config["thinking"] = True

    # Check API key for active provider (warn only, don't block local providers)
    if not has_api_key(config):
        pname = detect_provider(config["model"])
        prov = PROVIDERS.get(pname, {})
        env = prov.get("api_key_env", "")
        if env:  # local providers like ollama have no env key requirement
            warn(f"No API key found for provider '{pname}'. "
                 f"Set {env} or run: /config {pname}_api_key=YOUR_KEY")

    initial = " ".join(args.prompt) if args.prompt else None
    if args.print_mode and not initial:
        err("--print requires a prompt argument")
        sys.exit(1)

    repl(config, initial_prompt=initial)
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|