Files
collection-claude-code-sour…/nano-claude-code/nano_claude.py
chauncygu 1d4ffa964d Update README.MD and add nano-claude-code v3.0 + original-source-code/src
- README.MD: add original-source-code and nano-claude-code sections, update
  overview table (4 subprojects), add v3.0 news entry, expand comparison table
  with memory/multi-agent/skills dimensions
- nano-claude-code v3.0: multi-agent package (multi_agent/), memory package
  (memory/), skill package (skill/) with built-in /commit and /review skills,
  context compression (compaction.py), tool registry plugin system, diff view,
  17 slash commands, 18 built-in tools, 101 tests (~5000 lines total)
- original-source-code/src: add raw TypeScript source tree (1884 files)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 10:26:29 -07:00

749 lines
28 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Nano Claude Code — Minimal Python implementation of Claude Code.
Usage:
python nano_claude.py [options] [prompt]
Options:
-p, --print Non-interactive: run prompt and exit (also --print-output)
-m, --model MODEL Override model
--accept-all Never ask permission (dangerous)
--verbose Show thinking + token counts
--thinking Enable extended thinking
--version Print version and exit
Slash commands in REPL:
/help Show this help
/clear Clear conversation
/model [m] Show or set model
/config Show config / set key=value
/save [f] Save session to file
/load [f] Load session from file
/history Print conversation history
/context Show context window usage
/cost Show API cost this session
/verbose Toggle verbose mode
/thinking Toggle extended thinking
/permissions [mode] Set permission mode
/cwd [path] Show or change working directory
/memory [query] Show/search persistent memories
/skills List available skills
/agents Show sub-agent tasks
/exit /quit Exit
"""
from __future__ import annotations
import os
import sys
import json
try:
import readline
except ImportError:
readline = None # Windows compatibility
import atexit
import argparse
import textwrap
from pathlib import Path
from datetime import datetime
from typing import Optional, Union
# ── Optional rich for markdown rendering ──────────────────────────────────
try:
from rich.console import Console
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.panel import Panel
from rich import print as rprint
_RICH = True
console = Console()
except ImportError:
_RICH = False
console = None
VERSION = "1.0.0"
# ── ANSI helpers (used even with rich for non-markdown output) ─────────────
# ANSI SGR escape sequences, keyed by the style names that clr() accepts.
C = {
    style: f"\033[{code}m"
    for style, code in (
        ("cyan", 36),
        ("green", 32),
        ("yellow", 33),
        ("red", 31),
        ("blue", 34),
        ("magenta", 35),
        ("bold", 1),
        ("dim", 2),
        ("reset", 0),
    )
}
def clr(text: str, *keys: str) -> str:
    """Return *text* wrapped in the ANSI styles named by *keys*.

    Each key must be a key of ``C`` (a ``KeyError`` is raised otherwise).
    The reset sequence is always appended, even when no keys are given.
    """
    prefix = ""
    for key in keys:
        prefix += C[key]
    return f"{prefix}{text!s}{C['reset']}"
def info(msg: str):
    """Print an informational message to stdout in cyan."""
    colored = clr(msg, "cyan")
    print(colored)
def ok(msg: str):
    """Print a success message to stdout in green."""
    colored = clr(msg, "green")
    print(colored)
def warn(msg: str):
    """Print *msg* to stdout in yellow, prefixed with ``Warning: ``."""
    labelled = f"Warning: {msg}"
    print(clr(labelled, "yellow"))
def err(msg: str):
    """Print *msg* to stderr in red, prefixed with ``Error: ``."""
    labelled = f"Error: {msg}"
    print(clr(labelled, "red"), file=sys.stderr)
def render_diff(text: str):
    """Print a diff line by line, colouring each line by its leading marker.

    File headers (``+++``/``---``) are bold, additions green, removals red,
    hunk headers (``@@``) cyan; every other line is printed unchanged.
    """
    # Order matters: the three-character file headers must be tested before
    # the single-character add/remove markers they also start with.
    rules = (
        (("+++", "---"), "bold"),
        (("+",), "green"),
        (("-",), "red"),
        (("@@",), "cyan"),
    )
    for row in text.splitlines():
        style = next(
            (name for prefixes, name in rules if row.startswith(prefixes)),
            None,
        )
        if style is None:
            print(row)
        else:
            print(C[style] + row + C["reset"])
def _has_diff(text: str) -> bool:
    """Return True when *text* contains both unified-diff file header markers."""
    for marker in ("--- a/", "+++ b/"):
        if marker not in text:
            return False
    return True
# ── Conversation rendering ─────────────────────────────────────────────────
# Text chunks of the reply currently being streamed; drained by flush_response().
_accumulated_text: list[str] = []


def stream_text(chunk: str):
    """Echo one streamed text chunk immediately and keep it in the buffer."""
    print(chunk, end="", flush=True)
    _accumulated_text.append(chunk)
def stream_thinking(chunk: str, verbose: bool):
    """Echo a thinking chunk in dim text; prints nothing unless *verbose*."""
    if not verbose:
        return
    print(clr(chunk, "dim"), end="", flush=True)
def flush_response():
    """Close out a streamed reply and empty the chunk buffer.

    When rich is available and the buffered text contains a character that
    commonly appears in Markdown, the whole text is printed again through
    rich's Markdown renderer. In every other case only a newline is printed.
    """
    buffered = "".join(_accumulated_text)
    _accumulated_text.clear()

    if _RICH and buffered.strip():
        # Return the cursor to the start of the line before any re-render.
        print("\r", end="")
        markdown_hints = "#*`_["
        if any(hint in buffered for hint in markdown_hints):
            print()  # newline after the raw streamed text
            console.print(Markdown(buffered))
            return

    print()  # make sure the stream ends with a newline
def print_tool_start(name: str, inputs: dict, verbose: bool):
    """Announce a tool invocation; in verbose mode also dump its inputs.

    The input dump is JSON, cut to its first 200 characters.
    """
    headline = _tool_desc(name, inputs)
    print(clr(f"\n{headline}", "dim", "cyan"), flush=True)
    if not verbose:
        return
    dumped = json.dumps(inputs, ensure_ascii=False)
    print(clr(f" inputs: {dumped[:200]}", "dim"))
def print_tool_end(name: str, result: str, verbose: bool):
    """Summarise the outcome of a tool call.

    Args:
        name: Tool name; ``Edit`` and ``Write`` results get their diff rendered.
        result: Text returned by the tool. Results beginning with ``Error`` or
            ``Denied`` are treated as failures.
        verbose: When true, also print a preview of up to 500 characters
            (skipped for ``Denied`` results).
    """
    line_count = result.count("\n") + 1
    summary = f"{line_count} lines ({len(result)} chars)"
    failed = result.startswith(("Error", "Denied"))

    if failed:
        print(clr(f"{result[:120]}", "dim", "red"), flush=True)
    else:
        print(clr(f"{summary}", "dim", "green"), flush=True)
        # Edit/Write results carry a header, a blank line, then a unified diff.
        if name in ("Edit", "Write") and _has_diff(result):
            parts = result.split("\n\n", 1)
            if len(parts) == 2:
                print(clr(f" {parts[0]}", "dim"))
                render_diff(parts[1])

    if verbose and not result.startswith("Denied"):
        # Mark a cut-off preview; previously both branches appended "", so
        # truncation was invisible.
        preview = result[:500] + ("..." if len(result) > 500 else "")
        print(clr(f" {preview.replace(chr(10), chr(10)+' ')}", "dim"))
def _tool_desc(name: str, inputs: dict) -> str:
    """Build the short one-line label shown when a tool is invoked.

    Known tools show their most relevant input (some cut to a fixed length);
    unknown tools fall back to a list holding the first input value.
    """
    # tool name -> (input key to show, max characters or None for no cut)
    single_field = {
        "Read": ("file_path", None),
        "Write": ("file_path", None),
        "Edit": ("file_path", None),
        "Bash": ("command", 80),
        "Glob": ("pattern", None),
        "Grep": ("pattern", None),
        "WebFetch": ("url", 60),
        "WebSearch": ("query", None),
        "CheckAgentResult": ("task_id", None),
    }
    if name in single_field:
        key, limit = single_field[name]
        shown = inputs.get(key, "")
        if limit is not None:
            shown = shown[:limit]
        return f"{name}({shown})"

    if name in ("ListAgentTasks", "ListAgentTypes"):
        return f"{name}()"

    if name == "SendMessage":
        recipient = inputs.get("to", "")
        body = inputs.get("message", "")[:50]
        return f"SendMessage(to={recipient}: {body})"

    if name == "Agent":
        details = []
        agent_type = inputs.get("subagent_type", "")
        if agent_type:
            details.append(agent_type)
        agent_name = inputs.get("name", "")
        if agent_name:
            details.append(f"name={agent_name}")
        isolation = inputs.get("isolation", "")
        if isolation:
            details.append(f"isolation={isolation}")
        # A falsy "wait" input means the agent runs in the background.
        if not inputs.get("wait", True):
            details.append("background")
        suffix = f"({', '.join(details)})" if details else ""
        return f"Agent{suffix}: {inputs.get('prompt', '')[:60]}"

    return f"{name}({list(inputs.values())[:1]})"
# ── Permission prompt ──────────────────────────────────────────────────────
def ask_permission_interactive(desc: str, config: dict) -> bool:
    """Ask the user whether the operation described by *desc* may run.

    Answering ``a`` switches ``config["permission_mode"]`` to ``accept-all``
    and grants the request. ``y``/``yes`` grant it; anything else, as well as
    Ctrl-C or end-of-input, denies it.
    """
    question = clr(f" Allow: {desc} [y/N/a(ccept-all)] ", "yellow")
    try:
        print()
        reply = input(question).strip().lower()
        if reply != "a":
            return reply in ("y", "yes")
        config["permission_mode"] = "accept-all"
        ok(" Permission mode set to accept-all for this session.")
        return True
    except (KeyboardInterrupt, EOFError):
        print()
        return False
# ── Slash commands ─────────────────────────────────────────────────────────
def cmd_help(_args: str, _state, _config) -> bool:
    """/help: print the module docstring, which doubles as the usage text."""
    usage = __doc__
    print(usage)
    return True
def cmd_clear(_args: str, state, _config) -> bool:
    """/clear: empty the conversation and reset the turn counter."""
    state.turn_count = 0
    state.messages.clear()
    ok("Conversation cleared.")
    return True
def cmd_model(args: str, _state, config) -> bool:
    """/model [m]: show the current model and known models, or switch model.

    With no argument, prints the active model, its detected provider and up to
    four models per provider. With an argument, stores it in
    ``config["model"]`` and saves the config.

    ``provider:model`` is accepted as an alias for ``provider/model``. The
    colon is only rewritten when no ``/`` precedes it, so a name that already
    has a provider prefix keeps a colon-separated tag intact
    (``ollama/llama3:8b`` is stored unchanged).
    """
    from providers import PROVIDERS, detect_provider

    if not args:
        model = config["model"]
        pname = detect_provider(model)
        info(f"Current model: {model} (provider: {pname})")
        info("\nAvailable models by provider:")
        for pn, pdata in PROVIDERS.items():
            ms = pdata.get("models", [])
            if ms:
                info(f" {pn:12s} " + ", ".join(ms[:4]) + ("..." if len(ms) > 4 else ""))
        info("\nFormat: 'provider/model' or just model name (auto-detected)")
        info(" e.g. /model gpt-4o")
        info(" e.g. /model ollama/qwen2.5-coder")
        info(" e.g. /model kimi:moonshot-v1-32k")
        return True

    m = args.strip()
    colon = m.find(":")
    # Only the first colon can be a provider separator, and only when the
    # provider has not already been given with "/".
    if colon != -1 and "/" not in m[:colon]:
        m = f"{m[:colon]}/{m[colon + 1:]}"
    config["model"] = m
    pname = detect_provider(m)
    ok(f"Model set to {m} (provider: {pname})")

    from config import save_config
    save_config(config)
    return True
def cmd_config(args: str, _state, config) -> bool:
    """/config: list the config, show one key, or set ``key=value``.

    * no argument: print the config as JSON, leaving out every key that ends
      in ``api_key`` (both the plain ``api_key`` and per-provider
      ``<provider>_api_key`` entries) so secrets are not echoed.
    * ``key=value``: store the value and save. ``true``/``false`` become
      booleans and all-decimal strings become ints.
    * ``key``: print that key's value, or ``(not set)``.
    """
    from config import save_config

    if not args:
        display = {
            k: v for k, v in config.items()
            if not str(k).endswith("api_key")
        }
        print(json.dumps(display, indent=2))
        return True

    if "=" not in args:
        k = args.strip()
        v = config.get(k, "(not set)")
        info(f"{k} = {v}")
        return True

    key, _, val = args.partition("=")
    key, val = key.strip(), val.strip()
    if not key:
        # "=value" would otherwise create an entry under the empty key.
        err("Usage: /config key=value")
        return True

    # Type coercion
    if val.lower() in ("true", "false"):
        val = val.lower() == "true"
    elif val.isdecimal():
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which int() rejects.
        val = int(val)
    config[key] = val
    save_config(config)
    ok(f"Set {key} = {val}")
    return True
def cmd_save(args: str, state, _config) -> bool:
    """/save [f]: write the conversation and token counters to a JSON file.

    A name containing a path separator is used as given; a bare name is placed
    in ``SESSIONS_DIR``. With no name, a timestamped ``session_*.json`` is
    used. Content blocks that are not plain dicts are serialised through their
    ``model_dump()`` method.

    Failures to create the directory or write the file are reported with
    ``err`` instead of propagating out of the REPL.
    """
    from config import SESSIONS_DIR

    fname = args.strip() or f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    # os.sep covers the native separator on platforms where it is not "/".
    has_directory = "/" in fname or os.sep in fname
    path = Path(fname) if has_directory else SESSIONS_DIR / fname

    data = {
        "messages": [
            m if not isinstance(m.get("content"), list) else
            {**m, "content": [
                b if isinstance(b, dict) else b.model_dump()
                for b in m["content"]
            ]}
            for m in state.messages
        ],
        "turn_count": state.turn_count,
        "total_input_tokens": state.total_input_tokens,
        "total_output_tokens": state.total_output_tokens,
    }

    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
    except OSError as e:
        err(f"Could not save session to {path}: {e}")
        return True

    ok(f"Session saved to {path}")
    return True
def cmd_load(args: str, state, _config) -> bool:
    """/load [f]: restore a saved session, or list saved sessions.

    With no argument, lists the ``*.json`` files in ``SESSIONS_DIR``. With a
    name, loads it (as given if it contains ``/``, otherwise from
    ``SESSIONS_DIR``) and replaces the messages, turn count and token counters
    on *state*; keys missing from the file fall back to empty/zero.

    A missing, unreadable or malformed file is reported with ``err`` and
    leaves *state* untouched.
    """
    from config import SESSIONS_DIR

    fname = args.strip()
    if not fname:
        # List available sessions
        sessions = sorted(SESSIONS_DIR.glob("*.json"))
        if not sessions:
            info("No saved sessions found.")
        else:
            info("Saved sessions:")
            for s in sessions:
                info(f" {s.name}")
        return True

    path = Path(fname) if "/" in fname else SESSIONS_DIR / fname
    if not path.exists():
        err(f"File not found: {path}")
        return True

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        err(f"Could not load session from {path}: {e}")
        return True
    if not isinstance(data, dict):
        err(f"Could not load session from {path}: expected a JSON object")
        return True

    state.messages = data.get("messages", [])
    state.turn_count = data.get("turn_count", 0)
    state.total_input_tokens = data.get("total_input_tokens", 0)
    state.total_output_tokens = data.get("total_output_tokens", 0)
    ok(f"Session loaded from {path} ({len(state.messages)} messages)")
    return True
def cmd_history(_args: str, state, _config) -> bool:
    """/history: print a shortened view of every message in the conversation.

    String content is cut to 200 characters. List content is printed block by
    block: text blocks (200 chars), tool-use blocks (tool name only) and
    tool-result blocks (100 chars). Blocks may be dicts or objects exposing
    the same fields as attributes.
    """
    if not state.messages:
        info("(empty conversation)")
        return True

    for idx, msg in enumerate(state.messages):
        role_color = "cyan" if msg["role"] == "user" else "green"
        label = clr(msg["role"].upper(), "bold", role_color)
        body = msg["content"]

        if isinstance(body, str):
            print(f"[{idx}] {label}: {body[:200]}")
            continue
        if not isinstance(body, list):
            continue

        for part in body:
            is_mapping = isinstance(part, dict)
            kind = part.get("type", "") if is_mapping else getattr(part, "type", "")
            if kind == "text":
                text = part.get("text", "") if is_mapping else part.text
                print(f"[{idx}] {label}: {text[:200]}")
            elif kind == "tool_use":
                tool = part.get("name", "") if is_mapping else part.name
                print(f"[{idx}] {label}: [tool_use: {tool}]")
            elif kind == "tool_result":
                payload = part.get("content", "") if is_mapping else part.content
                print(f"[{idx}] {label}: [tool_result: {str(payload)[:100]}]")
    return True
def cmd_context(_args: str, state, config) -> bool:
    """/context: print the message count and a rough token estimate.

    The estimate is the total length of each message's stringified content
    divided by four (about four characters per token). No provider SDK is
    needed for this, so nothing is imported here; the command works whichever
    provider is in use.
    """
    # Rough token estimate: 4 chars ≈ 1 token
    msg_chars = sum(len(str(m.get("content", ""))) for m in state.messages)
    est_tokens = msg_chars // 4
    info(f"Messages: {len(state.messages)}")
    info(f"Estimated tokens: ~{est_tokens:,}")
    info(f"Model: {config['model']}")
    info(f"Max tokens: {config['max_tokens']:,}")
    return True
def cmd_cost(_args: str, state, config) -> bool:
    """/cost: print the session's token totals and the estimated cost in USD."""
    from config import calc_cost

    model = config["model"]
    tokens_in = state.total_input_tokens
    tokens_out = state.total_output_tokens
    cost = calc_cost(model, tokens_in, tokens_out)

    for line in (
        f"Input tokens: {tokens_in:,}",
        f"Output tokens: {tokens_out:,}",
        f"Est. cost: ${cost:.4f} USD",
    ):
        info(line)
    return True
