Add files via upload

This commit is contained in:
Shangding Gu
2026-04-01 15:19:29 -07:00
committed by GitHub
parent 3c58834a6a
commit 25aae479a5
8 changed files with 2103 additions and 0 deletions

View File

@@ -0,0 +1,69 @@
"""Configuration management for nano claude (multi-provider)."""
import os
import json
from pathlib import Path
CONFIG_DIR = Path.home() / ".nano_claude"
CONFIG_FILE = CONFIG_DIR / "config.json"
HISTORY_FILE = CONFIG_DIR / "input_history.txt"
SESSIONS_DIR = CONFIG_DIR / "sessions"
DEFAULTS = {
"model": "claude-opus-4-6",
"max_tokens": 8192,
"permission_mode": "auto", # auto | accept-all | manual
"verbose": False,
"thinking": False,
"thinking_budget": 10000,
"custom_base_url": "", # for "custom" provider
# Per-provider API keys (optional; env vars take priority)
# "anthropic_api_key": "sk-ant-..."
# "openai_api_key": "sk-..."
# "gemini_api_key": "..."
# "kimi_api_key": "..."
# "qwen_api_key": "..."
# "zhipu_api_key": "..."
# "deepseek_api_key": "..."
}
def load_config() -> dict:
CONFIG_DIR.mkdir(exist_ok=True)
SESSIONS_DIR.mkdir(exist_ok=True)
cfg = dict(DEFAULTS)
if CONFIG_FILE.exists():
try:
cfg.update(json.loads(CONFIG_FILE.read_text()))
except Exception:
pass
# Backward-compat: legacy single api_key → anthropic_api_key
if cfg.get("api_key") and not cfg.get("anthropic_api_key"):
cfg["anthropic_api_key"] = cfg.pop("api_key")
# Also accept ANTHROPIC_API_KEY env for backward-compat
if not cfg.get("anthropic_api_key"):
cfg["anthropic_api_key"] = os.environ.get("ANTHROPIC_API_KEY", "")
return cfg
def save_config(cfg: dict):
CONFIG_DIR.mkdir(exist_ok=True)
data = dict(cfg)
CONFIG_FILE.write_text(json.dumps(data, indent=2))
def current_provider(cfg: dict) -> str:
from providers import detect_provider
return detect_provider(cfg.get("model", "claude-opus-4-6"))
def has_api_key(cfg: dict) -> bool:
"""Check whether the active provider has an API key configured."""
from providers import get_api_key
pname = current_provider(cfg)
key = get_api_key(pname, cfg)
return bool(key)
def calc_cost(model: str, in_tokens: int, out_tokens: int) -> float:
from providers import calc_cost as _cc
return _cc(model, in_tokens, out_tokens)

100
nano-claude-code/context.py Normal file
View File

@@ -0,0 +1,100 @@
"""System context: CLAUDE.md, git info, cwd injection."""
import os
import subprocess
from pathlib import Path
from datetime import datetime
SYSTEM_PROMPT_TEMPLATE = """\
You are Nano Claude Code, Created by SAIL Lab (Safe AI and Robot Learning Lab), an AI coding assistant running in the terminal.
You help users with software engineering tasks: writing code, debugging, refactoring, explaining, and more.
# Available Tools
- **Read**: Read file contents with line numbers
- **Write**: Create or overwrite files
- **Edit**: Replace text in a file (exact string replacement)
- **Bash**: Execute shell commands
- **Glob**: Find files by pattern (e.g. **/*.py)
- **Grep**: Search file contents with regex
- **WebFetch**: Fetch and extract content from a URL
- **WebSearch**: Search the web via DuckDuckGo
# Guidelines
- Be concise and direct. Lead with the answer.
- Prefer editing existing files over creating new ones.
- Do not add unnecessary comments, docstrings, or error handling.
- When reading files before editing, use line numbers to be precise.
- Always use absolute paths for file operations.
- For multi-step tasks, work through them systematically.
- If a task is unclear, ask for clarification before proceeding.
# Environment
- Current date: {date}
- Working directory: {cwd}
- Platform: {platform}
{git_info}{claude_md}"""
def get_git_info() -> str:
"""Return git branch/status summary if in a git repo."""
try:
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL, text=True).strip()
status = subprocess.check_output(
["git", "status", "--short"],
stderr=subprocess.DEVNULL, text=True).strip()
log = subprocess.check_output(
["git", "log", "--oneline", "-5"],
stderr=subprocess.DEVNULL, text=True).strip()
parts = [f"- Git branch: {branch}"]
if status:
lines = status.split('\n')[:10]
parts.append("- Git status:\n" + "\n".join(f" {l}" for l in lines))
if log:
parts.append("- Recent commits:\n" + "\n".join(f" {l}" for l in log.split('\n')))
return "\n".join(parts) + "\n"
except Exception:
return ""
def get_claude_md() -> str:
"""Load CLAUDE.md from cwd or parents, and ~/.claude/CLAUDE.md."""
content_parts = []
# Global CLAUDE.md
global_md = Path.home() / ".claude" / "CLAUDE.md"
if global_md.exists():
try:
content_parts.append(f"[Global CLAUDE.md]\n{global_md.read_text()}")
except Exception:
pass
# Project CLAUDE.md (walk up from cwd)
p = Path.cwd()
for _ in range(10):
candidate = p / "CLAUDE.md"
if candidate.exists():
try:
content_parts.append(f"[Project CLAUDE.md: {candidate}]\n{candidate.read_text()}")
except Exception:
pass
break
parent = p.parent
if parent == p:
break
p = parent
if not content_parts:
return ""
return "\n# Memory / CLAUDE.md\n" + "\n\n".join(content_parts) + "\n"
def build_system_prompt() -> str:
import platform
return SYSTEM_PROMPT_TEMPLATE.format(
date=datetime.now().strftime("%Y-%m-%d %A"),
cwd=str(Path.cwd()),
platform=platform.system(),
git_info=get_git_info(),
claude_md=get_claude_md(),
)

