diff --git a/nano-claude-code/config.py b/nano-claude-code/config.py new file mode 100644 index 0000000..9008729 --- /dev/null +++ b/nano-claude-code/config.py @@ -0,0 +1,69 @@ +"""Configuration management for nano claude (multi-provider).""" +import os +import json +from pathlib import Path + +CONFIG_DIR = Path.home() / ".nano_claude" +CONFIG_FILE = CONFIG_DIR / "config.json" +HISTORY_FILE = CONFIG_DIR / "input_history.txt" +SESSIONS_DIR = CONFIG_DIR / "sessions" + +DEFAULTS = { + "model": "claude-opus-4-6", + "max_tokens": 8192, + "permission_mode": "auto", # auto | accept-all | manual + "verbose": False, + "thinking": False, + "thinking_budget": 10000, + "custom_base_url": "", # for "custom" provider + # Per-provider API keys (optional; env vars take priority) + # "anthropic_api_key": "sk-ant-..." + # "openai_api_key": "sk-..." + # "gemini_api_key": "..." + # "kimi_api_key": "..." + # "qwen_api_key": "..." + # "zhipu_api_key": "..." + # "deepseek_api_key": "..." +} + + +def load_config() -> dict: + CONFIG_DIR.mkdir(exist_ok=True) + SESSIONS_DIR.mkdir(exist_ok=True) + cfg = dict(DEFAULTS) + if CONFIG_FILE.exists(): + try: + cfg.update(json.loads(CONFIG_FILE.read_text())) + except Exception: + pass + # Backward-compat: legacy single api_key → anthropic_api_key + if cfg.get("api_key") and not cfg.get("anthropic_api_key"): + cfg["anthropic_api_key"] = cfg.pop("api_key") + # Also accept ANTHROPIC_API_KEY env for backward-compat + if not cfg.get("anthropic_api_key"): + cfg["anthropic_api_key"] = os.environ.get("ANTHROPIC_API_KEY", "") + return cfg + + +def save_config(cfg: dict): + CONFIG_DIR.mkdir(exist_ok=True) + data = dict(cfg) + CONFIG_FILE.write_text(json.dumps(data, indent=2)) + + +def current_provider(cfg: dict) -> str: + from providers import detect_provider + return detect_provider(cfg.get("model", "claude-opus-4-6")) + + +def has_api_key(cfg: dict) -> bool: + """Check whether the active provider has an API key configured.""" + from providers import get_api_key + pname = current_provider(cfg) + key = get_api_key(pname, cfg) + return bool(key) + + +def calc_cost(model: str, in_tokens: int, out_tokens: int) -> float: + from providers import calc_cost as _cc + return _cc(model, in_tokens, out_tokens) diff --git a/nano-claude-code/context.py b/nano-claude-code/context.py new file mode 100644 index 0000000..6b12e20 --- /dev/null +++ b/nano-claude-code/context.py @@ -0,0 +1,100 @@ +"""System context: CLAUDE.md, git info, cwd injection.""" +import os +import subprocess +from pathlib import Path +from datetime import datetime + +SYSTEM_PROMPT_TEMPLATE = """\ +You are Nano Claude Code, Created by SAIL Lab (Safe AI and Robot Learning Lab), an AI coding assistant running in the terminal. +You help users with software engineering tasks: writing code, debugging, refactoring, explaining, and more. + +# Available Tools +- **Read**: Read file contents with line numbers +- **Write**: Create or overwrite files +- **Edit**: Replace text in a file (exact string replacement) +- **Bash**: Execute shell commands +- **Glob**: Find files by pattern (e.g. **/*.py) +- **Grep**: Search file contents with regex +- **WebFetch**: Fetch and extract content from a URL +- **WebSearch**: Search the web via DuckDuckGo + +# Guidelines +- Be concise and direct. Lead with the answer. +- Prefer editing existing files over creating new ones. +- Do not add unnecessary comments, docstrings, or error handling. +- When reading files before editing, use line numbers to be precise. +- Always use absolute paths for file operations. +- For multi-step tasks, work through them systematically. +- If a task is unclear, ask for clarification before proceeding. + +# Environment +- Current date: {date} +- Working directory: {cwd} +- Platform: {platform} +{git_info}{claude_md}""" + + +def get_git_info() -> str: + """Return git branch/status summary if in a git repo.""" + try: + branch = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, text=True).strip() + status = subprocess.check_output( + ["git", "status", "--short"], + stderr=subprocess.DEVNULL, text=True).strip() + log = subprocess.check_output( + ["git", "log", "--oneline", "-5"], + stderr=subprocess.DEVNULL, text=True).strip() + parts = [f"- Git branch: {branch}"] + if status: + lines = status.split('\n')[:10] + parts.append("- Git status:\n" + "\n".join(f" {l}" for l in lines)) + if log: + parts.append("- Recent commits:\n" + "\n".join(f" {l}" for l in log.split('\n'))) + return "\n".join(parts) + "\n" + except Exception: + return "" + + +def get_claude_md() -> str: + """Load CLAUDE.md from cwd or parents, and ~/.claude/CLAUDE.md.""" + content_parts = [] + + # Global CLAUDE.md + global_md = Path.home() / ".claude" / "CLAUDE.md" + if global_md.exists(): + try: + content_parts.append(f"[Global CLAUDE.md]\n{global_md.read_text()}") + except Exception: + pass + + # Project CLAUDE.md (walk up from cwd) + p = Path.cwd() + for _ in range(10): + candidate = p / "CLAUDE.md" + if candidate.exists(): + try: + content_parts.append(f"[Project CLAUDE.md: {candidate}]\n{candidate.read_text()}") + except Exception: + pass + break + parent = p.parent + if parent == p: + break + p = parent + + if not content_parts: + return "" + return "\n# Memory / CLAUDE.md\n" + "\n\n".join(content_parts) + "\n" + + +def build_system_prompt() -> str: + import platform + return SYSTEM_PROMPT_TEMPLATE.format( + date=datetime.now().strftime("%Y-%m-%d %A"), + cwd=str(Path.cwd()), + platform=platform.system(), + git_info=get_git_info(), + claude_md=get_claude_md(), + ) diff --git a/nano-claude-code/demo.py b/nano-claude-code/demo.py new file mode 100644 index 0000000..28b0ca4 --- /dev/null +++ b/nano-claude-code/demo.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +""" +Demo script for nano claude code. +Requires ANTHROPIC_API_KEY environment variable. + +Run: + ANTHROPIC_API_KEY=sk-... python demo.py +""" +import os +import sys + +# Add parent path for imports +sys.path.insert(0, os.path.dirname(__file__)) + +from config import load_config +from context import build_system_prompt +from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest + +def demo(): + config = load_config() + if not config["api_key"]: + print("Error: Set ANTHROPIC_API_KEY environment variable") + sys.exit(1) + + config["permission_mode"] = "accept-all" # Demo: auto-approve everything + config["verbose"] = True + state = AgentState() + system_prompt = build_system_prompt() + + print("=" * 60) + print("DEMO 1: Simple question (no tools)") + print("=" * 60) + _run_demo(state, config, system_prompt, + "What is the time complexity of quicksort? Answer in 2 sentences.") + + print("\n" + "=" * 60) + print("DEMO 2: File system exploration (uses Glob + Read tools)") + print("=" * 60) + state2 = AgentState() + _run_demo(state2, config, system_prompt, + "List all Python files in the current directory and show me the first 5 lines of nano_claude.py") + + print("\n" + "=" * 60) + print("DEMO 3: Code writing (uses Write + Bash tools)") + print("=" * 60) + state3 = AgentState() + _run_demo(state3, config, system_prompt, + "Write a Python function to fibonacci(n) in /tmp/fib.py, then run it to test fib(10)") + + print("\n" + "=" * 60) + print("DEMO 4: Multi-turn conversation") + print("=" * 60) + state4 = AgentState() + _run_demo(state4, config, system_prompt, + "What are the tools available to you?") + _run_demo(state4, config, system_prompt, + "Which of those tools would you use to find all TODO comments in a codebase?") + + print("\n" + "=" * 60) + print("DEMO 5: Web search") + print("=" * 60) + state5 = AgentState() + _run_demo(state5, config, system_prompt, + "Search the web for 'Python 3.13 new features' and give me a brief summary") + + +def _run_demo(state: AgentState, config: dict, system_prompt: str, prompt: str): + print(f"\n[USER]: {prompt}\n") + print("[CLAUDE]: ", end="", flush=True) + + for event in run(prompt, state, config, system_prompt): + if isinstance(event, TextChunk): + print(event.text, end="", flush=True) + elif isinstance(event, ThinkingChunk): + if config.get("verbose"): + print(f"\033[2m[thinking: {event.text[:100]}]\033[0m", end="", flush=True) + elif isinstance(event, ToolStart): + print(f"\n\033[36m ⚙ {event.name}({list(event.inputs.values())[0] if event.inputs else ''})\033[0m", flush=True) + elif isinstance(event, PermissionRequest): + event.granted = True # Auto-approve in demo + elif isinstance(event, ToolEnd): + result_preview = event.result[:100].replace('\n', '↵') + print(f"\033[32m ✓ → {result_preview}\033[0m", flush=True) + elif isinstance(event, TurnDone): + print(f"\n\033[2m [+{event.input_tokens} in / +{event.output_tokens} out]\033[0m", flush=True) + + print() + + +if __name__ == "__main__": + demo() diff --git a/nano-claude-code/make_demo.py b/nano-claude-code/make_demo.py new file mode 100644 index 0000000..d92eb96 --- /dev/null +++ b/nano-claude-code/make_demo.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 +""" +Generate animated GIF demo of nano claude code using PIL. +Simulates a realistic terminal session with tool calls. +""" +from PIL import Image, ImageDraw, ImageFont +import os, textwrap + +# ── Catppuccin Mocha palette ───────────────────────────────────────────── +BG = (30, 30, 46) # base +SURFACE = (49, 50, 68) # surface0 +TEXT = (205, 214, 244) # text +SUBTEXT = (108, 112, 134) # overlay0 (dim) +CYAN = (137, 220, 235) # sky +GREEN = (166, 227, 161) # green +YELLOW = (249, 226, 175) # yellow +RED = (243, 139, 168) # red +MAUVE = (203, 166, 247) # mauve (user prompt) +BLUE = (137, 180, 250) # blue +PEACH = (250, 179, 135) # peach + +W, H = 960, 720 +FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf" +FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf" +FONT_SIZE = 14 +LINE_H = 20 +PAD_X = 18 +PAD_Y = 16 + + +def make_font(size=FONT_SIZE, bold=False): + path = FONT_BOLD if bold else FONT_PATH + try: + return ImageFont.truetype(path, size) + except: + return ImageFont.load_default() + + +FONT = make_font() +FONT_B = make_font(bold=True) +FONT_SM = make_font(FONT_SIZE - 1) + + +# ── Segment: (text, color, bold?) ──────────────────────────────────────── +Seg = tuple # (str, rgb_tuple, bool) + + +def seg(t, c=TEXT, b=False): return (t, c, b) +def segs(*args): return list(args) + + +def render_line(draw, y, segments, x_start=PAD_X): + x = x_start + for text, color, bold in segments: + font = FONT_B if bold else FONT + draw.text((x, y), text, font=font, fill=color) + x += font.getlength(text) + return y + LINE_H + + +def blank_frame(): + img = Image.new("RGB", (W, H), BG) + return img + + +def draw_frame(lines_segments): + """ + lines_segments: list of either + - list[Seg] → rendered as a line + - None → blank line + Returns PIL Image. + """ + img = blank_frame() + d = ImageDraw.Draw(img) + y = PAD_Y + for item in lines_segments: + if item is None: + y += LINE_H + elif isinstance(item, list): + y = render_line(d, y, item) + else: + y = render_line(d, y, [item]) + return img + + +# ── Pre-defined screen content blocks ─────────────────────────────────── + +BANNER = [ + [seg("╭─ Nano Claude Code ──────────────────────────────────────────╮", SUBTEXT)], + [seg("│ ", SUBTEXT), seg("Model: ", SUBTEXT), seg("claude-opus-4-6", CYAN, True)], + [seg("│ ", SUBTEXT), seg("Permissions: ", SUBTEXT), seg("auto", YELLOW)], + [seg("│ Type /help for commands, Ctrl+C to cancel │", SUBTEXT)], + [seg("╰────────────────────────────────────────────────────────────╯", SUBTEXT)], + None, +] + +def prompt_line(text="", cursor=False): + cur = "█" if cursor else "" + return [ + seg("[nano_claude_code] ", SUBTEXT), + seg("❯ ", CYAN, True), + seg(text + cur, TEXT), + ] + +def claude_header(): + return [ + seg("╭─ Claude ", SUBTEXT), + seg("●", GREEN), + seg(" ─────────────────────────────────────────────", SUBTEXT), + ] + +def claude_sep(): + return [seg("╰──────────────────────────────────────────────────────────", SUBTEXT)] + +def tool_line(icon, name, arg, color=CYAN): + return [ + seg(f" {icon} ", SUBTEXT), + seg(name, color), + seg("(", SUBTEXT), + seg(arg, TEXT), + seg(")", SUBTEXT), + ] + +def tool_ok(msg): + return [seg(f" ✓ ", GREEN), seg(msg, SUBTEXT)] + +def tool_err(msg): + return [seg(f" ✗ ", RED), seg(msg, SUBTEXT)] + +def text_line(t, indent=2): + return [seg(" " * indent + t, TEXT)] + +def dim_line(t, indent=4): + return [seg(" " * indent + t, SUBTEXT)] + + +# ── Scene builder ───────────────────────────────────────────────────────── + +def build_scenes(): + """Return list of (frame_content, duration_ms).""" + scenes = [] + def add(lines, ms=120): + scenes.append((lines, ms)) + + # ── Scene 0: Empty terminal with banner ────────────────────────────── + add(BANNER + [prompt_line(cursor=True)], 800) + + # ── Scene 1: User types query 1 ────────────────────────────────────── + msg1 = "List Python files in this project and show me their line counts" + for i in range(0, len(msg1) + 1, 3): + add(BANNER + [prompt_line(msg1[:i], cursor=(i < len(msg1)))], 60) + add(BANNER + [prompt_line(msg1, cursor=False)], 400) + + # ── Scene 2: Claude header appears ────────────────────────────────── + pre = BANNER + [prompt_line(msg1)] + add(pre + [None, claude_header(), [seg("│ ", SUBTEXT)]], 300) + + # ── Scene 3: Tool call - Glob ──────────────────────────────────────── + base = pre + [None, claude_header()] + add(base + [ + tool_line("⚙", "Glob", "**/*.py"), + ], 500) + add(base + [ + tool_line("⚙", "Glob", "**/*.py"), + tool_ok("5 files matched"), + ], 600) + + # ── Scene 4: Tool call - Bash (wc -l) ──────────────────────────────── + add(base + [ + tool_line("⚙", "Glob", "**/*.py"), + tool_ok("5 files matched"), + None, + tool_line("⚙", "Bash", "wc -l *.py | sort -n"), + ], 500) + add(base + [ + tool_line("⚙", "Glob", "**/*.py"), + tool_ok("5 files matched"), + None, + tool_line("⚙", "Bash", "wc -l *.py | sort -n"), + tool_ok("→ 6 lines (120 chars)"), + ], 700) + + # ── Scene 5: Claude streams response ──────────────────────────────── + response_lines = [ + "Here are the Python files in this project with their line counts:", + "", + " 76 config.py — Configuration management and cost calculation", + " 100 context.py — System prompt builder, CLAUDE.md + git injection", + " 173 agent.py — Core agent loop with streaming API calls", + " 359 tools.py — 8 built-in tools (Read/Write/Edit/Bash/Glob/Grep/Web)", + " 553 nano_claude.py — REPL entry point, slash commands, rich rendering", + "────────────────────────────────────────────────────", + "1261 total", + "", + "The largest file is `nano_claude.py` containing the interactive REPL,", + "14 slash commands, permission handling, and markdown rendering.", + ] + tool_section = [ + tool_line("⚙", "Glob", "**/*.py"), + tool_ok("5 files matched"), + None, + tool_line("⚙", "Bash", "wc -l *.py | sort -n"), + tool_ok("→ 6 lines (120 chars)"), + None, + [seg("│ ", SUBTEXT)], + ] + streamed = [] + for i, rline in enumerate(response_lines): + streamed.append(text_line(rline, 2)) + content = base + tool_section + streamed + add(content, 80 if rline else 30) + + add(base + tool_section + [text_line(l, 2) for l in response_lines] + [claude_sep()], 1200) + + # ── Scene 6: New prompt appears ────────────────────────────────────── + full1 = (pre + [None, claude_header()] + + tool_section + + [text_line(l, 2) for l in response_lines] + + [claude_sep(), None]) + add(full1 + [prompt_line(cursor=True)], 800) + + # ── Scene 7: User types query 2 ────────────────────────────────────── + msg2 = "Write a hello_world.py that prints 'Hello from Nano Claude!'" + for i in range(0, len(msg2) + 1, 4): + add(full1 + [prompt_line(msg2[:i], cursor=(i < len(msg2)))], 55) + add(full1 + [prompt_line(msg2)], 400) + + # ── Scene 8: Write tool call ───────────────────────────────────────── + base2 = full1 + [prompt_line(msg2), None, claude_header()] + add(base2 + [ + tool_line("⚙", "Write", "/tmp/hello_world.py", MAUVE), + ], 600) + add(base2 + [ + tool_line("⚙", "Write", "/tmp/hello_world.py", MAUVE), + tool_ok("Wrote 3 lines to /tmp/hello_world.py"), + None, + tool_line("⚙", "Bash", "python3 /tmp/hello_world.py"), + ], 500) + add(base2 + [ + tool_line("⚙", "Write", "/tmp/hello_world.py", MAUVE), + tool_ok("Wrote 3 lines to /tmp/hello_world.py"), + None, + tool_line("⚙", "Bash", "python3 /tmp/hello_world.py"), + tool_ok("→ Hello from Nano Claude!"), + ], 800) + + # ── Scene 9: Final response ────────────────────────────────────────── + resp2 = [ + "Done! Created `/tmp/hello_world.py` and ran it successfully.", + "", + " print('Hello from Nano Claude!')", + "", + "Output: Hello from Nano Claude!", + ] + tool2 = [ + tool_line("⚙", "Write", "/tmp/hello_world.py", MAUVE), + tool_ok("Wrote 3 lines to /tmp/hello_world.py"), + None, + tool_line("⚙", "Bash", "python3 /tmp/hello_world.py"), + tool_ok("→ Hello from Nano Claude!"), + None, + [seg("│ ", SUBTEXT)], + ] + streamed2 = [] + for rline in resp2: + streamed2.append(text_line(rline, 2)) + add(base2 + tool2 + streamed2, 90) + + add(base2 + tool2 + [text_line(l, 2) for l in resp2] + [claude_sep()], 1500) + + # ── Scene 10: Slash command demo ───────────────────────────────────── + final_state = (full1 + [prompt_line(msg2), None, claude_header()] + + tool2 + [text_line(l, 2) for l in resp2] + [claude_sep(), None]) + add(final_state + [prompt_line(cursor=True)], 600) + + slash = "/cost" + for i in range(len(slash) + 1): + add(final_state + [prompt_line(slash[:i], cursor=(i < len(slash)))], 80) + add(final_state + [prompt_line(slash)], 400) + + # cost output + cost_lines = [ + [seg("Input tokens: ", CYAN), seg("1,842", TEXT, True)], + [seg("Output tokens: ", CYAN), seg("312", TEXT, True)], + [seg("Est. cost: ", CYAN), seg("$0.0318 USD", GREEN, True)], + ] + add(final_state + [prompt_line(slash), None] + cost_lines + [None, prompt_line(cursor=True)], 2000) + + return scenes + + +# ── Render ──────────────────────────────────────────────────────────────── + +def _build_explicit_palette(): + """ + Build a 256-entry palette from our exact theme colors. + Returns flat list of 768 ints (R,G,B, R,G,B, ...) suitable for putpalette(). + """ + # All distinct colors used in the renderer + theme = [ + BG, SURFACE, TEXT, SUBTEXT, + CYAN, GREEN, YELLOW, RED, MAUVE, BLUE, PEACH, + (255, 255, 255), (0, 0, 0), + # Extra intermediate shades that PIL might snap to + (50, 55, 80), # surface variant + (90, 95, 120), # dim text variant + (160, 166, 200), + ] + flat = [] + for c in theme: + flat.extend(c) + # Pad to 256 entries with black + while len(flat) < 256 * 3: + flat.extend((0, 0, 0)) + return flat + + +def render_gif(output_path="demo.gif"): + print("Building scenes...") + scenes = build_scenes() + print(f" {len(scenes)} scenes") + + palette_data = _build_explicit_palette() + + # Create a palette-mode reference image for quantize() + pal_ref = Image.new("P", (1, 1)) + pal_ref.putpalette(palette_data) + + print(" Rendering frames...") + rgb_frames = [] + durations = [] + for i, (lines, ms) in enumerate(scenes): + img = draw_frame(lines) + rgb_frames.append(img) + durations.append(ms) + if i % 20 == 0: + print(f" {i}/{len(scenes)}...") + + # Quantize all frames to the same explicit palette (no dither → exact snap) + print(" Quantizing to global palette...") + p_frames = [f.quantize(palette=pal_ref, dither=0) for f in rgb_frames] + + print(f"Saving GIF → {output_path} ({len(p_frames)} frames)...") + p_frames[0].save( + output_path, + save_all=True, + append_images=p_frames[1:], + duration=durations, + loop=0, + optimize=False, + ) + size_kb = os.path.getsize(output_path) // 1024 + print(f"Done! {size_kb} KB") + + +# ── Static screenshot ───────────────────────────────────────────────────── + +def render_screenshot(output_path="screenshot.png"): + """Single high-quality screenshot showing a complete session.""" + lines = ( + BANNER + + [prompt_line("List Python files and their line counts")] + + [None, claude_header()] + + [ + tool_line("⚙", "Glob", "**/*.py"), + tool_ok("5 files matched"), + None, + tool_line("⚙", "Bash", "wc -l *.py | sort -n"), + tool_ok("→ 6 lines (120 chars)"), + None, + [seg("│ ", SUBTEXT)], + text_line("Here are the Python files with their line counts:", 2), + None, + text_line(" 76 config.py — Configuration management", 2), + text_line(" 100 context.py — System prompt + git injection", 2), + text_line(" 173 agent.py — Core agent loop", 2), + text_line(" 359 tools.py — 8 built-in tools", 2), + text_line(" 553 nano_claude.py — REPL + slash commands", 2), + text_line("────────────────────────────────", 2), + text_line("1261 total", 2), + None, + text_line("The main entry point `nano_claude.py` contains the REPL,", 2), + text_line("14 slash commands, permission handling, and rich rendering.", 2), + claude_sep(), + None, + prompt_line("/cost"), + None, + [seg("Input tokens: ", CYAN), seg("1,842", TEXT, True)], + [seg("Output tokens: ", CYAN), seg("312", TEXT, True)], + [seg("Est. cost: ", CYAN), seg("$0.0318 USD", GREEN, True)], + None, + prompt_line(cursor=True), + ] + ) + img = draw_frame(lines) + + # Add subtle rounded border effect + d = ImageDraw.Draw(img) + d.rectangle([0, 0, W-1, H-1], outline=SURFACE, width=2) + + img.save(output_path, format="PNG", optimize=True) + size_kb = os.path.getsize(output_path) // 1024 + print(f"Screenshot saved: {output_path} ({size_kb} KB)") + + +if __name__ == "__main__": + import sys + out_dir = os.path.dirname(os.path.abspath(__file__)) + + gif_path = os.path.join(out_dir, "demo.gif") + png_path = os.path.join(out_dir, "screenshot.png") + + render_screenshot(png_path) + render_gif(gif_path) + print("\nFiles created:") + print(f" {png_path}") + print(f" {gif_path}") diff --git a/nano-claude-code/nano_claude.py b/nano-claude-code/nano_claude.py new file mode 100644 index 0000000..8294b4a --- /dev/null +++ b/nano-claude-code/nano_claude.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python3 +""" +Nano Claude Code — Minimal Python implementation of Claude Code. + +Usage: + python nano_claude.py [options] [prompt] + +Options: + -p, --print Non-interactive: run prompt and exit (also --print-output) + -m, --model MODEL Override model + --accept-all Never ask permission (dangerous) + --verbose Show thinking + token counts + --version Print version and exit + +Slash commands in REPL: + /help Show this help + /clear Clear conversation + /model [m] Show or set model + /config Show config / set key=value + /save [f] Save session to file + /load [f] Load session from file + /history Print conversation history + /context Show context window usage + /cost Show API cost this session + /verbose Toggle verbose mode + /thinking Toggle extended thinking + /permissions [mode] Set permission mode + /cwd [path] Show or change working directory + /exit /quit Exit +""" +from __future__ import annotations + +import os +import sys +import json +import readline +import atexit +import argparse +import textwrap +from pathlib import Path +from datetime import datetime +from typing import Optional + +# ── Optional rich for markdown rendering ────────────────────────────────── +try: + from rich.console import Console + from rich.markdown import Markdown + from rich.syntax import Syntax + from rich.panel import Panel + from rich import print as rprint + _RICH = True + console = Console() +except ImportError: + _RICH = False + console = None + +VERSION = "1.0.0" + +# ── ANSI helpers (used even with rich for non-markdown output) ───────────── +C = { + "cyan": "\033[36m", + "green": "\033[32m", + "yellow": "\033[33m", + "red": "\033[31m", + "blue": "\033[34m", + "magenta": "\033[35m", + "bold": "\033[1m", + "dim": "\033[2m", + "reset": "\033[0m", +} + +def clr(text: str, *keys: str) -> str: + return "".join(C[k] for k in keys) + str(text) + C["reset"] + +def info(msg: str): print(clr(msg, "cyan")) +def ok(msg: str): print(clr(msg, "green")) +def warn(msg: str): print(clr(f"Warning: {msg}", "yellow")) +def err(msg: str): print(clr(f"Error: {msg}", "red"), file=sys.stderr) + + +# ── Conversation rendering ───────────────────────────────────────────────── + +_accumulated_text: list[str] = [] # buffer text during streaming + +def stream_text(chunk: str): + """Called for each streamed text chunk.""" + print(chunk, end="", flush=True) + _accumulated_text.append(chunk) + +def stream_thinking(chunk: str, verbose: bool): + if verbose: + print(clr(chunk, "dim"), end="", flush=True) + +def flush_response(): + """After streaming, optionally re-render as markdown.""" + full = "".join(_accumulated_text) + _accumulated_text.clear() + if _RICH and full.strip(): + # Re-print with markdown rendering + print("\r", end="") # go to line start to overwrite last newline + # only re-render if there's actual markdown (contains # * ` _ etc.) + if any(c in full for c in ("#", "*", "`", "_", "[")): + print() # newline after streaming + console.print(Markdown(full)) + return + print() # ensure newline after stream + +def print_tool_start(name: str, inputs: dict, verbose: bool): + """Show tool invocation.""" + desc = _tool_desc(name, inputs) + print(clr(f"\n ⚙ {desc}", "dim", "cyan"), flush=True) + if verbose: + print(clr(f" inputs: {json.dumps(inputs, ensure_ascii=False)[:200]}", "dim")) + +def print_tool_end(name: str, result: str, verbose: bool): + lines = result.count("\n") + 1 + size = len(result) + summary = f"→ {lines} lines ({size} chars)" + if not result.startswith("Error") and not result.startswith("Denied"): + print(clr(f" ✓ {summary}", "dim", "green"), flush=True) + else: + print(clr(f" ✗ {result[:120]}", "dim", "red"), flush=True) + if verbose and not result.startswith("Denied"): + preview = result[:500] + ("…" if len(result) > 500 else "") + print(clr(f" {preview.replace(chr(10), chr(10)+' ')}", "dim")) + +def _tool_desc(name: str, inputs: dict) -> str: + if name == "Read": return f"Read({inputs.get('file_path','')})" + if name == "Write": return f"Write({inputs.get('file_path','')})" + if name == "Edit": return f"Edit({inputs.get('file_path','')})" + if name == "Bash": return f"Bash({inputs.get('command','')[:80]})" + if name == "Glob": return f"Glob({inputs.get('pattern','')})" + if name == "Grep": return f"Grep({inputs.get('pattern','')})" + if name == "WebFetch": return f"WebFetch({inputs.get('url','')[:60]})" + if name == "WebSearch": return f"WebSearch({inputs.get('query','')})" + return f"{name}({list(inputs.values())[:1]})" + + +# ── Permission prompt ────────────────────────────────────────────────────── + +def ask_permission_interactive(desc: str, config: dict) -> bool: + try: + print() + ans = input(clr(f" Allow: {desc} [y/N/a(ccept-all)] ", "yellow")).strip().lower() + if ans == "a": + config["permission_mode"] = "accept-all" + ok(" Permission mode set to accept-all for this session.") + return True + return ans in ("y", "yes") + except (KeyboardInterrupt, EOFError): + print() + return False + + +# ── Slash commands ───────────────────────────────────────────────────────── + +def cmd_help(_args: str, _state, _config) -> bool: + print(__doc__) + return True + +def cmd_clear(_args: str, state, _config) -> bool: + state.messages.clear() + state.turn_count = 0 + ok("Conversation cleared.") + return True + +def cmd_model(args: str, _state, config) -> bool: + from providers import PROVIDERS, detect_provider + if not args: + model = config["model"] + pname = detect_provider(model) + info(f"Current model: {model} (provider: {pname})") + info("\nAvailable models by provider:") + for pn, pdata in PROVIDERS.items(): + ms = pdata.get("models", []) + if ms: + info(f" {pn:12s} " + ", ".join(ms[:4]) + ("..." if len(ms) > 4 else "")) + info("\nFormat: 'provider/model' or just model name (auto-detected)") + info(" e.g. /model gpt-4o") + info(" e.g. /model ollama/qwen2.5-coder") + info(" e.g. /model kimi:moonshot-v1-32k") + else: + # Accept both "ollama/model" and "ollama:model" syntax + m = args.strip().replace(":", "/", 1) + config["model"] = m + pname = detect_provider(m) + ok(f"Model set to {m} (provider: {pname})") + from config import save_config + save_config(config) + return True + +def cmd_config(args: str, _state, config) -> bool: + from config import save_config + if not args: + display = {k: v for k, v in config.items() if k != "api_key"} + print(json.dumps(display, indent=2)) + elif "=" in args: + key, _, val = args.partition("=") + key, val = key.strip(), val.strip() + # Type coercion + if val.lower() in ("true", "false"): + val = val.lower() == "true" + elif val.isdigit(): + val = int(val) + config[key] = val + save_config(config) + ok(f"Set {key} = {val}") + else: + k = args.strip() + v = config.get(k, "(not set)") + info(f"{k} = {v}") + return True + +def cmd_save(args: str, state, _config) -> bool: + from config import SESSIONS_DIR + fname = args.strip() or f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + path = Path(fname) if "/" in fname else SESSIONS_DIR / fname + data = { + "messages": [ + m if not isinstance(m.get("content"), list) else + {**m, "content": [ + b if isinstance(b, dict) else b.model_dump() + for b in m["content"] + ]} + for m in state.messages + ], + "turn_count": state.turn_count, + "total_input_tokens": state.total_input_tokens, + "total_output_tokens": state.total_output_tokens, + } + path.write_text(json.dumps(data, indent=2, default=str)) + ok(f"Session saved to {path}") + return True + +def cmd_load(args: str, state, _config) -> bool: + from config import SESSIONS_DIR + if not args.strip(): + # List available sessions + sessions = sorted(SESSIONS_DIR.glob("*.json")) + if not sessions: + info("No saved sessions found.") + else: + info("Saved sessions:") + for s in sessions: + info(f" {s.name}") + return True + fname = args.strip() + path = Path(fname) if "/" in fname else SESSIONS_DIR / fname + if not path.exists(): + err(f"File not found: {path}") + return True + data = json.loads(path.read_text()) + state.messages = data.get("messages", []) + state.turn_count = data.get("turn_count", 0) + state.total_input_tokens = data.get("total_input_tokens", 0) + state.total_output_tokens = data.get("total_output_tokens", 0) + ok(f"Session loaded from {path} ({len(state.messages)} messages)") + return True + +def cmd_history(_args: str, state, _config) -> bool: + if not state.messages: + info("(empty conversation)") + return True + for i, m in enumerate(state.messages): + role = clr(m["role"].upper(), "bold", + "cyan" if m["role"] == "user" else "green") + content = m["content"] + if isinstance(content, str): + print(f"[{i}] {role}: {content[:200]}") + elif isinstance(content, list): + for block in content: + if isinstance(block, dict): + btype = block.get("type", "") + else: + btype = getattr(block, "type", "") + if btype == "text": + text = block.get("text", "") if isinstance(block, dict) else block.text + print(f"[{i}] {role}: {text[:200]}") + elif btype == "tool_use": + name = block.get("name", "") if isinstance(block, dict) else block.name + print(f"[{i}] {role}: [tool_use: {name}]") + elif btype == "tool_result": + cval = block.get("content", "") if isinstance(block, dict) else block.content + print(f"[{i}] {role}: [tool_result: {str(cval)[:100]}]") + return True + +def cmd_context(_args: str, state, config) -> bool: + import anthropic + # Rough token estimate: 4 chars ≈ 1 token + msg_chars = sum( + len(str(m.get("content", ""))) for m in state.messages + ) + est_tokens = msg_chars // 4 + info(f"Messages: {len(state.messages)}") + info(f"Estimated tokens: ~{est_tokens:,}") + info(f"Model: {config['model']}") + info(f"Max tokens: {config['max_tokens']:,}") + return True + +def cmd_cost(_args: str, state, config) -> bool: + from config import calc_cost + cost = calc_cost(config["model"], + state.total_input_tokens, + state.total_output_tokens) + info(f"Input tokens: {state.total_input_tokens:,}") + info(f"Output tokens: {state.total_output_tokens:,}") + info(f"Est. cost: ${cost:.4f} USD") + return True + +def cmd_verbose(_args: str, _state, config) -> bool: + config["verbose"] = not config.get("verbose", False) + state_str = "ON" if config["verbose"] else "OFF" + ok(f"Verbose mode: {state_str}") + return True + +def cmd_thinking(_args: str, _state, config) -> bool: + config["thinking"] = not config.get("thinking", False) + state_str = "ON" if config["thinking"] else "OFF" + ok(f"Extended thinking: {state_str}") + return True + +def cmd_permissions(args: str, _state, config) -> bool: + from config import save_config + modes = ["auto", "accept-all", "manual"] + if not args.strip(): + info(f"Permission mode: {config.get('permission_mode','auto')}") + info(f"Available modes: {', '.join(modes)}") + else: + m = args.strip() + if m not in modes: + err(f"Unknown mode: {m}. Choose: {', '.join(modes)}") + else: + config["permission_mode"] = m + save_config(config) + ok(f"Permission mode set to: {m}") + return True + +def cmd_cwd(args: str, _state, _config) -> bool: + if not args.strip(): + info(f"Working directory: {os.getcwd()}") + else: + p = args.strip() + try: + os.chdir(p) + ok(f"Changed directory to: {os.getcwd()}") + except Exception as e: + err(str(e)) + return True + +def cmd_exit(_args: str, _state, _config) -> bool: + ok("Goodbye!") + sys.exit(0) + +COMMANDS = { + "help": cmd_help, + "clear": cmd_clear, + "model": cmd_model, + "config": cmd_config, + "save": cmd_save, + "load": cmd_load, + "history": cmd_history, + "context": cmd_context, + "cost": cmd_cost, + "verbose": cmd_verbose, + "thinking": cmd_thinking, + "permissions": cmd_permissions, + "cwd": cmd_cwd, + "exit": cmd_exit, + "quit": cmd_exit, +} + + +def handle_slash(line: str, state, config) -> bool: + """Handle /command [args]. Returns True if handled.""" + if not line.startswith("/"): + return False + parts = line[1:].split(None, 1) + if not parts: + return False + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + handler = COMMANDS.get(cmd) + if handler: + handler(args, state, config) + return True + err(f"Unknown command: /{cmd} (type /help for commands)") + return True + + +# ── Input history setup ──────────────────────────────────────────────────── + +def setup_readline(history_file: Path): + try: + readline.read_history_file(str(history_file)) + except FileNotFoundError: + pass + readline.set_history_length(1000) + atexit.register(readline.write_history_file, str(history_file)) + + # Tab-complete slash commands + commands = [f"/{c}" for c in COMMANDS] + def completer(text: str, state: int): + matches = [c for c in commands if c.startswith(text)] + return matches[state] if state < len(matches) else None + readline.set_completer(completer) + readline.parse_and_bind("tab: complete") + + +# ── Main REPL ────────────────────────────────────────────────────────────── + +def repl(config: dict, initial_prompt: str = None): + from config import HISTORY_FILE + from context import build_system_prompt + from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest + + setup_readline(HISTORY_FILE) + state = AgentState() + verbose = config.get("verbose", False) + + # Banner + if not initial_prompt: + from providers import detect_provider + model = config["model"] + pname = detect_provider(model) + model_clr = clr(model, "cyan", "bold") + prov_clr = clr(f"({pname})", "dim") + pmode = clr(config.get("permission_mode", "auto"), "yellow") + print(clr("╭─ Nano Claude Code ──────────────────────────────╮", "dim")) + print(clr("│ Model: ", "dim") + model_clr + " " + prov_clr) + print(clr("│ Permissions: ", "dim") + pmode) + print(clr("│ /model to switch provider · /help for commands │", "dim")) + print(clr("╰──────────────────────────────────────────────────╯", "dim")) + print() + + def run_query(user_input: str): + nonlocal verbose + verbose = config.get("verbose", False) + + # Rebuild system prompt each turn (picks up cwd changes, etc.) + system_prompt = build_system_prompt() + + print(clr("\n╭─ Claude ", "dim") + clr("●", "green") + clr(" ─────────────────────────", "dim")) + print(clr("│ ", "dim"), end="", flush=True) + + thinking_started = False + + for event in run(user_input, state, config, system_prompt): + if isinstance(event, TextChunk): + stream_text(event.text) + + elif isinstance(event, ThinkingChunk): + if verbose: + if not thinking_started: + print(clr("\n [thinking]", "dim")) + thinking_started = True + stream_thinking(event.text, verbose) + + elif isinstance(event, ToolStart): + flush_response() + print_tool_start(event.name, event.inputs, verbose) + + elif isinstance(event, PermissionRequest): + event.granted = ask_permission_interactive(event.description, config) + + elif isinstance(event, ToolEnd): + print_tool_end(event.name, event.result, verbose) + # Print prefix for next text + print(clr("│ ", "dim"), end="", flush=True) + + elif isinstance(event, TurnDone): + if verbose: + print(clr( + f"\n [tokens: +{event.input_tokens} in / " + f"+{event.output_tokens} out]", "dim" + )) + + flush_response() + print(clr("╰──────────────────────────────────────────────", "dim")) + print() + + # ── Main loop ── + if initial_prompt: + try: + run_query(initial_prompt) + except KeyboardInterrupt: + print() + return + + while True: + try: + cwd_short = Path.cwd().name + prompt = clr(f"\n[{cwd_short}] ", "dim") + clr("❯ ", "cyan", "bold") + user_input = input(prompt).strip() + except (EOFError, KeyboardInterrupt): + print() + ok("Goodbye!") + sys.exit(0) + + if not user_input: + continue + if handle_slash(user_input, state, config): + continue + + try: + run_query(user_input) + except KeyboardInterrupt: + print(clr("\n (interrupted)", "yellow")) + # Keep conversation history up to the interruption + + +# ── Entry point ──────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + prog="nano_claude", + description="Nano Claude Code — minimal Python Claude Code implementation", + add_help=False, + ) + parser.add_argument("prompt", nargs="*", help="Initial prompt (non-interactive)") + parser.add_argument("-p", "--print", "--print-output", + dest="print_mode", action="store_true", + help="Non-interactive mode: run prompt and exit") + parser.add_argument("-m", "--model", help="Override model") + parser.add_argument("--accept-all", action="store_true", + help="Never ask permission (accept all operations)") + parser.add_argument("--verbose", action="store_true", + help="Show thinking + token counts") + parser.add_argument("--thinking", action="store_true", + help="Enable extended thinking") + parser.add_argument("--version", action="store_true", help="Print version") + parser.add_argument("-h", "--help", action="store_true", help="Show help") + + args = parser.parse_args() + + if args.version: + print(f"nano claude code v{VERSION}") + sys.exit(0) + + if args.help: + print(__doc__) + sys.exit(0) + + from config import load_config, save_config, has_api_key + from providers import detect_provider, PROVIDERS + + config = load_config() + + # Apply CLI overrides first (so key check uses the right provider) + if args.model: + config["model"] = args.model.replace(":", "/", 1) + if args.accept_all: + config["permission_mode"] = "accept-all" + if args.verbose: + config["verbose"] = True + if args.thinking: + config["thinking"] = True + + # Check API key for active provider (warn only, don't block local providers) + if not has_api_key(config): + pname = detect_provider(config["model"]) + prov = PROVIDERS.get(pname, {}) + env = prov.get("api_key_env", "") + if env: # local providers like ollama have no env key requirement + warn(f"No API key found for provider '{pname}'. " + f"Set {env} or run: /config {pname}_api_key=YOUR_KEY") + + initial = " ".join(args.prompt) if args.prompt else None + if args.print_mode and not initial: + err("--print requires a prompt argument") + sys.exit(1) + + repl(config, initial_prompt=initial) + + +if __name__ == "__main__": + main() diff --git a/nano-claude-code/providers.py b/nano-claude-code/providers.py new file mode 100644 index 0000000..1fb4f9a --- /dev/null +++ b/nano-claude-code/providers.py @@ -0,0 +1,487 @@ +""" +Multi-provider support for nano claude. + +Supported providers: + anthropic — Claude (claude-opus-4-6, claude-sonnet-4-6, ...) + openai — GPT (gpt-4o, o3-mini, ...) + gemini — Google Gemini (gemini-2.0-flash, gemini-1.5-pro, ...) + kimi — Moonshot AI (moonshot-v1-8k/32k/128k) + qwen — Alibaba DashScope (qwen-max, qwen-plus, ...) + zhipu — Zhipu GLM (glm-4, glm-4-plus, ...) + deepseek — DeepSeek (deepseek-chat, deepseek-reasoner, ...) + ollama — Local Ollama (llama3.3, qwen2.5-coder, ...) + lmstudio — Local LM Studio (any loaded model) + custom — Any OpenAI-compatible endpoint + +Model string formats: + "claude-opus-4-6" auto-detected → anthropic + "gpt-4o" auto-detected → openai + "ollama/qwen2.5-coder" explicit provider prefix + "custom/my-model" uses CUSTOM_BASE_URL from config +""" +from __future__ import annotations +import json +from typing import Generator + +# ── Provider registry ────────────────────────────────────────────────────── + +PROVIDERS: dict[str, dict] = { + "anthropic": { + "type": "anthropic", + "api_key_env": "ANTHROPIC_API_KEY", + "models": [ + "claude-opus-4-6", "claude-sonnet-4-6", "claude-haiku-4-5-20251001", + "claude-opus-4-5", "claude-sonnet-4-5", + "claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022", + ], + }, + "openai": { + "type": "openai", + "api_key_env": "OPENAI_API_KEY", + "base_url": "https://api.openai.com/v1", + "models": [ + "gpt-4o", "gpt-4o-mini", "gpt-4-turbo", + "o3-mini", "o1", "o1-mini", + ], + }, + "gemini": { + "type": "openai", + "api_key_env": "GEMINI_API_KEY", + "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", + "models": [ + "gemini-2.5-pro-preview-03-25", + "gemini-2.0-flash", "gemini-2.0-flash-lite", + "gemini-1.5-pro", "gemini-1.5-flash", + ], + }, + "kimi": { + "type": "openai", + "api_key_env": "MOONSHOT_API_KEY", + "base_url": "https://api.moonshot.cn/v1", + "models": [ + "moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k", + "kimi-latest", + ], + }, + "qwen": { + "type": "openai", + "api_key_env": "DASHSCOPE_API_KEY", + "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", + "models": [ + "qwen-max", "qwen-plus", "qwen-turbo", "qwen-long", + "qwen2.5-72b-instruct", "qwen2.5-coder-32b-instruct", + "qwq-32b", + ], + }, + "zhipu": { + "type": "openai", + "api_key_env": "ZHIPU_API_KEY", + "base_url": "https://open.bigmodel.cn/api/paas/v4/", + "models": [ + "glm-4-plus", "glm-4", "glm-4-flash", "glm-4-air", + "glm-z1-flash", + ], + }, + "deepseek": { + "type": "openai", + "api_key_env": "DEEPSEEK_API_KEY", + "base_url": "https://api.deepseek.com/v1", + "models": [ + "deepseek-chat", "deepseek-coder", "deepseek-reasoner", + ], + }, + "ollama": { + "type": "openai", + "api_key_env": None, + "base_url": "http://localhost:11434/v1", + "api_key": "ollama", + "models": [ + "llama3.3", "llama3.2", "phi4", "mistral", "mixtral", + "qwen2.5-coder", "deepseek-r1", "gemma3", + ], + }, + "lmstudio": { + "type": "openai", + "api_key_env": None, + "base_url": "http://localhost:1234/v1", + "api_key": "lm-studio", + "models": [], # dynamic, depends on loaded model + }, + "custom": { + "type": "openai", + "api_key_env": "CUSTOM_API_KEY", + "base_url": None, # read from config["custom_base_url"] + "models": [], + }, +} + +# Cost per million tokens (approximate, fallback to 0 for unknown) +COSTS = { + "claude-opus-4-6": (15.0, 75.0), + "claude-sonnet-4-6": (3.0, 15.0), + "claude-haiku-4-5-20251001": (0.8, 4.0), + "gpt-4o": (2.5, 10.0), + "gpt-4o-mini": (0.15, 0.6), + "o3-mini": (1.1, 4.4), + "gemini-2.0-flash": (0.075, 0.3), + "gemini-1.5-pro": (1.25, 5.0), + "gemini-2.5-pro-preview-03-25": (1.25, 10.0), + "moonshot-v1-8k": (1.0, 3.0), + "moonshot-v1-32k": (2.4, 7.0), + "moonshot-v1-128k": (8.0, 24.0), + "qwen-max": (2.4, 9.6), + "qwen-plus": (0.4, 1.2), + "deepseek-chat": (0.27, 1.1), + "deepseek-reasoner": (0.55, 2.19), + "glm-4-plus": (0.7, 0.7), +} + +# Auto-detection: prefix → provider name +_PREFIXES = [ + ("claude-", "anthropic"), + ("gpt-", "openai"), + ("o1", "openai"), + ("o3", "openai"), + ("gemini-", "gemini"), + ("moonshot-", "kimi"), + ("kimi-", "kimi"), + ("qwen", "qwen"), # qwen-max, qwen2.5-... + ("qwq-", "qwen"), + ("glm-", "zhipu"), + ("deepseek-", "deepseek"), + ("llama", "ollama"), + ("mistral", "ollama"), + ("phi", "ollama"), + ("gemma", "ollama"), +] + + +def detect_provider(model: str) -> str: + """Return provider name for a model string. + Supports 'provider/model' explicit format, or auto-detect by prefix.""" + if "/" in model: + return model.split("/", 1)[0] + for prefix, pname in _PREFIXES: + if model.lower().startswith(prefix): + return pname + return "openai" # fallback + + +def bare_model(model: str) -> str: + """Strip 'provider/' prefix if present.""" + return model.split("/", 1)[1] if "/" in model else model + + +def get_api_key(provider_name: str, config: dict) -> str: + prov = PROVIDERS.get(provider_name, {}) + # 1. Check config dict (e.g. config["kimi_api_key"]) + cfg_key = config.get(f"{provider_name}_api_key", "") + if cfg_key: + return cfg_key + # 2. Check env var + env_var = prov.get("api_key_env") + if env_var: + import os + return os.environ.get(env_var, "") + # 3. Hardcoded (for local providers) + return prov.get("api_key", "") + + +def calc_cost(model: str, in_tok: int, out_tok: int) -> float: + ic, oc = COSTS.get(bare_model(model), (0.0, 0.0)) + return (in_tok * ic + out_tok * oc) / 1_000_000 + + +# ── Tool schema conversion ───────────────────────────────────────────────── + +def tools_to_openai(tool_schemas: list) -> list: + """Convert Anthropic-style tool schemas to OpenAI function-calling format.""" + return [ + { + "type": "function", + "function": { + "name": t["name"], + "description": t["description"], + "parameters": t["input_schema"], + }, + } + for t in tool_schemas + ] + + +# ── Message format conversion ────────────────────────────────────────────── +# +# Internal "neutral" message format: +# {"role": "user", "content": "text"} +# {"role": "assistant", "content": "text", "tool_calls": [ +# {"id": "...", "name": "...", "input": {...}} +# ]} +# {"role": "tool", "tool_call_id": "...", "name": "...", "content": "..."} + +def messages_to_anthropic(messages: list) -> list: + """Convert neutral messages → Anthropic API format.""" + result = [] + i = 0 + while i < len(messages): + m = messages[i] + role = m["role"] + + if role == "user": + result.append({"role": "user", "content": m["content"]}) + i += 1 + + elif role == "assistant": + blocks = [] + text = m.get("content", "") + if text: + blocks.append({"type": "text", "text": text}) + for tc in m.get("tool_calls", []): + blocks.append({ + "type": "tool_use", + "id": tc["id"], + "name": tc["name"], + "input": tc["input"], + }) + result.append({"role": "assistant", "content": blocks}) + i += 1 + + elif role == "tool": + # Collect consecutive tool results into one user message + tool_blocks = [] + while i < len(messages) and messages[i]["role"] == "tool": + t = messages[i] + tool_blocks.append({ + "type": "tool_result", + "tool_use_id": t["tool_call_id"], + "content": t["content"], + }) + i += 1 + result.append({"role": "user", "content": tool_blocks}) + + else: + i += 1 + + return result + + +def messages_to_openai(messages: list) -> list: + """Convert neutral messages → OpenAI API format.""" + result = [] + for m in messages: + role = m["role"] + + if role == "user": + result.append({"role": "user", "content": m["content"]}) + + elif role == "assistant": + msg: dict = {"role": "assistant", "content": m.get("content") or None} + tcs = m.get("tool_calls", []) + if tcs: + msg["tool_calls"] = [ + { + "id": tc["id"], + "type": "function", + "function": { + "name": tc["name"], + "arguments": json.dumps(tc["input"], ensure_ascii=False), + }, + } + for tc in tcs + ] + result.append(msg) + + elif role == "tool": + result.append({ + "role": "tool", + "tool_call_id": m["tool_call_id"], + "content": m["content"], + }) + + return result + + +# ── Streaming adapters ───────────────────────────────────────────────────── + +class TextChunk: + def __init__(self, text): self.text = text + +class ThinkingChunk: + def __init__(self, text): self.text = text + +class AssistantTurn: + """Completed assistant turn with text + tool_calls.""" + def __init__(self, text, tool_calls, in_tokens, out_tokens): + self.text = text + self.tool_calls = tool_calls # list of {id, name, input} + self.in_tokens = in_tokens + self.out_tokens = out_tokens + + +def stream_anthropic( + api_key: str, + model: str, + system: str, + messages: list, + tool_schemas: list, + config: dict, +) -> Generator: + """Stream from Anthropic API. Yields TextChunk/ThinkingChunk, then AssistantTurn.""" + import anthropic as _ant + client = _ant.Anthropic(api_key=api_key) + + kwargs = { + "model": model, + "max_tokens": config.get("max_tokens", 8192), + "system": system, + "messages": messages_to_anthropic(messages), + "tools": tool_schemas, + } + if config.get("thinking"): + kwargs["thinking"] = { + "type": "enabled", + "budget_tokens": config.get("thinking_budget", 10000), + } + + tool_calls = [] + text = "" + + with client.messages.stream(**kwargs) as stream: + for event in stream: + etype = getattr(event, "type", None) + if etype == "content_block_delta": + delta = event.delta + dtype = getattr(delta, "type", None) + if dtype == "text_delta": + text += delta.text + yield TextChunk(delta.text) + elif dtype == "thinking_delta": + yield ThinkingChunk(delta.thinking) + + final = stream.get_final_message() + for block in final.content: + if block.type == "tool_use": + tool_calls.append({ + "id": block.id, + "name": block.name, + "input": block.input, + }) + + yield AssistantTurn( + text, tool_calls, + final.usage.input_tokens, + final.usage.output_tokens, + ) + + +def stream_openai_compat( + api_key: str, + base_url: str, + model: str, + system: str, + messages: list, + tool_schemas: list, + config: dict, +) -> Generator: + """Stream from any OpenAI-compatible API. Yields TextChunk, then AssistantTurn.""" + from openai import OpenAI + client = OpenAI(api_key=api_key or "dummy", base_url=base_url) + + oai_messages = [{"role": "system", "content": system}] + messages_to_openai(messages) + + kwargs: dict = { + "model": model, + "messages": oai_messages, + "stream": True, + } + if tool_schemas and not config.get("no_tools"): + kwargs["tools"] = tools_to_openai(tool_schemas) + # "auto" requires vLLM --enable-auto-tool-choice; omit if server doesn't support it + if not config.get("disable_tool_choice"): + kwargs["tool_choice"] = "auto" + if config.get("max_tokens"): + kwargs["max_tokens"] = config["max_tokens"] + + text = "" + tool_buf: dict = {} # index → {id, name, args_str} + in_tok = out_tok = 0 + + stream = client.chat.completions.create(**kwargs) + for chunk in stream: + if not chunk.choices: + # usage-only chunk (some providers send this last) + if hasattr(chunk, "usage") and chunk.usage: + in_tok = chunk.usage.prompt_tokens + out_tok = chunk.usage.completion_tokens + continue + + choice = chunk.choices[0] + delta = choice.delta + + if delta.content: + text += delta.content + yield TextChunk(delta.content) + + if delta.tool_calls: + for tc in delta.tool_calls: + idx = tc.index + if idx not in tool_buf: + tool_buf[idx] = {"id": "", "name": "", "args": ""} + if tc.id: + tool_buf[idx]["id"] = tc.id + if tc.function: + if tc.function.name: + tool_buf[idx]["name"] += tc.function.name + if tc.function.arguments: + tool_buf[idx]["args"] += tc.function.arguments + + # Some providers include usage in the last chunk + if hasattr(chunk, "usage") and chunk.usage: + in_tok = chunk.usage.prompt_tokens or in_tok + out_tok = chunk.usage.completion_tokens or out_tok + + tool_calls = [] + for idx in sorted(tool_buf): + v = tool_buf[idx] + try: + inp = json.loads(v["args"]) if v["args"] else {} + except json.JSONDecodeError: + inp = {"_raw": v["args"]} + tool_calls.append({"id": v["id"] or f"call_{idx}", "name": v["name"], "input": inp}) + + yield AssistantTurn(text, tool_calls, in_tok, out_tok) + + +def stream( + model: str, + system: str, + messages: list, + tool_schemas: list, + config: dict, +) -> Generator: + """ + Unified streaming entry point. + Auto-detects provider from model string. + Yields: TextChunk | ThinkingChunk | AssistantTurn + """ + provider_name = detect_provider(model) + model_name = bare_model(model) + prov = PROVIDERS.get(provider_name, PROVIDERS["openai"]) + api_key = get_api_key(provider_name, config) + + if prov["type"] == "anthropic": + yield from stream_anthropic(api_key, model_name, system, messages, tool_schemas, config) + else: + import os as _os + if provider_name == "custom": + base_url = (config.get("custom_base_url") + or _os.environ.get("CUSTOM_BASE_URL", "")) + if not base_url: + raise ValueError( + "custom provider requires a base_url. " + "Set CUSTOM_BASE_URL env var or run: /config custom_base_url=http://..." + ) + else: + base_url = prov.get("base_url", "https://api.openai.com/v1") + yield from stream_openai_compat( + api_key, base_url, model_name, system, messages, tool_schemas, config + ) diff --git a/nano-claude-code/requirements.txt b/nano-claude-code/requirements.txt new file mode 100644 index 0000000..e945798 --- /dev/null +++ b/nano-claude-code/requirements.txt @@ -0,0 +1,4 @@ +anthropic>=0.40.0 +openai>=1.30.0 +httpx>=0.27.0 +rich>=13.0.0 diff --git a/nano-claude-code/tools.py b/nano-claude-code/tools.py new file mode 100644 index 0000000..0efd516 --- /dev/null +++ b/nano-claude-code/tools.py @@ -0,0 +1,359 @@ +"""Tool definitions and implementations for nano claude.""" +import os +import re +import glob as _glob +import subprocess +from pathlib import Path +from typing import Callable, Optional + +# ── Tool JSON schemas (sent to Claude API) ───────────────────────────────── + +TOOL_SCHEMAS = [ + { + "name": "Read", + "description": ( + "Read a file's contents. Returns content with line numbers " + "(format: 'N\\tline'). Use limit/offset to read large files in chunks." + ), + "input_schema": { + "type": "object", + "properties": { + "file_path": {"type": "string", "description": "Absolute file path"}, + "limit": {"type": "integer", "description": "Max lines to read"}, + "offset": {"type": "integer", "description": "Start line (0-indexed)"}, + }, + "required": ["file_path"], + }, + }, + { + "name": "Write", + "description": "Write content to a file, creating parent directories as needed.", + "input_schema": { + "type": "object", + "properties": { + "file_path": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["file_path", "content"], + }, + }, + { + "name": "Edit", + "description": ( + "Replace exact text in a file. old_string must match exactly (including whitespace). " + "If old_string appears multiple times, use replace_all=true or add more context." + ), + "input_schema": { + "type": "object", + "properties": { + "file_path": {"type": "string"}, + "old_string": {"type": "string", "description": "Exact text to replace"}, + "new_string": {"type": "string", "description": "Replacement text"}, + "replace_all": {"type": "boolean", "description": "Replace all occurrences"}, + }, + "required": ["file_path", "old_string", "new_string"], + }, + }, + { + "name": "Bash", + "description": "Execute a shell command. Returns stdout+stderr. Stateless (no cd persistence).", + "input_schema": { + "type": "object", + "properties": { + "command": {"type": "string"}, + "timeout": {"type": "integer", "description": "Seconds before timeout (default 30)"}, + }, + "required": ["command"], + }, + }, + { + "name": "Glob", + "description": "Find files matching a glob pattern. Returns sorted list of matching paths.", + "input_schema": { + "type": "object", + "properties": { + "pattern": {"type": "string", "description": "Glob pattern e.g. **/*.py"}, + "path": {"type": "string", "description": "Base directory (default: cwd)"}, + }, + "required": ["pattern"], + }, + }, + { + "name": "Grep", + "description": "Search file contents with regex using ripgrep (falls back to grep).", + "input_schema": { + "type": "object", + "properties": { + "pattern": {"type": "string", "description": "Regex pattern"}, + "path": {"type": "string", "description": "File or directory to search"}, + "glob": {"type": "string", "description": "File filter e.g. *.py"}, + "output_mode": { + "type": "string", + "enum": ["content", "files_with_matches", "count"], + "description": "content=matching lines, files_with_matches=file paths, count=match counts", + }, + "case_insensitive": {"type": "boolean"}, + "context": {"type": "integer", "description": "Lines of context around matches"}, + }, + "required": ["pattern"], + }, + }, + { + "name": "WebFetch", + "description": "Fetch a URL and return its text content (HTML stripped).", + "input_schema": { + "type": "object", + "properties": { + "url": {"type": "string"}, + "prompt": {"type": "string", "description": "Hint for what to extract"}, + }, + "required": ["url"], + }, + }, + { + "name": "WebSearch", + "description": "Search the web via DuckDuckGo and return top results.", + "input_schema": { + "type": "object", + "properties": { + "query": {"type": "string"}, + }, + "required": ["query"], + }, + }, +] + +# ── Safe bash commands (never ask permission) ─────────────────────────────── + +_SAFE_PREFIXES = ( + "ls", "cat", "head", "tail", "wc", "pwd", "echo", "printf", "date", + "which", "type", "env", "printenv", "uname", "whoami", "id", + "git log", "git status", "git diff", "git show", "git branch", + "git remote", "git stash list", "git tag", + "find ", "grep ", "rg ", "ag ", "fd ", + "python ", "python3 ", "node ", "ruby ", "perl ", + "pip show", "pip list", "npm list", "cargo metadata", + "df ", "du ", "free ", "top -bn", "ps ", + "curl -I", "curl --head", +) + +def _is_safe_bash(cmd: str) -> bool: + c = cmd.strip() + return any(c.startswith(p) for p in _SAFE_PREFIXES) + + +# ── Tool implementations ─────────────────────────────────────────────────── + +def _read(file_path: str, limit: int = None, offset: int = None) -> str: + p = Path(file_path) + if not p.exists(): + return f"Error: file not found: {file_path}" + if p.is_dir(): + return f"Error: {file_path} is a directory" + try: + lines = p.read_text(errors="replace").splitlines(keepends=True) + start = offset or 0 + chunk = lines[start:start + limit] if limit else lines[start:] + if not chunk: + return "(empty file)" + return "".join(f"{start + i + 1}\t{l}" for i, l in enumerate(chunk)) + except Exception as e: + return f"Error: {e}" + + +def _write(file_path: str, content: str) -> str: + p = Path(file_path) + try: + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(content) + lc = content.count("\n") + (1 if content and not content.endswith("\n") else 0) + return f"Wrote {lc} lines to {file_path}" + except Exception as e: + return f"Error: {e}" + + +def _edit(file_path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: + p = Path(file_path) + if not p.exists(): + return f"Error: file not found: {file_path}" + try: + content = p.read_text() + count = content.count(old_string) + if count == 0: + return "Error: old_string not found in file" + if count > 1 and not replace_all: + return (f"Error: old_string appears {count} times. " + "Provide more context to make it unique, or use replace_all=true.") + new_content = content.replace(old_string, new_string) if replace_all else \ + content.replace(old_string, new_string, 1) + p.write_text(new_content) + return f"Replaced {'all ' + str(count) if replace_all else '1'} occurrence(s) in {file_path}" + except Exception as e: + return f"Error: {e}" + + +def _bash(command: str, timeout: int = 30) -> str: + try: + r = subprocess.run( + command, shell=True, capture_output=True, text=True, + timeout=timeout, cwd=os.getcwd(), + ) + out = r.stdout + if r.stderr: + out += ("\n" if out else "") + "[stderr]\n" + r.stderr + return out.strip() or "(no output)" + except subprocess.TimeoutExpired: + return f"Error: timed out after {timeout}s" + except Exception as e: + return f"Error: {e}" + + +def _glob(pattern: str, path: str = None) -> str: + base = Path(path) if path else Path.cwd() + try: + matches = sorted(base.glob(pattern)) + if not matches: + return "No files matched" + return "\n".join(str(m) for m in matches[:500]) + except Exception as e: + return f"Error: {e}" + + +def _has_rg() -> bool: + try: + subprocess.run(["rg", "--version"], capture_output=True, check=True) + return True + except Exception: + return False + + +def _grep(pattern: str, path: str = None, glob: str = None, + output_mode: str = "files_with_matches", + case_insensitive: bool = False, context: int = 0) -> str: + use_rg = _has_rg() + cmd = ["rg" if use_rg else "grep", "--no-heading"] + if case_insensitive: + cmd.append("-i") + if output_mode == "files_with_matches": + cmd.append("-l") + elif output_mode == "count": + cmd.append("-c") + else: + cmd.append("-n") + if context: + cmd += ["-C", str(context)] + if glob: + cmd += (["--glob", glob] if use_rg else ["--include", glob]) + cmd.append(pattern) + cmd.append(path or str(Path.cwd())) + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + out = r.stdout.strip() + return out[:20000] if out else "No matches found" + except Exception as e: + return f"Error: {e}" + + +def _webfetch(url: str, prompt: str = None) -> str: + try: + import httpx + r = httpx.get(url, headers={"User-Agent": "NanoClaude/1.0"}, + timeout=30, follow_redirects=True) + r.raise_for_status() + ct = r.headers.get("content-type", "") + if "html" in ct: + text = re.sub(r"]*>.*?", "", r.text, + flags=re.DOTALL | re.IGNORECASE) + text = re.sub(r"]*>.*?", "", text, + flags=re.DOTALL | re.IGNORECASE) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text).strip() + else: + text = r.text + return text[:25000] + except ImportError: + return "Error: httpx not installed — run: pip install httpx" + except Exception as e: + return f"Error: {e}" + + +def _websearch(query: str) -> str: + try: + import httpx + url = "https://html.duckduckgo.com/html/" + r = httpx.get(url, params={"q": query}, + headers={"User-Agent": "Mozilla/5.0 (compatible)"}, + timeout=30, follow_redirects=True) + titles = re.findall(r'class="result__title"[^>]*>.*?]*href="([^"]+)"[^>]*>(.*?)', + r.text, re.DOTALL) + snippets = re.findall(r'class="result__snippet"[^>]*>(.*?)', r.text, re.DOTALL) + results = [] + for i, (link, title) in enumerate(titles[:8]): + t = re.sub(r"<[^>]+>", "", title).strip() + s = re.sub(r"<[^>]+>", "", snippets[i]).strip() if i < len(snippets) else "" + results.append(f"**{t}**\n{link}\n{s}") + return "\n\n".join(results) if results else "No results found" + except ImportError: + return "Error: httpx not installed — run: pip install httpx" + except Exception as e: + return f"Error: {e}" + + +# ── Dispatcher ───────────────────────────────────────────────────────────── + +def execute_tool( + name: str, + inputs: dict, + permission_mode: str = "auto", + ask_permission: Optional[Callable[[str], bool]] = None, +) -> str: + """Dispatch tool execution; ask permission for write/destructive ops.""" + + def _check(desc: str) -> bool: + """Return True if action is allowed.""" + if permission_mode == "accept-all": + return True + if ask_permission: + return ask_permission(desc) + return True # headless: allow everything + + if name == "Read": + return _read(inputs["file_path"], inputs.get("limit"), inputs.get("offset")) + + elif name == "Write": + if not _check(f"Write to {inputs['file_path']}"): + return "Denied: user rejected write operation" + return _write(inputs["file_path"], inputs["content"]) + + elif name == "Edit": + if not _check(f"Edit {inputs['file_path']}"): + return "Denied: user rejected edit operation" + return _edit(inputs["file_path"], inputs["old_string"], + inputs["new_string"], inputs.get("replace_all", False)) + + elif name == "Bash": + cmd = inputs["command"] + if permission_mode != "accept-all" and not _is_safe_bash(cmd): + if not _check(f"Bash: {cmd}"): + return "Denied: user rejected bash command" + return _bash(cmd, inputs.get("timeout", 30)) + + elif name == "Glob": + return _glob(inputs["pattern"], inputs.get("path")) + + elif name == "Grep": + return _grep( + inputs["pattern"], inputs.get("path"), inputs.get("glob"), + inputs.get("output_mode", "files_with_matches"), + inputs.get("case_insensitive", False), + inputs.get("context", 0), + ) + + elif name == "WebFetch": + return _webfetch(inputs["url"], inputs.get("prompt")) + + elif name == "WebSearch": + return _websearch(inputs["query"]) + + else: + return f"Unknown tool: {name}"