def cmd_verbose(_args: str, _state, config) -> bool:
    """/verbose: flip ``config["verbose"]`` and report the new setting."""
    enabled = not config.get("verbose", False)
    config["verbose"] = enabled
    ok(f"Verbose mode: {'ON' if enabled else 'OFF'}")
    return True
def cmd_thinking(_args: str, _state, config) -> bool:
    """/thinking: flip ``config["thinking"]`` and report the new setting."""
    enabled = not config.get("thinking", False)
    config["thinking"] = enabled
    ok(f"Extended thinking: {'ON' if enabled else 'OFF'}")
    return True
def cmd_permissions(args: str, _state, config) -> bool:
    """/permissions [mode]: show the permission mode, or set and save it.

    Valid modes are ``auto``, ``accept-all`` and ``manual``; any other value
    is rejected with an error and the config is left unchanged.
    """
    from config import save_config

    modes = ["auto", "accept-all", "manual"]
    requested = args.strip()

    if not requested:
        info(f"Permission mode: {config.get('permission_mode','auto')}")
        info(f"Available modes: {', '.join(modes)}")
        return True

    if requested not in modes:
        err(f"Unknown mode: {requested}. Choose: {', '.join(modes)}")
        return True

    config["permission_mode"] = requested
    save_config(config)
    ok(f"Permission mode set to: {requested}")
    return True