91
nano-claude-code/demo.py Normal file
View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""
Demo script for nano claude code.
Requires ANTHROPIC_API_KEY environment variable.
Run:
ANTHROPIC_API_KEY=sk-... python demo.py
"""
import os
import sys
# Add parent path for imports
sys.path.insert(0, os.path.dirname(__file__))
from config import load_config
from context import build_system_prompt
from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest
def demo():
config = load_config()
if not config["api_key"]:
print("Error: Set ANTHROPIC_API_KEY environment variable")
sys.exit(1)
config["permission_mode"] = "accept-all" # Demo: auto-approve everything
config["verbose"] = True
state = AgentState()
system_prompt = build_system_prompt()
print("=" * 60)
print("DEMO 1: Simple question (no tools)")
print("=" * 60)
_run_demo(state, config, system_prompt,
"What is the time complexity of quicksort? Answer in 2 sentences.")
print("\n" + "=" * 60)
print("DEMO 2: File system exploration (uses Glob + Read tools)")
print("=" * 60)
state2 = AgentState()
_run_demo(state2, config, system_prompt,
"List all Python files in the current directory and show me the first 5 lines of nano_claude.py")
print("\n" + "=" * 60)
print("DEMO 3: Code writing (uses Write + Bash tools)")
print("=" * 60)
state3 = AgentState()
_run_demo(state3, config, system_prompt,
"Write a Python function to fibonacci(n) in /tmp/fib.py, then run it to test fib(10)")
print("\n" + "=" * 60)
print("DEMO 4: Multi-turn conversation")
print("=" * 60)
state4 = AgentState()
_run_demo(state4, config, system_prompt,
"What are the tools available to you?")
_run_demo(state4, config, system_prompt,
"Which of those tools would you use to find all TODO comments in a codebase?")
print("\n" + "=" * 60)
print("DEMO 5: Web search")
print("=" * 60)
state5 = AgentState()
_run_demo(state5, config, system_prompt,
"Search the web for 'Python 3.13 new features' and give me a brief summary")
def _run_demo(state: AgentState, config: dict, system_prompt: str, prompt: str):
print(f"\n[USER]: {prompt}\n")
print("[CLAUDE]: ", end="", flush=True)
for event in run(prompt, state, config, system_prompt):
if isinstance(event, TextChunk):
print(event.text, end="", flush=True)
elif isinstance(event, ThinkingChunk):
if config.get("verbose"):
print(f"\033[2m[thinking: {event.text[:100]}]\033[0m", end="", flush=True)
elif isinstance(event, ToolStart):
print(f"\n\033[36m ⚙ {event.name}({list(event.inputs.values())[0] if event.inputs else ''})\033[0m", flush=True)
elif isinstance(event, PermissionRequest):
event.granted = True # Auto-approve in demo
elif isinstance(event, ToolEnd):
result_preview = event.result[:100].replace('\n', '')
print(f"\033[32m ✓ → {result_preview}\033[0m", flush=True)
elif isinstance(event, TurnDone):
print(f"\n\033[2m [+{event.input_tokens} in / +{event.output_tokens} out]\033[0m", flush=True)
print()
if __name__ == "__main__":
demo()

View File

@@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
Generate animated GIF demo of nano claude code using PIL.
Simulates a realistic terminal session with tool calls.
"""
from PIL import Image, ImageDraw, ImageFont
import os, textwrap
# ── Catppuccin Mocha palette ─────────────────────────────────────────────
BG = (30, 30, 46) # base
SURFACE = (49, 50, 68) # surface0
TEXT = (205, 214, 244) # text
SUBTEXT = (108, 112, 134) # overlay0 (dim)
CYAN = (137, 220, 235) # sky
GREEN = (166, 227, 161) # green
YELLOW = (249, 226, 175) # yellow
RED = (243, 139, 168) # red
MAUVE = (203, 166, 247) # mauve (user prompt)
BLUE = (137, 180, 250) # blue
PEACH = (250, 179, 135) # peach
W, H = 960, 720
FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf"
FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf"
FONT_SIZE = 14
LINE_H = 20
PAD_X = 18
PAD_Y = 16
def make_font(size=FONT_SIZE, bold=False):
path = FONT_BOLD if bold else FONT_PATH
try:
return ImageFont.truetype(path, size)
except:
return ImageFont.load_default()
FONT = make_font()
FONT_B = make_font(bold=True)
FONT_SM = make_font(FONT_SIZE - 1)
# ── Segment: (text, color, bold?) ────────────────────────────────────────
Seg = tuple # (str, rgb_tuple, bool)
def seg(t, c=TEXT, b=False): return (t, c, b)
def segs(*args): return list(args)
def render_line(draw, y, segments, x_start=PAD_X):
x = x_start
for text, color, bold in segments:
font = FONT_B if bold else FONT
draw.text((x, y), text, font=font, fill=color)
x += font.getlength(text)
return y + LINE_H
def blank_frame():
img = Image.new("RGB", (W, H), BG)
return img
def draw_frame(lines_segments):
"""
lines_segments: list of either
- list[Seg] → rendered as a line
- None → blank line
Returns PIL Image.
"""
img = blank_frame()
d = ImageDraw.Draw(img)
y = PAD_Y
for item in lines_segments:
if item is None:
y += LINE_H
elif isinstance(item, list):
y = render_line(d, y, item)
else:
y = render_line(d, y, [item])
return img
# ── Pre-defined screen content blocks ───────────────────────────────────
BANNER = [
[seg("╭─ Nano Claude Code ──────────────────────────────────────────╮", SUBTEXT)],
[seg("", SUBTEXT), seg("Model: ", SUBTEXT), seg("claude-opus-4-6", CYAN, True)],
[seg("", SUBTEXT), seg("Permissions: ", SUBTEXT), seg("auto", YELLOW)],
[seg("│ Type /help for commands, Ctrl+C to cancel │", SUBTEXT)],
[seg("╰────────────────────────────────────────────────────────────╯", SUBTEXT)],
None,
]
def prompt_line(text="", cursor=False):
cur = "" if cursor else ""
return [
seg("[nano_claude_code] ", SUBTEXT),
seg(" ", CYAN, True),
seg(text + cur, TEXT),
]
def claude_header():
return [
seg("╭─ Claude ", SUBTEXT),
seg("", GREEN),
seg(" ─────────────────────────────────────────────", SUBTEXT),
]
def claude_sep():
return [seg("╰──────────────────────────────────────────────────────────", SUBTEXT)]
def tool_line(icon, name, arg, color=CYAN):
return [
seg(f" {icon} ", SUBTEXT),
seg(name, color),
seg("(", SUBTEXT),
seg(arg, TEXT),
seg(")", SUBTEXT),
]
def tool_ok(msg):
return [seg(f"", GREEN), seg(msg, SUBTEXT)]
def tool_err(msg):
return [seg(f"", RED), seg(msg, SUBTEXT)]
def text_line(t, indent=2):
return [seg(" " * indent + t, TEXT)]
def dim_line(t, indent=4):
return [seg(" " * indent + t, SUBTEXT)]
# ── Scene builder ─────────────────────────────────────────────────────────
def build_scenes():
"""Return list of (frame_content, duration_ms)."""
scenes = []
def add(lines, ms=120):
scenes.append((lines, ms))
# ── Scene 0: Empty terminal with banner ──────────────────────────────
add(BANNER + [prompt_line(cursor=True)], 800)
# ── Scene 1: User types query 1 ──────────────────────────────────────
msg1 = "List Python files in this project and show me their line counts"
for i in range(0, len(msg1) + 1, 3):
add(BANNER + [prompt_line(msg1[:i], cursor=(i < len(msg1)))], 60)
add(BANNER + [prompt_line(msg1, cursor=False)], 400)
# ── Scene 2: Claude header appears ──────────────────────────────────
pre = BANNER + [prompt_line(msg1)]
add(pre + [None, claude_header(), [seg("", SUBTEXT)]], 300)
# ── Scene 3: Tool call - Glob ────────────────────────────────────────
base = pre + [None, claude_header()]
add(base + [
tool_line("", "Glob", "**/*.py"),
], 500)
add(base + [
tool_line("", "Glob", "**/*.py"),
tool_ok("5 files matched"),
], 600)
# ── Scene 4: Tool call - Bash (wc -l) ────────────────────────────────
add(base + [
tool_line("", "Glob", "**/*.py"),
tool_ok("5 files matched"),
None,
tool_line("", "Bash", "wc -l *.py | sort -n"),
], 500)
add(base + [
tool_line("", "Glob", "**/*.py"),
tool_ok("5 files matched"),
None,
tool_line("", "Bash", "wc -l *.py | sort -n"),
tool_ok("→ 6 lines (120 chars)"),
], 700)
# ── Scene 5: Claude streams response ────────────────────────────────
response_lines = [
"Here are the Python files in this project with their line counts:",
"",
" 76 config.py — Configuration management and cost calculation",
" 100 context.py — System prompt builder, CLAUDE.md + git injection",
" 173 agent.py — Core agent loop with streaming API calls",
" 359 tools.py — 8 built-in tools (Read/Write/Edit/Bash/Glob/Grep/Web)",
" 553 nano_claude.py — REPL entry point, slash commands, rich rendering",
"────────────────────────────────────────────────────",
"1261 total",
"",
"The largest file is `nano_claude.py` containing the interactive REPL,",
"14 slash commands, permission handling, and markdown rendering.",
]
tool_section = [
tool_line("", "Glob", "**/*.py"),
tool_ok("5 files matched"),
None,
tool_line("", "Bash", "wc -l *.py | sort -n"),
tool_ok("→ 6 lines (120 chars)"),
None,
[seg("", SUBTEXT)],
]
streamed = []
for i, rline in enumerate(response_lines):
streamed.append(text_line(rline, 2))
content = base + tool_section + streamed
add(content, 80 if rline else 30)
add(base + tool_section + [text_line(l, 2) for l in response_lines] + [claude_sep()], 1200)
# ── Scene 6: New prompt appears ──────────────────────────────────────
full1 = (pre + [None, claude_header()] +
tool_section +
[text_line(l, 2) for l in response_lines] +
[claude_sep(), None])
add(full1 + [prompt_line(cursor=True)], 800)
# ── Scene 7: User types query 2 ──────────────────────────────────────
msg2 = "Write a hello_world.py that prints 'Hello from Nano Claude!'"
for i in range(0, len(msg2) + 1, 4):
add(full1 + [prompt_line(msg2[:i], cursor=(i < len(msg2)))], 55)
add(full1 + [prompt_line(msg2)], 400)
# ── Scene 8: Write tool call ─────────────────────────────────────────
base2 = full1 + [prompt_line(msg2), None, claude_header()]
add(base2 + [
tool_line("", "Write", "/tmp/hello_world.py", MAUVE),
], 600)
add(base2 + [
tool_line("", "Write", "/tmp/hello_world.py", MAUVE),
tool_ok("Wrote 3 lines to /tmp/hello_world.py"),
None,
tool_line("", "Bash", "python3 /tmp/hello_world.py"),
], 500)
add(base2 + [
tool_line("", "Write", "/tmp/hello_world.py", MAUVE),
tool_ok("Wrote 3 lines to /tmp/hello_world.py"),
None,
tool_line("", "Bash", "python3 /tmp/hello_world.py"),
tool_ok("→ Hello from Nano Claude!"),
], 800)
# ── Scene 9: Final response ──────────────────────────────────────────
resp2 = [
"Done! Created `/tmp/hello_world.py` and ran it successfully.",
"",
" print('Hello from Nano Claude!')",
"",
"Output: Hello from Nano Claude!",
]
tool2 = [
tool_line("", "Write", "/tmp/hello_world.py", MAUVE),
tool_ok("Wrote 3 lines to /tmp/hello_world.py"),
None,
tool_line("", "Bash", "python3 /tmp/hello_world.py"),
tool_ok("→ Hello from Nano Claude!"),
None,
[seg("", SUBTEXT)],
]
streamed2 = []
for rline in resp2:
streamed2.append(text_line(rline, 2))
add(base2 + tool2 + streamed2, 90)
add(base2 + tool2 + [text_line(l, 2) for l in resp2] + [claude_sep()], 1500)
# ── Scene 10: Slash command demo ─────────────────────────────────────
final_state = (full1 + [prompt_line(msg2), None, claude_header()] +
tool2 + [text_line(l, 2) for l in resp2] + [claude_sep(), None])
add(final_state + [prompt_line(cursor=True)], 600)
slash = "/cost"
for i in range(len(slash) + 1):
add(final_state + [prompt_line(slash[:i], cursor=(i < len(slash)))], 80)
add(final_state + [prompt_line(slash)], 400)
# cost output
cost_lines = [
[seg("Input tokens: ", CYAN), seg("1,842", TEXT, True)],
[seg("Output tokens: ", CYAN), seg("312", TEXT, True)],
[seg("Est. cost: ", CYAN), seg("$0.0318 USD", GREEN, True)],
]
add(final_state + [prompt_line(slash), None] + cost_lines + [None, prompt_line(cursor=True)], 2000)
return scenes
# ── Render ────────────────────────────────────────────────────────────────
def _build_explicit_palette():
"""
Build a 256-entry palette from our exact theme colors.
Returns flat list of 768 ints (R,G,B, R,G,B, ...) suitable for putpalette().
"""
# All distinct colors used in the renderer
theme = [
BG, SURFACE, TEXT, SUBTEXT,
CYAN, GREEN, YELLOW, RED, MAUVE, BLUE, PEACH,
(255, 255, 255), (0, 0, 0),
# Extra intermediate shades that PIL might snap to
(50, 55, 80), # surface variant
(90, 95, 120), # dim text variant
(160, 166, 200),
]
flat = []
for c in theme:
flat.extend(c)
# Pad to 256 entries with black
while len(flat) < 256 * 3:
flat.extend((0, 0, 0))
return flat
def render_gif(output_path="demo.gif"):
print("Building scenes...")
scenes = build_scenes()
print(f" {len(scenes)} scenes")
palette_data = _build_explicit_palette()
# Create a palette-mode reference image for quantize()
pal_ref = Image.new("P", (1, 1))
pal_ref.putpalette(palette_data)
print(" Rendering frames...")
rgb_frames = []
durations = []
for i, (lines, ms) in enumerate(scenes):
img = draw_frame(lines)
rgb_frames.append(img)
durations.append(ms)
if i % 20 == 0:
print(f" {i}/{len(scenes)}...")
# Quantize all frames to the same explicit palette (no dither → exact snap)
print(" Quantizing to global palette...")
p_frames = [f.quantize(palette=pal_ref, dither=0) for f in rgb_frames]
print(f"Saving GIF → {output_path} ({len(p_frames)} frames)...")
p_frames[0].save(
output_path,
save_all=True,
append_images=p_frames[1:],
duration=durations,
loop=0,
optimize=False,
)
size_kb = os.path.getsize(output_path) // 1024
print(f"Done! {size_kb} KB")
# ── Static screenshot ─────────────────────────────────────────────────────
def render_screenshot(output_path="screenshot.png"):
"""Single high-quality screenshot showing a complete session."""
lines = (
BANNER +
[prompt_line("List Python files and their line counts")] +
[None, claude_header()] +
[
tool_line("", "Glob", "**/*.py"),
tool_ok("5 files matched"),
None,
tool_line("", "Bash", "wc -l *.py | sort -n"),
tool_ok("→ 6 lines (120 chars)"),
None,
[seg("", SUBTEXT)],
text_line("Here are the Python files with their line counts:", 2),
None,
text_line(" 76 config.py — Configuration management", 2),
text_line(" 100 context.py — System prompt + git injection", 2),
text_line(" 173 agent.py — Core agent loop", 2),
text_line(" 359 tools.py — 8 built-in tools", 2),
text_line(" 553 nano_claude.py — REPL + slash commands", 2),
text_line("────────────────────────────────", 2),
text_line("1261 total", 2),
None,
text_line("The main entry point `nano_claude.py` contains the REPL,", 2),
text_line("14 slash commands, permission handling, and rich rendering.", 2),
claude_sep(),
None,
prompt_line("/cost"),
None,
[seg("Input tokens: ", CYAN), seg("1,842", TEXT, True)],
[seg("Output tokens: ", CYAN), seg("312", TEXT, True)],
[seg("Est. cost: ", CYAN), seg("$0.0318 USD", GREEN, True)],
None,
prompt_line(cursor=True),
]
)
img = draw_frame(lines)
# Add subtle rounded border effect
d = ImageDraw.Draw(img)
d.rectangle([0, 0, W-1, H-1], outline=SURFACE, width=2)
img.save(output_path, format="PNG", optimize=True)
size_kb = os.path.getsize(output_path) // 1024
print(f"Screenshot saved: {output_path} ({size_kb} KB)")
if __name__ == "__main__":
import sys
out_dir = os.path.dirname(os.path.abspath(__file__))
gif_path = os.path.join(out_dir, "demo.gif")
png_path = os.path.join(out_dir, "screenshot.png")
render_screenshot(png_path)
render_gif(gif_path)
print("\nFiles created:")
print(f" {png_path}")
print(f" {gif_path}")