def cmd_cwd(args: str, _state, _config) -> bool:
    """/cwd [path]: show the working directory, or change to *path*.

    A leading ``~`` is expanded to the user's home directory, since no shell
    does that here. A path that cannot be entered is reported with ``err``
    and the working directory stays unchanged.
    """
    target = args.strip()
    if not target:
        info(f"Working directory: {os.getcwd()}")
        return True

    try:
        os.chdir(os.path.expanduser(target))
    except (OSError, ValueError) as e:
        # OSError: missing path, not a directory, no permission.
        # ValueError: e.g. an embedded null character in the path.
        err(str(e))
    else:
        ok(f"Changed directory to: {os.getcwd()}")
    return True
def cmd_exit(_args: str, _state, _config) -> bool:
    """/exit, /quit: say goodbye and end the process with status 0.

    Never returns normally; ``SystemExit`` is raised.
    """
    ok("Goodbye!")
    raise SystemExit(0)
def cmd_memory(args: str, _state, _config) -> bool:
    """/memory [query]: search stored memories, or list all of them.

    With a query, prints each match's type, scope, name, description and the
    first 120 characters of its content. Without one, prints one line per
    memory file, flagged as stale when ``memory_freshness_text`` returns a
    truthy value for its mtime, followed by its description if it has one.
    """
    # Only the names used below are imported, so an unrelated missing symbol
    # in the memory package cannot break this command.
    from memory import search_memory
    from memory.scan import scan_all_memories, memory_freshness_text

    query = args.strip()
    if query:
        results = search_memory(query)
        if not results:
            info(f"No memories matching '{query}'")
            return True
        info(f" {len(results)} result(s) for '{query}':")
        for m in results:
            info(f" [{m.type:9s}|{m.scope:7s}] {m.name}: {m.description}")
            info(f" {m.content[:120]}{'...' if len(m.content) > 120 else ''}")
        return True

    # Show manifest with age/freshness
    headers = scan_all_memories()
    if not headers:
        info("No memories stored. The model saves memories via MemorySave.")
        return True
    info(f" {len(headers)} memory/memories (newest first):")
    for h in headers:
        fresh_warn = " ⚠ stale" if memory_freshness_text(h.mtime_s) else ""
        tag = f"[{h.type or '?':9s}|{h.scope:7s}]"
        info(f" {tag} {h.filename}{fresh_warn}")
        if h.description:
            info(f" {h.description}")
    return True