View File

@@ -0,0 +1,576 @@
#!/usr/bin/env python3
"""
Nano Claude Code — Minimal Python implementation of Claude Code.
Usage:
python nano_claude.py [options] [prompt]
Options:
-p, --print Non-interactive: run prompt and exit (also --print-output)
-m, --model MODEL Override model
--accept-all Never ask permission (dangerous)
--verbose Show thinking + token counts
--version Print version and exit
Slash commands in REPL:
/help Show this help
/clear Clear conversation
/model [m] Show or set model
/config Show config / set key=value
/save [f] Save session to file
/load [f] Load session from file
/history Print conversation history
/context Show context window usage
/cost Show API cost this session
/verbose Toggle verbose mode
/thinking Toggle extended thinking
/permissions [mode] Set permission mode
/cwd [path] Show or change working directory
/exit /quit Exit
"""
from __future__ import annotations
import os
import sys
import json
import readline
import atexit
import argparse
import textwrap
from pathlib import Path
from datetime import datetime
from typing import Optional
# ── Optional rich for markdown rendering ──────────────────────────────────
try:
from rich.console import Console
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.panel import Panel
from rich import print as rprint
_RICH = True
console = Console()
except ImportError:
_RICH = False
console = None
VERSION = "1.0.0"
# ── ANSI helpers (used even with rich for non-markdown output) ─────────────
C = {
"cyan": "\033[36m",
"green": "\033[32m",
"yellow": "\033[33m",
"red": "\033[31m",
"blue": "\033[34m",
"magenta": "\033[35m",
"bold": "\033[1m",
"dim": "\033[2m",
"reset": "\033[0m",
}
def clr(text: str, *keys: str) -> str:
return "".join(C[k] for k in keys) + str(text) + C["reset"]
def info(msg: str): print(clr(msg, "cyan"))
def ok(msg: str): print(clr(msg, "green"))
def warn(msg: str): print(clr(f"Warning: {msg}", "yellow"))
def err(msg: str): print(clr(f"Error: {msg}", "red"), file=sys.stderr)
# ── Conversation rendering ─────────────────────────────────────────────────
_accumulated_text: list[str] = [] # buffer text during streaming
def stream_text(chunk: str):
"""Called for each streamed text chunk."""
print(chunk, end="", flush=True)
_accumulated_text.append(chunk)
def stream_thinking(chunk: str, verbose: bool):
if verbose:
print(clr(chunk, "dim"), end="", flush=True)
def flush_response():
"""After streaming, optionally re-render as markdown."""
full = "".join(_accumulated_text)
_accumulated_text.clear()
if _RICH and full.strip():
# Re-print with markdown rendering
print("\r", end="") # go to line start to overwrite last newline
# only re-render if there's actual markdown (contains # * ` _ etc.)
if any(c in full for c in ("#", "*", "`", "_", "[")):
print() # newline after streaming
console.print(Markdown(full))
return
print() # ensure newline after stream
def print_tool_start(name: str, inputs: dict, verbose: bool):
"""Show tool invocation."""
desc = _tool_desc(name, inputs)
print(clr(f"\n{desc}", "dim", "cyan"), flush=True)
if verbose:
print(clr(f" inputs: {json.dumps(inputs, ensure_ascii=False)[:200]}", "dim"))
def print_tool_end(name: str, result: str, verbose: bool):
lines = result.count("\n") + 1
size = len(result)
summary = f"{lines} lines ({size} chars)"
if not result.startswith("Error") and not result.startswith("Denied"):
print(clr(f"{summary}", "dim", "green"), flush=True)
else:
print(clr(f"{result[:120]}", "dim", "red"), flush=True)
if verbose and not result.startswith("Denied"):
preview = result[:500] + ("" if len(result) > 500 else "")
print(clr(f" {preview.replace(chr(10), chr(10)+' ')}", "dim"))
def _tool_desc(name: str, inputs: dict) -> str:
if name == "Read": return f"Read({inputs.get('file_path','')})"
if name == "Write": return f"Write({inputs.get('file_path','')})"
if name == "Edit": return f"Edit({inputs.get('file_path','')})"
if name == "Bash": return f"Bash({inputs.get('command','')[:80]})"
if name == "Glob": return f"Glob({inputs.get('pattern','')})"
if name == "Grep": return f"Grep({inputs.get('pattern','')})"
if name == "WebFetch": return f"WebFetch({inputs.get('url','')[:60]})"
if name == "WebSearch": return f"WebSearch({inputs.get('query','')})"
return f"{name}({list(inputs.values())[:1]})"
# ── Permission prompt ──────────────────────────────────────────────────────
def ask_permission_interactive(desc: str, config: dict) -> bool:
try:
print()
ans = input(clr(f" Allow: {desc} [y/N/a(ccept-all)] ", "yellow")).strip().lower()
if ans == "a":
config["permission_mode"] = "accept-all"
ok(" Permission mode set to accept-all for this session.")
return True
return ans in ("y", "yes")
except (KeyboardInterrupt, EOFError):
print()
return False
# ── Slash commands ─────────────────────────────────────────────────────────
def cmd_help(_args: str, _state, _config) -> bool:
print(__doc__)
return True
def cmd_clear(_args: str, state, _config) -> bool:
state.messages.clear()
state.turn_count = 0
ok("Conversation cleared.")
return True
def cmd_model(args: str, _state, config) -> bool:
from providers import PROVIDERS, detect_provider
if not args:
model = config["model"]
pname = detect_provider(model)
info(f"Current model: {model} (provider: {pname})")
info("\nAvailable models by provider:")
for pn, pdata in PROVIDERS.items():
ms = pdata.get("models", [])
if ms:
info(f" {pn:12s} " + ", ".join(ms[:4]) + ("..." if len(ms) > 4 else ""))
info("\nFormat: 'provider/model' or just model name (auto-detected)")
info(" e.g. /model gpt-4o")
info(" e.g. /model ollama/qwen2.5-coder")
info(" e.g. /model kimi:moonshot-v1-32k")
else:
# Accept both "ollama/model" and "ollama:model" syntax
m = args.strip().replace(":", "/", 1)
config["model"] = m
pname = detect_provider(m)
ok(f"Model set to {m} (provider: {pname})")
from config import save_config
save_config(config)
return True
def cmd_config(args: str, _state, config) -> bool:
from config import save_config
if not args:
display = {k: v for k, v in config.items() if k != "api_key"}
print(json.dumps(display, indent=2))
elif "=" in args:
key, _, val = args.partition("=")
key, val = key.strip(), val.strip()
# Type coercion
if val.lower() in ("true", "false"):
val = val.lower() == "true"
elif val.isdigit():
val = int(val)
config[key] = val
save_config(config)
ok(f"Set {key} = {val}")
else:
k = args.strip()
v = config.get(k, "(not set)")
info(f"{k} = {v}")
return True
def cmd_save(args: str, state, _config) -> bool:
from config import SESSIONS_DIR
fname = args.strip() or f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
path = Path(fname) if "/" in fname else SESSIONS_DIR / fname
data = {
"messages": [
m if not isinstance(m.get("content"), list) else
{**m, "content": [
b if isinstance(b, dict) else b.model_dump()
for b in m["content"]
]}
for m in state.messages
],
"turn_count": state.turn_count,
"total_input_tokens": state.total_input_tokens,
"total_output_tokens": state.total_output_tokens,
}
path.write_text(json.dumps(data, indent=2, default=str))
ok(f"Session saved to {path}")
return True
def cmd_load(args: str, state, _config) -> bool:
from config import SESSIONS_DIR
if not args.strip():
# List available sessions
sessions = sorted(SESSIONS_DIR.glob("*.json"))
if not sessions:
info("No saved sessions found.")
else:
info("Saved sessions:")
for s in sessions:
info(f" {s.name}")
return True
fname = args.strip()
path = Path(fname) if "/" in fname else SESSIONS_DIR / fname
if not path.exists():
err(f"File not found: {path}")
return True
data = json.loads(path.read_text())
state.messages = data.get("messages", [])
state.turn_count = data.get("turn_count", 0)
state.total_input_tokens = data.get("total_input_tokens", 0)
state.total_output_tokens = data.get("total_output_tokens", 0)
ok(f"Session loaded from {path} ({len(state.messages)} messages)")
return True
def cmd_history(_args: str, state, _config) -> bool:
if not state.messages:
info("(empty conversation)")
return True
for i, m in enumerate(state.messages):
role = clr(m["role"].upper(), "bold",
"cyan" if m["role"] == "user" else "green")
content = m["content"]
if isinstance(content, str):
print(f"[{i}] {role}: {content[:200]}")
elif isinstance(content, list):
for block in content:
if isinstance(block, dict):
btype = block.get("type", "")
else:
btype = getattr(block, "type", "")
if btype == "text":
text = block.get("text", "") if isinstance(block, dict) else block.text
print(f"[{i}] {role}: {text[:200]}")
elif btype == "tool_use":
name = block.get("name", "") if isinstance(block, dict) else block.name
print(f"[{i}] {role}: [tool_use: {name}]")
elif btype == "tool_result":
cval = block.get("content", "") if isinstance(block, dict) else block.content
print(f"[{i}] {role}: [tool_result: {str(cval)[:100]}]")
return True
def cmd_context(_args: str, state, config) -> bool:
import anthropic
# Rough token estimate: 4 chars ≈ 1 token
msg_chars = sum(
len(str(m.get("content", ""))) for m in state.messages
)
est_tokens = msg_chars // 4
info(f"Messages: {len(state.messages)}")
info(f"Estimated tokens: ~{est_tokens:,}")
info(f"Model: {config['model']}")
info(f"Max tokens: {config['max_tokens']:,}")
return True
def cmd_cost(_args: str, state, config) -> bool:
from config import calc_cost
cost = calc_cost(config["model"],
state.total_input_tokens,
state.total_output_tokens)
info(f"Input tokens: {state.total_input_tokens:,}")
info(f"Output tokens: {state.total_output_tokens:,}")
info(f"Est. cost: ${cost:.4f} USD")
return True
def cmd_verbose(_args: str, _state, config) -> bool:
config["verbose"] = not config.get("verbose", False)
state_str = "ON" if config["verbose"] else "OFF"
ok(f"Verbose mode: {state_str}")
return True
def cmd_thinking(_args: str, _state, config) -> bool:
config["thinking"] = not config.get("thinking", False)
state_str = "ON" if config["thinking"] else "OFF"
ok(f"Extended thinking: {state_str}")
return True
def cmd_permissions(args: str, _state, config) -> bool:
from config import save_config
modes = ["auto", "accept-all", "manual"]
if not args.strip():
info(f"Permission mode: {config.get('permission_mode','auto')}")
info(f"Available modes: {', '.join(modes)}")
else:
m = args.strip()
if m not in modes:
err(f"Unknown mode: {m}. Choose: {', '.join(modes)}")
else:
config["permission_mode"] = m
save_config(config)
ok(f"Permission mode set to: {m}")
return True
def cmd_cwd(args: str, _state, _config) -> bool:
if not args.strip():
info(f"Working directory: {os.getcwd()}")
else:
p = args.strip()
try:
os.chdir(p)
ok(f"Changed directory to: {os.getcwd()}")
except Exception as e:
err(str(e))
return True
def cmd_exit(_args: str, _state, _config) -> bool:
ok("Goodbye!")
sys.exit(0)
COMMANDS = {
"help": cmd_help,
"clear": cmd_clear,
"model": cmd_model,
"config": cmd_config,
"save": cmd_save,
"load": cmd_load,
"history": cmd_history,
"context": cmd_context,
"cost": cmd_cost,
"verbose": cmd_verbose,
"thinking": cmd_thinking,
"permissions": cmd_permissions,
"cwd": cmd_cwd,
"exit": cmd_exit,
"quit": cmd_exit,
}
def handle_slash(line: str, state, config) -> bool:
"""Handle /command [args]. Returns True if handled."""
if not line.startswith("/"):
return False
parts = line[1:].split(None, 1)
if not parts:
return False
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
handler = COMMANDS.get(cmd)
if handler:
handler(args, state, config)
return True
err(f"Unknown command: /{cmd} (type /help for commands)")
return True
# ── Input history setup ────────────────────────────────────────────────────
def setup_readline(history_file: Path):
try:
readline.read_history_file(str(history_file))
except FileNotFoundError:
pass
readline.set_history_length(1000)
atexit.register(readline.write_history_file, str(history_file))
# Tab-complete slash commands
commands = [f"/{c}" for c in COMMANDS]
def completer(text: str, state: int):
matches = [c for c in commands if c.startswith(text)]
return matches[state] if state < len(matches) else None
readline.set_completer(completer)
readline.parse_and_bind("tab: complete")
# ── Main REPL ──────────────────────────────────────────────────────────────
def repl(config: dict, initial_prompt: str = None):
from config import HISTORY_FILE
from context import build_system_prompt
from agent import AgentState, run, TextChunk, ThinkingChunk, ToolStart, ToolEnd, TurnDone, PermissionRequest
setup_readline(HISTORY_FILE)
state = AgentState()
verbose = config.get("verbose", False)
# Banner
if not initial_prompt:
from providers import detect_provider
model = config["model"]
pname = detect_provider(model)
model_clr = clr(model, "cyan", "bold")
prov_clr = clr(f"({pname})", "dim")
pmode = clr(config.get("permission_mode", "auto"), "yellow")
print(clr("╭─ Nano Claude Code ──────────────────────────────╮", "dim"))
print(clr("│ Model: ", "dim") + model_clr + " " + prov_clr)
print(clr("│ Permissions: ", "dim") + pmode)
print(clr("│ /model to switch provider · /help for commands │", "dim"))
print(clr("╰──────────────────────────────────────────────────╯", "dim"))
print()
def run_query(user_input: str):
nonlocal verbose
verbose = config.get("verbose", False)
# Rebuild system prompt each turn (picks up cwd changes, etc.)
system_prompt = build_system_prompt()
print(clr("\n╭─ Claude ", "dim") + clr("", "green") + clr(" ─────────────────────────", "dim"))
print(clr("", "dim"), end="", flush=True)
thinking_started = False
for event in run(user_input, state, config, system_prompt):
if isinstance(event, TextChunk):
stream_text(event.text)
elif isinstance(event, ThinkingChunk):
if verbose:
if not thinking_started:
print(clr("\n [thinking]", "dim"))
thinking_started = True
stream_thinking(event.text, verbose)
elif isinstance(event, ToolStart):
flush_response()
print_tool_start(event.name, event.inputs, verbose)
elif isinstance(event, PermissionRequest):
event.granted = ask_permission_interactive(event.description, config)
elif isinstance(event, ToolEnd):
print_tool_end(event.name, event.result, verbose)
# Print prefix for next text
print(clr("", "dim"), end="", flush=True)
elif isinstance(event, TurnDone):
if verbose:
print(clr(
f"\n [tokens: +{event.input_tokens} in / "
f"+{event.output_tokens} out]", "dim"
))
flush_response()
print(clr("╰──────────────────────────────────────────────", "dim"))
print()
# ── Main loop ──
if initial_prompt:
try:
run_query(initial_prompt)
except KeyboardInterrupt:
print()
return
while True:
try:
cwd_short = Path.cwd().name
prompt = clr(f"\n[{cwd_short}] ", "dim") + clr(" ", "cyan", "bold")
user_input = input(prompt).strip()
except (EOFError, KeyboardInterrupt):
print()
ok("Goodbye!")
sys.exit(0)
if not user_input:
continue
if handle_slash(user_input, state, config):
continue
try:
run_query(user_input)
except KeyboardInterrupt:
print(clr("\n (interrupted)", "yellow"))
# Keep conversation history up to the interruption
# ── Entry point ────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
prog="nano_claude",
description="Nano Claude Code — minimal Python Claude Code implementation",
add_help=False,
)
parser.add_argument("prompt", nargs="*", help="Initial prompt (non-interactive)")
parser.add_argument("-p", "--print", "--print-output",
dest="print_mode", action="store_true",
help="Non-interactive mode: run prompt and exit")
parser.add_argument("-m", "--model", help="Override model")
parser.add_argument("--accept-all", action="store_true",
help="Never ask permission (accept all operations)")
parser.add_argument("--verbose", action="store_true",
help="Show thinking + token counts")
parser.add_argument("--thinking", action="store_true",
help="Enable extended thinking")
parser.add_argument("--version", action="store_true", help="Print version")
parser.add_argument("-h", "--help", action="store_true", help="Show help")
args = parser.parse_args()
if args.version:
print(f"nano claude code v{VERSION}")
sys.exit(0)
if args.help:
print(__doc__)
sys.exit(0)
from config import load_config, save_config, has_api_key
from providers import detect_provider, PROVIDERS
config = load_config()
# Apply CLI overrides first (so key check uses the right provider)
if args.model:
config["model"] = args.model.replace(":", "/", 1)
if args.accept_all:
config["permission_mode"] = "accept-all"
if args.verbose:
config["verbose"] = True
if args.thinking:
config["thinking"] = True
# Check API key for active provider (warn only, don't block local providers)
if not has_api_key(config):
pname = detect_provider(config["model"])
prov = PROVIDERS.get(pname, {})
env = prov.get("api_key_env", "")
if env: # local providers like ollama have no env key requirement
warn(f"No API key found for provider '{pname}'. "
f"Set {env} or run: /config {pname}_api_key=YOUR_KEY")
initial = " ".join(args.prompt) if args.prompt else None
if args.print_mode and not initial:
err("--print requires a prompt argument")
sys.exit(1)
repl(config, initial_prompt=initial)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,487 @@
"""
Multi-provider support for nano claude.
Supported providers:
anthropic — Claude (claude-opus-4-6, claude-sonnet-4-6, ...)
openai — GPT (gpt-4o, o3-mini, ...)
gemini — Google Gemini (gemini-2.0-flash, gemini-1.5-pro, ...)
kimi — Moonshot AI (moonshot-v1-8k/32k/128k)
qwen — Alibaba DashScope (qwen-max, qwen-plus, ...)
zhipu — Zhipu GLM (glm-4, glm-4-plus, ...)
deepseek — DeepSeek (deepseek-chat, deepseek-reasoner, ...)
ollama — Local Ollama (llama3.3, qwen2.5-coder, ...)
lmstudio — Local LM Studio (any loaded model)
custom — Any OpenAI-compatible endpoint
Model string formats:
"claude-opus-4-6" auto-detected → anthropic
"gpt-4o" auto-detected → openai
"ollama/qwen2.5-coder" explicit provider prefix
"custom/my-model" uses CUSTOM_BASE_URL from config
"""
from __future__ import annotations
import json
from typing import Generator
# ── Provider registry ──────────────────────────────────────────────────────
PROVIDERS: dict[str, dict] = {
"anthropic": {
"type": "anthropic",
"api_key_env": "ANTHROPIC_API_KEY",
"models": [
"claude-opus-4-6", "claude-sonnet-4-6", "claude-haiku-4-5-20251001",
"claude-opus-4-5", "claude-sonnet-4-5",
"claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022",
],
},
"openai": {
"type": "openai",
"api_key_env": "OPENAI_API_KEY",
"base_url": "https://api.openai.com/v1",
"models": [
"gpt-4o", "gpt-4o-mini", "gpt-4-turbo",
"o3-mini", "o1", "o1-mini",
],
},
"gemini": {
"type": "openai",
"api_key_env": "GEMINI_API_KEY",
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai/",
"models": [
"gemini-2.5-pro-preview-03-25",
"gemini-2.0-flash", "gemini-2.0-flash-lite",
"gemini-1.5-pro", "gemini-1.5-flash",
],
},
"kimi": {
"type": "openai",
"api_key_env": "MOONSHOT_API_KEY",
"base_url": "https://api.moonshot.cn/v1",
"models": [
"moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k",
"kimi-latest",
],
},
"qwen": {
"type": "openai",
"api_key_env": "DASHSCOPE_API_KEY",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"models": [
"qwen-max", "qwen-plus", "qwen-turbo", "qwen-long",
"qwen2.5-72b-instruct", "qwen2.5-coder-32b-instruct",
"qwq-32b",
],
},
"zhipu": {
"type": "openai",
"api_key_env": "ZHIPU_API_KEY",
"base_url": "https://open.bigmodel.cn/api/paas/v4/",
"models": [
"glm-4-plus", "glm-4", "glm-4-flash", "glm-4-air",
"glm-z1-flash",
],
},
"deepseek": {
"type": "openai",
"api_key_env": "DEEPSEEK_API_KEY",
"base_url": "https://api.deepseek.com/v1",
"models": [
"deepseek-chat", "deepseek-coder", "deepseek-reasoner",
],
},
"ollama": {
"type": "openai",
"api_key_env": None,
"base_url": "http://localhost:11434/v1",
"api_key": "ollama",
"models": [
"llama3.3", "llama3.2", "phi4", "mistral", "mixtral",
"qwen2.5-coder", "deepseek-r1", "gemma3",
],
},
"lmstudio": {
"type": "openai",
"api_key_env": None,
"base_url": "http://localhost:1234/v1",
"api_key": "lm-studio",
"models": [], # dynamic, depends on loaded model
},
"custom": {
"type": "openai",
"api_key_env": "CUSTOM_API_KEY",
"base_url": None, # read from config["custom_base_url"]
"models": [],
},
}
# Cost per million tokens (approximate, fallback to 0 for unknown)
COSTS = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4-5-20251001": (0.8, 4.0),
"gpt-4o": (2.5, 10.0),
"gpt-4o-mini": (0.15, 0.6),
"o3-mini": (1.1, 4.4),
"gemini-2.0-flash": (0.075, 0.3),
"gemini-1.5-pro": (1.25, 5.0),
"gemini-2.5-pro-preview-03-25": (1.25, 10.0),
"moonshot-v1-8k": (1.0, 3.0),
"moonshot-v1-32k": (2.4, 7.0),
"moonshot-v1-128k": (8.0, 24.0),
"qwen-max": (2.4, 9.6),
"qwen-plus": (0.4, 1.2),
"deepseek-chat": (0.27, 1.1),
"deepseek-reasoner": (0.55, 2.19),
"glm-4-plus": (0.7, 0.7),
}
# Auto-detection: prefix → provider name
_PREFIXES = [
("claude-", "anthropic"),
("gpt-", "openai"),
("o1", "openai"),
("o3", "openai"),
("gemini-", "gemini"),
("moonshot-", "kimi"),
("kimi-", "kimi"),
("qwen", "qwen"), # qwen-max, qwen2.5-...
("qwq-", "qwen"),
("glm-", "zhipu"),
("deepseek-", "deepseek"),
("llama", "ollama"),
("mistral", "ollama"),
("phi", "ollama"),
("gemma", "ollama"),
]
def detect_provider(model: str) -> str:
"""Return provider name for a model string.
Supports 'provider/model' explicit format, or auto-detect by prefix."""
if "/" in model:
return model.split("/", 1)[0]
for prefix, pname in _PREFIXES:
if model.lower().startswith(prefix):
return pname
return "openai" # fallback
def bare_model(model: str) -> str:
"""Strip 'provider/' prefix if present."""
return model.split("/", 1)[1] if "/" in model else model
def get_api_key(provider_name: str, config: dict) -> str:
prov = PROVIDERS.get(provider_name, {})
# 1. Check config dict (e.g. config["kimi_api_key"])
cfg_key = config.get(f"{provider_name}_api_key", "")
if cfg_key:
return cfg_key
# 2. Check env var
env_var = prov.get("api_key_env")
if env_var:
import os
return os.environ.get(env_var, "")
# 3. Hardcoded (for local providers)
return prov.get("api_key", "")
def calc_cost(model: str, in_tok: int, out_tok: int) -> float:
ic, oc = COSTS.get(bare_model(model), (0.0, 0.0))
return (in_tok * ic + out_tok * oc) / 1_000_000
# ── Tool schema conversion ─────────────────────────────────────────────────
def tools_to_openai(tool_schemas: list) -> list:
"""Convert Anthropic-style tool schemas to OpenAI function-calling format."""
return [
{
"type": "function",
"function": {
"name": t["name"],
"description": t["description"],
"parameters": t["input_schema"],
},
}
for t in tool_schemas
]
# ── Message format conversion ──────────────────────────────────────────────
#
# Internal "neutral" message format:
# {"role": "user", "content": "text"}
# {"role": "assistant", "content": "text", "tool_calls": [
# {"id": "...", "name": "...", "input": {...}}
# ]}
# {"role": "tool", "tool_call_id": "...", "name": "...", "content": "..."}
def messages_to_anthropic(messages: list) -> list:
"""Convert neutral messages → Anthropic API format."""
result = []
i = 0
while i < len(messages):
m = messages[i]
role = m["role"]
if role == "user":
result.append({"role": "user", "content": m["content"]})
i += 1
elif role == "assistant":
blocks = []
text = m.get("content", "")
if text:
blocks.append({"type": "text", "text": text})
for tc in m.get("tool_calls", []):
blocks.append({
"type": "tool_use",
"id": tc["id"],
"name": tc["name"],
"input": tc["input"],
})
result.append({"role": "assistant", "content": blocks})
i += 1
elif role == "tool":
# Collect consecutive tool results into one user message
tool_blocks = []
while i < len(messages) and messages[i]["role"] == "tool":
t = messages[i]
tool_blocks.append({
"type": "tool_result",
"tool_use_id": t["tool_call_id"],
"content": t["content"],
})
i += 1
result.append({"role": "user", "content": tool_blocks})
else:
i += 1
return result
def messages_to_openai(messages: list) -> list:
"""Convert neutral messages → OpenAI API format."""
result = []
for m in messages:
role = m["role"]
if role == "user":
result.append({"role": "user", "content": m["content"]})
elif role == "assistant":
msg: dict = {"role": "assistant", "content": m.get("content") or None}
tcs = m.get("tool_calls", [])
if tcs:
msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["input"], ensure_ascii=False),
},
}
for tc in tcs
]
result.append(msg)
elif role == "tool":
result.append({
"role": "tool",
"tool_call_id": m["tool_call_id"],
"content": m["content"],
})
return result
# ── Streaming adapters ─────────────────────────────────────────────────────
class TextChunk:
def __init__(self, text): self.text = text
class ThinkingChunk:
def __init__(self, text): self.text = text
class AssistantTurn:
"""Completed assistant turn with text + tool_calls."""
def __init__(self, text, tool_calls, in_tokens, out_tokens):
self.text = text
self.tool_calls = tool_calls # list of {id, name, input}
self.in_tokens = in_tokens
self.out_tokens = out_tokens
def stream_anthropic(
api_key: str,
model: str,
system: str,
messages: list,
tool_schemas: list,
config: dict,
) -> Generator:
"""Stream from Anthropic API. Yields TextChunk/ThinkingChunk, then AssistantTurn."""
import anthropic as _ant
client = _ant.Anthropic(api_key=api_key)
kwargs = {
"model": model,
"max_tokens": config.get("max_tokens", 8192),
"system": system,
"messages": messages_to_anthropic(messages),
"tools": tool_schemas,
}
if config.get("thinking"):
kwargs["thinking"] = {
"type": "enabled",
"budget_tokens": config.get("thinking_budget", 10000),
}
tool_calls = []
text = ""
with client.messages.stream(**kwargs) as stream:
for event in stream:
etype = getattr(event, "type", None)
if etype == "content_block_delta":
delta = event.delta
dtype = getattr(delta, "type", None)
if dtype == "text_delta":
text += delta.text
yield TextChunk(delta.text)
elif dtype == "thinking_delta":
yield ThinkingChunk(delta.thinking)
final = stream.get_final_message()
for block in final.content:
if block.type == "tool_use":
tool_calls.append({
"id": block.id,
"name": block.name,
"input": block.input,
})
yield AssistantTurn(
text, tool_calls,
final.usage.input_tokens,
final.usage.output_tokens,
)
def stream_openai_compat(
api_key: str,
base_url: str,
model: str,
system: str,
messages: list,
tool_schemas: list,
config: dict,
) -> Generator:
"""Stream from any OpenAI-compatible API. Yields TextChunk, then AssistantTurn."""
from openai import OpenAI
client = OpenAI(api_key=api_key or "dummy", base_url=base_url)
oai_messages = [{"role": "system", "content": system}] + messages_to_openai(messages)
kwargs: dict = {
"model": model,
"messages": oai_messages,
"stream": True,
}
if tool_schemas and not config.get("no_tools"):
kwargs["tools"] = tools_to_openai(tool_schemas)
# "auto" requires vLLM --enable-auto-tool-choice; omit if server doesn't support it
if not config.get("disable_tool_choice"):
kwargs["tool_choice"] = "auto"
if config.get("max_tokens"):
kwargs["max_tokens"] = config["max_tokens"]
text = ""
tool_buf: dict = {} # index → {id, name, args_str}
in_tok = out_tok = 0
stream = client.chat.completions.create(**kwargs)
for chunk in stream:
if not chunk.choices:
# usage-only chunk (some providers send this last)
if hasattr(chunk, "usage") and chunk.usage:
in_tok = chunk.usage.prompt_tokens
out_tok = chunk.usage.completion_tokens
continue
choice = chunk.choices[0]
delta = choice.delta
if delta.content:
text += delta.content
yield TextChunk(delta.content)
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if idx not in tool_buf:
tool_buf[idx] = {"id": "", "name": "", "args": ""}
if tc.id:
tool_buf[idx]["id"] = tc.id
if tc.function:
if tc.function.name:
tool_buf[idx]["name"] += tc.function.name
if tc.function.arguments:
tool_buf[idx]["args"] += tc.function.arguments
# Some providers include usage in the last chunk
if hasattr(chunk, "usage") and chunk.usage:
in_tok = chunk.usage.prompt_tokens or in_tok
out_tok = chunk.usage.completion_tokens or out_tok
tool_calls = []
for idx in sorted(tool_buf):
v = tool_buf[idx]
try:
inp = json.loads(v["args"]) if v["args"] else {}
except json.JSONDecodeError:
inp = {"_raw": v["args"]}
tool_calls.append({"id": v["id"] or f"call_{idx}", "name": v["name"], "input": inp})
yield AssistantTurn(text, tool_calls, in_tok, out_tok)
def stream(
model: str,
system: str,
messages: list,
tool_schemas: list,
config: dict,
) -> Generator:
"""
Unified streaming entry point.
Auto-detects provider from model string.
Yields: TextChunk | ThinkingChunk | AssistantTurn
"""
provider_name = detect_provider(model)
model_name = bare_model(model)
prov = PROVIDERS.get(provider_name, PROVIDERS["openai"])
api_key = get_api_key(provider_name, config)
if prov["type"] == "anthropic":
yield from stream_anthropic(api_key, model_name, system, messages, tool_schemas, config)
else:
import os as _os
if provider_name == "custom":
base_url = (config.get("custom_base_url")
or _os.environ.get("CUSTOM_BASE_URL", ""))
if not base_url:
raise ValueError(
"custom provider requires a base_url. "
"Set CUSTOM_BASE_URL env var or run: /config custom_base_url=http://..."
)
else:
base_url = prov.get("base_url", "https://api.openai.com/v1")
yield from stream_openai_compat(
api_key, base_url, model_name, system, messages, tool_schemas, config
)