def cmd_agents(_args: str, _state, _config) -> bool:
    """/agents: list sub-agent tasks with id, status, name and prompt preview.

    Any exception raised while importing or querying the agent manager is
    reported as the sub-agent system not being initialized.
    """
    try:
        from multi_agent.tools import get_agent_manager

        tasks = get_agent_manager().list_tasks()
        if not tasks:
            info("No sub-agent tasks.")
            return True

        info(f" {len(tasks)} sub-agent task(s):")
        for task in tasks:
            ellipsis = "..." if len(task.prompt) > 50 else ""
            preview = task.prompt[:50] + ellipsis
            if task.worktree_branch:
                wt_info = f" branch:{task.worktree_branch}"
            else:
                wt_info = ""
            info(f" {task.id} [{task.status:9s}] name={task.name}{wt_info} {preview}")
    except Exception:
        info("Sub-agent system not initialized.")
    return True
def _print_background_notifications():
    """Print notifications for newly finished background agent tasks.

    Called before each user prompt so the user sees results without polling.
    A task is announced once, when its status is ``completed``, ``failed`` or
    ``cancelled``; announced task ids are remembered in a set stored as an
    attribute on this function. Does nothing if the agent manager cannot be
    imported or obtained.
    """
    try:
        from multi_agent.tools import get_agent_manager
        mgr = get_agent_manager()
    except Exception:
        return

    seen = _print_background_notifications.__dict__.setdefault("_seen", set())

    for task in mgr.list_tasks():
        if task.id in seen:
            continue
        if task.status not in ("completed", "failed", "cancelled"):
            continue
        seen.add(task.id)

        succeeded = task.status == "completed"
        # Distinct markers for success and failure; both branches used to be
        # the same empty string, which made the conditional a no-op.
        icon = "✓" if succeeded else "✗"
        color = "green" if succeeded else "red"
        branch_info = f" [branch: {task.worktree_branch}]" if task.worktree_branch else ""
        print(clr(
            f"\n {icon} Background agent '{task.name}' {task.status}{branch_info}",
            color, "bold"
        ))
        if task.result:
            preview = task.result[:200] + ("..." if len(task.result) > 200 else "")
            print(clr(f" {preview}", "dim"))
        print()
def cmd_skills(_args: str, _state, _config) -> bool:
    """/skills: list the skills returned by ``load_skills()``.

    Each skill is shown with its name, description, triggers, optional
    argument hint and, unless its source is ``builtin``, its source. A second
    line shows the first 80 characters of ``when_to_use`` when present.
    """
    from skill import load_skills

    skills = load_skills()
    if not skills:
        info("No skills found.")
        return True

    info(f"Available skills ({len(skills)}):")
    for s in skills:
        triggers = ", ".join(s.triggers)
        source_label = f"[{s.source}]" if s.source != "builtin" else ""
        hint = f" args: {s.argument_hint}" if s.argument_hint else ""
        # Pad the raw name and colour afterwards: applying the width to the
        # coloured string counts the invisible escape codes, which shrank the
        # visible column and misaligned the descriptions.
        name_col = clr(f"{s.name:24s}", "cyan")
        print(f" {name_col} {s.description} {clr(triggers, 'dim')}{hint} {clr(source_label, 'yellow')}")
        if s.when_to_use:
            print(f" {clr(s.when_to_use[:80], 'dim')}")
    return True
# Slash-command name (without the leading "/") -> handler taking
# (args, state, config). "exit" and "quit" share one handler.
COMMANDS = dict(
    [
        ("help", cmd_help),
        ("clear", cmd_clear),
        ("model", cmd_model),
        ("config", cmd_config),
        ("save", cmd_save),
        ("load", cmd_load),
        ("history", cmd_history),
        ("context", cmd_context),
        ("cost", cmd_cost),
        ("verbose", cmd_verbose),
        ("thinking", cmd_thinking),
        ("permissions", cmd_permissions),
        ("cwd", cmd_cwd),
        ("skills", cmd_skills),
        ("memory", cmd_memory),
        ("agents", cmd_agents),
        ("exit", cmd_exit),
        ("quit", cmd_exit),
    ]
)
def handle_slash(line: str, state, config) -> Union[bool, tuple]:
    """Dispatch a ``/command [args]`` line.

    Returns:
        ``False`` when *line* is not a slash command (no leading ``/`` or
        nothing after it); ``True`` when a built-in command ran or the command
        was unknown (an error is printed); a ``(skill, args)`` tuple when the
        line matched a skill instead of a built-in command.
    """
    if not line.startswith("/"):
        return False

    pieces = line[1:].split(None, 1)
    if not pieces:
        return False

    cmd = pieces[0].lower()
    handler = COMMANDS.get(cmd)
    if handler:
        args = pieces[1] if len(pieces) > 1 else ""
        handler(args, state, config)
        return True

    # Not a built-in command: fall through to skill lookup.
    from skill import find_skill

    skill = find_skill(line)
    if not skill:
        err(f"Unknown command: /{cmd} (type /help for commands)")
        return True

    tokens = line.strip().split(maxsplit=1)
    skill_args = tokens[1] if len(tokens) > 1 else ""
    return (skill, skill_args)