View File

@@ -0,0 +1,4 @@
anthropic>=0.40.0
openai>=1.30.0
httpx>=0.27.0
rich>=13.0.0

359
nano-claude-code/tools.py Normal file
View File

@@ -0,0 +1,359 @@
"""Tool definitions and implementations for nano claude."""
import os
import re
import glob as _glob
import subprocess
from pathlib import Path
from typing import Callable, Optional
# ── Tool JSON schemas (sent to Claude API) ─────────────────────────────────
TOOL_SCHEMAS = [
{
"name": "Read",
"description": (
"Read a file's contents. Returns content with line numbers "
"(format: 'N\\tline'). Use limit/offset to read large files in chunks."
),
"input_schema": {
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Absolute file path"},
"limit": {"type": "integer", "description": "Max lines to read"},
"offset": {"type": "integer", "description": "Start line (0-indexed)"},
},
"required": ["file_path"],
},
},
{
"name": "Write",
"description": "Write content to a file, creating parent directories as needed.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["file_path", "content"],
},
},
{
"name": "Edit",
"description": (
"Replace exact text in a file. old_string must match exactly (including whitespace). "
"If old_string appears multiple times, use replace_all=true or add more context."
),
"input_schema": {
"type": "object",
"properties": {
"file_path": {"type": "string"},
"old_string": {"type": "string", "description": "Exact text to replace"},
"new_string": {"type": "string", "description": "Replacement text"},
"replace_all": {"type": "boolean", "description": "Replace all occurrences"},
},
"required": ["file_path", "old_string", "new_string"],
},
},
{
"name": "Bash",
"description": "Execute a shell command. Returns stdout+stderr. Stateless (no cd persistence).",
"input_schema": {
"type": "object",
"properties": {
"command": {"type": "string"},
"timeout": {"type": "integer", "description": "Seconds before timeout (default 30)"},
},
"required": ["command"],
},
},
{
"name": "Glob",
"description": "Find files matching a glob pattern. Returns sorted list of matching paths.",
"input_schema": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Glob pattern e.g. **/*.py"},
"path": {"type": "string", "description": "Base directory (default: cwd)"},
},
"required": ["pattern"],
},
},
{
"name": "Grep",
"description": "Search file contents with regex using ripgrep (falls back to grep).",
"input_schema": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Regex pattern"},
"path": {"type": "string", "description": "File or directory to search"},
"glob": {"type": "string", "description": "File filter e.g. *.py"},
"output_mode": {
"type": "string",
"enum": ["content", "files_with_matches", "count"],
"description": "content=matching lines, files_with_matches=file paths, count=match counts",
},
"case_insensitive": {"type": "boolean"},
"context": {"type": "integer", "description": "Lines of context around matches"},
},
"required": ["pattern"],
},
},
{
"name": "WebFetch",
"description": "Fetch a URL and return its text content (HTML stripped).",
"input_schema": {
"type": "object",
"properties": {
"url": {"type": "string"},
"prompt": {"type": "string", "description": "Hint for what to extract"},
},
"required": ["url"],
},
},
{
"name": "WebSearch",
"description": "Search the web via DuckDuckGo and return top results.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
},
"required": ["query"],
},
},
]
# ── Safe bash commands (never ask permission) ───────────────────────────────
_SAFE_PREFIXES = (
"ls", "cat", "head", "tail", "wc", "pwd", "echo", "printf", "date",
"which", "type", "env", "printenv", "uname", "whoami", "id",
"git log", "git status", "git diff", "git show", "git branch",
"git remote", "git stash list", "git tag",
"find ", "grep ", "rg ", "ag ", "fd ",
"python ", "python3 ", "node ", "ruby ", "perl ",
"pip show", "pip list", "npm list", "cargo metadata",
"df ", "du ", "free ", "top -bn", "ps ",
"curl -I", "curl --head",
)
def _is_safe_bash(cmd: str) -> bool:
c = cmd.strip()
return any(c.startswith(p) for p in _SAFE_PREFIXES)
# ── Tool implementations ───────────────────────────────────────────────────
def _read(file_path: str, limit: int = None, offset: int = None) -> str:
p = Path(file_path)
if not p.exists():
return f"Error: file not found: {file_path}"
if p.is_dir():
return f"Error: {file_path} is a directory"
try:
lines = p.read_text(errors="replace").splitlines(keepends=True)
start = offset or 0
chunk = lines[start:start + limit] if limit else lines[start:]
if not chunk:
return "(empty file)"
return "".join(f"{start + i + 1}\t{l}" for i, l in enumerate(chunk))
except Exception as e:
return f"Error: {e}"
def _write(file_path: str, content: str) -> str:
p = Path(file_path)
try:
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
lc = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
return f"Wrote {lc} lines to {file_path}"
except Exception as e:
return f"Error: {e}"
def _edit(file_path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
p = Path(file_path)
if not p.exists():
return f"Error: file not found: {file_path}"
try:
content = p.read_text()
count = content.count(old_string)
if count == 0:
return "Error: old_string not found in file"
if count > 1 and not replace_all:
return (f"Error: old_string appears {count} times. "
"Provide more context to make it unique, or use replace_all=true.")
new_content = content.replace(old_string, new_string) if replace_all else \
content.replace(old_string, new_string, 1)
p.write_text(new_content)
return f"Replaced {'all ' + str(count) if replace_all else '1'} occurrence(s) in {file_path}"
except Exception as e:
return f"Error: {e}"
def _bash(command: str, timeout: int = 30) -> str:
try:
r = subprocess.run(
command, shell=True, capture_output=True, text=True,
timeout=timeout, cwd=os.getcwd(),
)
out = r.stdout
if r.stderr:
out += ("\n" if out else "") + "[stderr]\n" + r.stderr
return out.strip() or "(no output)"
except subprocess.TimeoutExpired:
return f"Error: timed out after {timeout}s"
except Exception as e:
return f"Error: {e}"
def _glob(pattern: str, path: str = None) -> str:
base = Path(path) if path else Path.cwd()
try:
matches = sorted(base.glob(pattern))
if not matches:
return "No files matched"
return "\n".join(str(m) for m in matches[:500])
except Exception as e:
return f"Error: {e}"
def _has_rg() -> bool:
try:
subprocess.run(["rg", "--version"], capture_output=True, check=True)
return True
except Exception:
return False
def _grep(pattern: str, path: str = None, glob: str = None,
output_mode: str = "files_with_matches",
case_insensitive: bool = False, context: int = 0) -> str:
use_rg = _has_rg()
cmd = ["rg" if use_rg else "grep", "--no-heading"]
if case_insensitive:
cmd.append("-i")
if output_mode == "files_with_matches":
cmd.append("-l")
elif output_mode == "count":
cmd.append("-c")
else:
cmd.append("-n")
if context:
cmd += ["-C", str(context)]
if glob:
cmd += (["--glob", glob] if use_rg else ["--include", glob])
cmd.append(pattern)
cmd.append(path or str(Path.cwd()))
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
out = r.stdout.strip()
return out[:20000] if out else "No matches found"
except Exception as e:
return f"Error: {e}"
def _webfetch(url: str, prompt: str = None) -> str:
try:
import httpx
r = httpx.get(url, headers={"User-Agent": "NanoClaude/1.0"},
timeout=30, follow_redirects=True)
r.raise_for_status()
ct = r.headers.get("content-type", "")
if "html" in ct:
text = re.sub(r"<script[^>]*>.*?</script>", "", r.text,
flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r"<style[^>]*>.*?</style>", "", text,
flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text).strip()
else:
text = r.text
return text[:25000]
except ImportError:
return "Error: httpx not installed — run: pip install httpx"
except Exception as e:
return f"Error: {e}"
def _websearch(query: str) -> str:
try:
import httpx
url = "https://html.duckduckgo.com/html/"
r = httpx.get(url, params={"q": query},
headers={"User-Agent": "Mozilla/5.0 (compatible)"},
timeout=30, follow_redirects=True)
titles = re.findall(r'class="result__title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>',
r.text, re.DOTALL)
snippets = re.findall(r'class="result__snippet"[^>]*>(.*?)</div>', r.text, re.DOTALL)
results = []
for i, (link, title) in enumerate(titles[:8]):
t = re.sub(r"<[^>]+>", "", title).strip()
s = re.sub(r"<[^>]+>", "", snippets[i]).strip() if i < len(snippets) else ""
results.append(f"**{t}**\n{link}\n{s}")
return "\n\n".join(results) if results else "No results found"
except ImportError:
return "Error: httpx not installed — run: pip install httpx"
except Exception as e:
return f"Error: {e}"
# ── Dispatcher ─────────────────────────────────────────────────────────────
def execute_tool(
name: str,
inputs: dict,
permission_mode: str = "auto",
ask_permission: Optional[Callable[[str], bool]] = None,
) -> str:
"""Dispatch tool execution; ask permission for write/destructive ops."""
def _check(desc: str) -> bool:
"""Return True if action is allowed."""
if permission_mode == "accept-all":
return True
if ask_permission:
return ask_permission(desc)
return True # headless: allow everything
if name == "Read":
return _read(inputs["file_path"], inputs.get("limit"), inputs.get("offset"))
elif name == "Write":
if not _check(f"Write to {inputs['file_path']}"):
return "Denied: user rejected write operation"
return _write(inputs["file_path"], inputs["content"])
elif name == "Edit":
if not _check(f"Edit {inputs['file_path']}"):
return "Denied: user rejected edit operation"
return _edit(inputs["file_path"], inputs["old_string"],
inputs["new_string"], inputs.get("replace_all", False))
elif name == "Bash":
cmd = inputs["command"]
if permission_mode != "accept-all" and not _is_safe_bash(cmd):
if not _check(f"Bash: {cmd}"):
return "Denied: user rejected bash command"
return _bash(cmd, inputs.get("timeout", 30))
elif name == "Glob":
return _glob(inputs["pattern"], inputs.get("path"))
elif name == "Grep":
return _grep(
inputs["pattern"], inputs.get("path"), inputs.get("glob"),
inputs.get("output_mode", "files_with_matches"),
inputs.get("case_insensitive", False),
inputs.get("context", 0),
)
elif name == "WebFetch":
return _webfetch(inputs["url"], inputs.get("prompt"))
elif name == "WebSearch":
return _websearch(inputs["query"])
else:
return f"Unknown tool: {name}"