# ── Input history setup ────────────────────────────────────────────────────
def setup_readline(history_file: Path):
    """Enable persistent input history and tab completion of slash commands.

    Does nothing when the ``readline`` module is unavailable. History is read
    from *history_file* now and written back at interpreter exit, capped at
    1000 entries. Problems reading or writing the history file are ignored so
    they cannot prevent the REPL from starting or exiting cleanly.
    """
    if readline is None:
        return

    try:
        readline.read_history_file(str(history_file))
    except OSError:
        # Covers a missing file (first run) as well as other read failures
        # such as a permission error or a history file readline cannot parse.
        pass
    readline.set_history_length(1000)

    def save_history():
        # Best effort: a failed write at exit should not print a traceback.
        try:
            readline.write_history_file(str(history_file))
        except OSError:
            pass

    atexit.register(save_history)

    # Tab-complete slash commands
    commands = [f"/{c}" for c in COMMANDS]

    def completer(text: str, state: int):
        matches = [c for c in commands if c.startswith(text)]
        return matches[state] if state < len(matches) else None

    readline.set_completer(completer)
    readline.parse_and_bind("tab: complete")
# ── Main REPL ──────────────────────────────────────────────────────────────
def repl(config: dict, initial_prompt: Optional[str] = None):
    """Run the interactive loop, or a single turn when a prompt is supplied.

    Args:
        config: Settings dict; ``model``, ``verbose`` and ``permission_mode``
            are read here, and the dict is passed on to the agent and to
            slash commands, which may modify it.
        initial_prompt: When given, that prompt is run once (no banner) and
            the function returns. Otherwise a banner is shown and input is
            read until EOF or Ctrl-C at the prompt, which exits the process.

    Ctrl-C during a turn aborts only that turn; the conversation is kept.
    """
    from config import HISTORY_FILE
    from context import build_system_prompt
    from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest

    setup_readline(HISTORY_FILE)
    state = AgentState()
    verbose = config.get("verbose", False)

    # Banner
    if not initial_prompt:
        from providers import detect_provider
        model = config["model"]
        pname = detect_provider(model)
        model_clr = clr(model, "cyan", "bold")
        prov_clr = clr(f"({pname})", "dim")
        pmode = clr(config.get("permission_mode", "auto"), "yellow")
        print(clr("╭─ Nano Claude Code ──────────────────────────────╮", "dim"))
        print(clr("│ Model: ", "dim") + model_clr + " " + prov_clr)
        print(clr("│ Permissions: ", "dim") + pmode)
        print(clr("│ /model to switch provider · /help for commands │", "dim"))
        print(clr("╰──────────────────────────────────────────────────╯", "dim"))
        print()

    def run_query(user_input: str):
        """Run one agent turn and render its event stream."""
        nonlocal verbose
        # Re-read each turn so /verbose takes effect immediately.
        verbose = config.get("verbose", False)
        # A turn interrupted mid-stream never reaches flush_response(), so its
        # partial text would otherwise be re-rendered with this turn's reply.
        _accumulated_text.clear()
        # Rebuild system prompt each turn (picks up cwd changes, etc.)
        system_prompt = build_system_prompt()
        print(clr("\n╭─ Claude ", "dim") + clr("", "green") + clr(" ─────────────────────────", "dim"))
        print(clr("", "dim"), end="", flush=True)
        thinking_started = False

        for event in run(user_input, state, config, system_prompt):
            if isinstance(event, TextChunk):
                stream_text(event.text)
            elif isinstance(event, ThinkingChunk):
                if verbose:
                    if not thinking_started:
                        print(clr("\n [thinking]", "dim"))
                        thinking_started = True
                    stream_thinking(event.text, verbose)
            elif isinstance(event, ToolStart):
                flush_response()
                print_tool_start(event.name, event.inputs, verbose)
            elif isinstance(event, PermissionRequest):
                # The answer is handed back by setting it on the event.
                event.granted = ask_permission_interactive(event.description, config)
            elif isinstance(event, ToolEnd):
                print_tool_end(event.name, event.result, verbose)
                # Print prefix for next text
                print(clr("", "dim"), end="", flush=True)
            elif isinstance(event, TurnDone):
                if verbose:
                    print(clr(
                        f"\n [tokens: +{event.input_tokens} in / "
                        f"+{event.output_tokens} out]", "dim"
                    ))

        flush_response()
        print(clr("╰──────────────────────────────────────────────", "dim"))
        print()

    # ── Main loop ──
    if initial_prompt:
        try:
            run_query(initial_prompt)
        except KeyboardInterrupt:
            print()
        return

    while True:
        # Show notifications for background agents that finished
        _print_background_notifications()
        try:
            cwd_short = Path.cwd().name
            prompt = clr(f"\n[{cwd_short}] ", "dim") + clr(" ", "cyan", "bold")
            user_input = input(prompt).strip()
        except (EOFError, KeyboardInterrupt):
            print()
            ok("Goodbye!")
            sys.exit(0)

        if not user_input:
            continue

        result = handle_slash(user_input, state, config)
        if isinstance(result, tuple):
            # The line matched a skill: run its rendered prompt as a turn.
            skill, skill_args = result
            info(f"Running skill: {skill.name}" + (f" [{skill.context}]" if skill.context == "fork" else ""))
            try:
                from skill import substitute_arguments
                rendered = substitute_arguments(skill.prompt, skill_args, skill.arguments)
                run_query(f"[Skill: {skill.name}]\n\n{rendered}")
            except KeyboardInterrupt:
                print(clr("\n (interrupted)", "yellow"))
            continue
        if result:
            continue

        try:
            run_query(user_input)
        except KeyboardInterrupt:
            print(clr("\n (interrupted)", "yellow"))
            # Keep conversation history up to the interruption
# ── Entry point ────────────────────────────────────────────────────────────
def main():
    """Parse command-line arguments, apply overrides to the config, start the REPL.

    ``--version`` and ``--help`` print and exit with status 0. ``--print``
    without a prompt exits with status 1. A missing API key only produces a
    warning, and only for providers that declare an ``api_key_env``.
    """
    parser = argparse.ArgumentParser(
        prog="nano_claude",
        description="Nano Claude Code — minimal Python Claude Code implementation",
        add_help=False,
    )
    parser.add_argument("prompt", nargs="*", help="Initial prompt (non-interactive)")
    parser.add_argument("-p", "--print", "--print-output",
                        dest="print_mode", action="store_true",
                        help="Non-interactive mode: run prompt and exit")
    parser.add_argument("-m", "--model", help="Override model")
    parser.add_argument("--accept-all", action="store_true",
                        help="Never ask permission (accept all operations)")
    parser.add_argument("--verbose", action="store_true",
                        help="Show thinking + token counts")
    parser.add_argument("--thinking", action="store_true",
                        help="Enable extended thinking")
    parser.add_argument("--version", action="store_true", help="Print version")
    # Help is handled manually so the module docstring can serve as usage text.
    parser.add_argument("-h", "--help", action="store_true", help="Show help")
    args = parser.parse_args()

    if args.version:
        print(f"nano claude code v{VERSION}")
        sys.exit(0)
    if args.help:
        print(__doc__)
        sys.exit(0)

    from config import load_config, has_api_key
    from providers import detect_provider, PROVIDERS

    config = load_config()

    # Apply CLI overrides first (so key check uses the right provider)
    if args.model:
        model = args.model
        colon = model.find(":")
        # "provider:model" is an alias for "provider/model". Leave the colon
        # alone when a "/" already precedes it, so "ollama/llama3:8b" keeps
        # its tag instead of becoming "ollama/llama3/8b".
        if colon != -1 and "/" not in model[:colon]:
            model = f"{model[:colon]}/{model[colon + 1:]}"
        config["model"] = model
    if args.accept_all:
        config["permission_mode"] = "accept-all"
    if args.verbose:
        config["verbose"] = True
    if args.thinking:
        config["thinking"] = True

    # Check API key for active provider (warn only, don't block local providers)
    if not has_api_key(config):
        pname = detect_provider(config["model"])
        prov = PROVIDERS.get(pname, {})
        env = prov.get("api_key_env", "")
        if env:  # local providers like ollama have no env key requirement
            warn(f"No API key found for provider '{pname}'. "
                 f"Set {env} or run: /config {pname}_api_key=YOUR_KEY")

    initial = " ".join(args.prompt) if args.prompt else None
    if args.print_mode and not initial:
        err("--print requires a prompt argument")
        sys.exit(1)

    repl(config, initial_prompt=initial)


if __name__ == "__main__":
    main()