Files
claw-code/src/main.py
YeonGyu-Kim 517d7e224e feat: #178 — argparse errors emit JSON envelope when --output-format json requested
Dogfood pinpoint: running 'claw nonexistent-command --output-format json' bypasses
the JSON envelope contract — argparse dumps human-readable usage to stderr with
exit 2, breaking the SCHEMAS.md guarantee that JSON mode returns structured output.

Problem:
  $ claw nonexistent --output-format json
  usage: main.py [-h] {summary,manifest,...} ...
  main.py: error: argument command: invalid choice: 'nonexistent' (choose from ...)
  [exit 2 — no envelope, claws must parse argparse usage messages]

Fix:
  $ claw nonexistent --output-format json
  {
    "timestamp": "2026-04-22T11:00:29Z",
    "command": "nonexistent-command",
    "exit_code": 1,
    "output_format": "json",
    "schema_version": "1.0",
    "error": {
      "kind": "parse",
      "operation": "argparse",
      "target": "nonexistent-command",
      "retryable": false,
      "message": "invalid command or argument (argparse rejection)",
      "hint": "run with no arguments to see available subcommands"
    }
  }
  [exit 1, clean JSON envelope on stdout per SCHEMAS.md]

Changes:
- src/main.py:
  - _wants_json_output(argv): pre-scan for --output-format json before parsing
  - _emit_parse_error_envelope(argv, message): emit wrapped envelope on stdout
  - main(): catch SystemExit from argparse; if JSON requested, emit envelope
    instead of letting argparse's help dump go through

- tests/test_parse_error_envelope.py (new, 9 tests):
  - TestParseErrorJsonEnvelope (7): unknown command, =syntax, text mode unchanged,
    invalid flag, missing command, valid command unaffected, common fields
  - TestParseErrorSchemaCompliance (2): error.kind='parse', retryable=false

Contract:
- text mode (default): unchanged — argparse dumps help to stderr, exits 2
- JSON mode: envelope per SCHEMAS.md, error.kind='parse', exit 1
- Parse errors always retryable=false (typo won't self-fix)
- error.kind='parse' already enumerated in SCHEMAS.md (no schema changes)

This closes a real gap: claws invoking unknown commands in JSON mode can now route
via exit code + envelope.kind='parse' instead of scraping argparse output.

Test results: 192 → 201 passing, 3 skipped unchanged, zero regression.

Pinpoint discovered via dogfood at 2026-04-22 19:59 KST (cycle #19).
2026-04-22 20:02:39 +09:00

715 lines
30 KiB
Python

from __future__ import annotations
import argparse
from .bootstrap_graph import build_bootstrap_graph
from .command_graph import build_command_graph
from .commands import execute_command, get_command, get_commands, render_command_index
from .direct_modes import run_deep_link, run_direct_connect
from .parity_audit import run_parity_audit
from .permissions import ToolPermissionContext
from .port_manifest import build_port_manifest
from .query_engine import QueryEnginePort
from .remote_runtime import run_remote_mode, run_ssh_mode, run_teleport_mode
from .runtime import PortRuntime
from .session_store import (
SessionDeleteError,
SessionNotFoundError,
delete_session,
list_sessions,
load_session,
session_exists,
)
from .setup import run_setup
from .tool_pool import assemble_tool_pool
from .tools import execute_tool, get_tool, get_tools, render_tool_index
def wrap_json_envelope(data: dict, command: str, exit_code: int = 0) -> dict:
"""Wrap command output in canonical JSON envelope per SCHEMAS.md."""
from datetime import datetime, timezone
now_utc = datetime.now(timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z')
return {
'timestamp': now_utc,
'command': command,
'exit_code': exit_code,
'output_format': 'json',
'schema_version': '1.0',
**data,
}
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Python porting workspace for the Claude Code rewrite effort')
subparsers = parser.add_subparsers(dest='command', required=True)
subparsers.add_parser('summary', help='render a Markdown summary of the Python porting workspace')
subparsers.add_parser('manifest', help='print the current Python workspace manifest')
subparsers.add_parser('parity-audit', help='compare the Python workspace against the local ignored TypeScript archive when available')
subparsers.add_parser('setup-report', help='render the startup/prefetch setup report')
command_graph_parser = subparsers.add_parser('command-graph', help='show command graph segmentation')
command_graph_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
tool_pool_parser = subparsers.add_parser('tool-pool', help='show assembled tool pool with default settings')
tool_pool_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
bootstrap_graph_parser = subparsers.add_parser('bootstrap-graph', help='show the mirrored bootstrap/runtime graph stages')
bootstrap_graph_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
list_parser = subparsers.add_parser('subsystems', help='list the current Python modules in the workspace')
list_parser.add_argument('--limit', type=int, default=32)
commands_parser = subparsers.add_parser('commands', help='list mirrored command entries from the archived snapshot')
commands_parser.add_argument('--limit', type=int, default=20)
commands_parser.add_argument('--query')
commands_parser.add_argument('--no-plugin-commands', action='store_true')
commands_parser.add_argument('--no-skill-commands', action='store_true')
tools_parser = subparsers.add_parser('tools', help='list mirrored tool entries from the archived snapshot')
tools_parser.add_argument('--limit', type=int, default=20)
tools_parser.add_argument('--query')
tools_parser.add_argument('--simple-mode', action='store_true')
tools_parser.add_argument('--no-mcp', action='store_true')
tools_parser.add_argument('--deny-tool', action='append', default=[])
tools_parser.add_argument('--deny-prefix', action='append', default=[])
route_parser = subparsers.add_parser('route', help='route a prompt across mirrored command/tool inventories')
route_parser.add_argument('prompt')
route_parser.add_argument('--limit', type=int, default=5)
# #168: parity with show-command/show-tool/session-lifecycle CLI family
route_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
bootstrap_parser = subparsers.add_parser('bootstrap', help='build a runtime-style session report from the mirrored inventories')
bootstrap_parser.add_argument('prompt')
bootstrap_parser.add_argument('--limit', type=int, default=5)
# #168: parity with CLI family
bootstrap_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
loop_parser = subparsers.add_parser('turn-loop', help='run a small stateful turn loop for the mirrored runtime')
loop_parser.add_argument('prompt')
loop_parser.add_argument('--limit', type=int, default=5)
loop_parser.add_argument('--max-turns', type=int, default=3)
loop_parser.add_argument('--structured-output', action='store_true')
loop_parser.add_argument(
'--timeout-seconds',
type=float,
default=None,
help='total wall-clock budget across all turns (#161). Default: unbounded.',
)
loop_parser.add_argument(
'--continuation-prompt',
default=None,
help=(
'prompt to submit on turns after the first (#163). Default: None '
'(loop stops after turn 0). Replaces the deprecated implicit "[turn N]" '
'suffix that used to pollute the transcript.'
),
)
loop_parser.add_argument(
'--output-format',
choices=['text', 'json'],
default='text',
help='output format (#164 Stage B: JSON includes cancel_observed per turn)',
)
flush_parser = subparsers.add_parser(
'flush-transcript',
help='persist and flush a temporary session transcript (#160/#166: claw-native session API)',
)
flush_parser.add_argument('prompt')
flush_parser.add_argument(
'--directory', help='session storage directory (default: .port_sessions)'
)
flush_parser.add_argument(
'--output-format',
choices=['text', 'json'],
default='text',
help='output format',
)
flush_parser.add_argument(
'--session-id',
help='deterministic session ID (default: auto-generated UUID)',
)
load_session_parser = subparsers.add_parser(
'load-session',
help='load a previously persisted session (#160/#165: claw-native session API)',
)
load_session_parser.add_argument('session_id')
load_session_parser.add_argument(
'--directory', help='session storage directory (default: .port_sessions)'
)
load_session_parser.add_argument(
'--output-format',
choices=['text', 'json'],
default='text',
help='output format',
)
list_sessions_parser = subparsers.add_parser(
'list-sessions',
help='enumerate stored session IDs (#160: claw-native session API)',
)
list_sessions_parser.add_argument(
'--directory', help='session storage directory (default: .port_sessions)'
)
list_sessions_parser.add_argument(
'--output-format',
choices=['text', 'json'],
default='text',
help='output format',
)
delete_session_parser = subparsers.add_parser(
'delete-session',
help='delete a persisted session (#160: idempotent, race-safe)',
)
delete_session_parser.add_argument('session_id')
delete_session_parser.add_argument(
'--directory', help='session storage directory (default: .port_sessions)'
)
delete_session_parser.add_argument(
'--output-format',
choices=['text', 'json'],
default='text',
help='output format',
)
remote_parser = subparsers.add_parser('remote-mode', help='simulate remote-control runtime branching')
remote_parser.add_argument('target')
ssh_parser = subparsers.add_parser('ssh-mode', help='simulate SSH runtime branching')
ssh_parser.add_argument('target')
teleport_parser = subparsers.add_parser('teleport-mode', help='simulate teleport runtime branching')
teleport_parser.add_argument('target')
direct_parser = subparsers.add_parser('direct-connect-mode', help='simulate direct-connect runtime branching')
direct_parser.add_argument('target')
deep_link_parser = subparsers.add_parser('deep-link-mode', help='simulate deep-link runtime branching')
deep_link_parser.add_argument('target')
show_command = subparsers.add_parser('show-command', help='show one mirrored command entry by exact name')
show_command.add_argument('name')
show_command.add_argument('--output-format', choices=['text', 'json'], default='text')
show_tool = subparsers.add_parser('show-tool', help='show one mirrored tool entry by exact name')
show_tool.add_argument('name')
show_tool.add_argument('--output-format', choices=['text', 'json'], default='text')
exec_command_parser = subparsers.add_parser('exec-command', help='execute a mirrored command shim by exact name')
exec_command_parser.add_argument('name')
exec_command_parser.add_argument('prompt')
# #168: parity with CLI family
exec_command_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
exec_tool_parser = subparsers.add_parser('exec-tool', help='execute a mirrored tool shim by exact name')
exec_tool_parser.add_argument('name')
exec_tool_parser.add_argument('payload')
# #168: parity with CLI family
exec_tool_parser.add_argument('--output-format', choices=['text', 'json'], default='text')
return parser
def _emit_parse_error_envelope(argv: list[str], message: str) -> None:
"""#178: emit JSON envelope for argparse-level errors when --output-format json is requested.
Pre-scans argv for --output-format json. If found, prints a parse-error envelope
to stdout (per SCHEMAS.md 'error' envelope shape) instead of letting argparse
dump help text to stderr. This preserves the JSON contract for claws that can't
parse argparse usage messages.
"""
import json
# Extract the attempted command (argv[0] is the first positional)
attempted = argv[0] if argv and not argv[0].startswith('-') else '<missing>'
envelope = wrap_json_envelope(
{
'error': {
'kind': 'parse',
'operation': 'argparse',
'target': attempted,
'retryable': False,
'message': message,
'hint': 'run with no arguments to see available subcommands',
},
},
command=attempted,
exit_code=1,
)
print(json.dumps(envelope))
def _wants_json_output(argv: list[str]) -> bool:
"""#178: check if argv contains --output-format json anywhere (for parse-error routing)."""
for i, arg in enumerate(argv):
if arg == '--output-format' and i + 1 < len(argv) and argv[i + 1] == 'json':
return True
if arg == '--output-format=json':
return True
return False
def main(argv: list[str] | None = None) -> int:
import sys
if argv is None:
argv = sys.argv[1:]
parser = build_parser()
# #178: catch argparse errors and emit JSON envelope when --output-format json present
try:
args = parser.parse_args(argv)
except SystemExit as exc:
# argparse exits with SystemExit on error (code 2) or --help (code 0)
if exc.code != 0 and _wants_json_output(argv):
# Reconstruct a generic parse-error message
_emit_parse_error_envelope(argv, 'invalid command or argument (argparse rejection)')
return 1
raise
manifest = build_port_manifest()
if args.command == 'summary':
print(QueryEnginePort(manifest).render_summary())
return 0
if args.command == 'manifest':
print(manifest.to_markdown())
return 0
if args.command == 'parity-audit':
print(run_parity_audit().to_markdown())
return 0
if args.command == 'setup-report':
print(run_setup().as_markdown())
return 0
if args.command == 'command-graph':
graph = build_command_graph()
if args.output_format == 'json':
import json
envelope = {
'builtins_count': len(graph.builtins),
'plugin_like_count': len(graph.plugin_like),
'skill_like_count': len(graph.skill_like),
'total_count': len(graph.flattened()),
'builtins': [{'name': m.name, 'source_hint': m.source_hint} for m in graph.builtins],
'plugin_like': [{'name': m.name, 'source_hint': m.source_hint} for m in graph.plugin_like],
'skill_like': [{'name': m.name, 'source_hint': m.source_hint} for m in graph.skill_like],
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
else:
print(graph.as_markdown())
return 0
if args.command == 'tool-pool':
pool = assemble_tool_pool()
if args.output_format == 'json':
import json
envelope = {
'simple_mode': pool.simple_mode,
'include_mcp': pool.include_mcp,
'tool_count': len(pool.tools),
'tools': [{'name': t.name, 'source_hint': t.source_hint} for t in pool.tools],
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
else:
print(pool.as_markdown())
return 0
if args.command == 'bootstrap-graph':
graph = build_bootstrap_graph()
if args.output_format == 'json':
import json
envelope = {'stages': graph.as_markdown().split('\n'), 'note': 'bootstrap-graph is markdown-only in this version'}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
else:
print(graph.as_markdown())
return 0
if args.command == 'subsystems':
for subsystem in manifest.top_level_modules[: args.limit]:
print(f'{subsystem.name}\t{subsystem.file_count}\t{subsystem.notes}')
return 0
if args.command == 'commands':
if args.query:
print(render_command_index(limit=args.limit, query=args.query))
else:
commands = get_commands(include_plugin_commands=not args.no_plugin_commands, include_skill_commands=not args.no_skill_commands)
output_lines = [f'Command entries: {len(commands)}', '']
output_lines.extend(f'- {module.name}{module.source_hint}' for module in commands[: args.limit])
print('\n'.join(output_lines))
return 0
if args.command == 'tools':
if args.query:
print(render_tool_index(limit=args.limit, query=args.query))
else:
permission_context = ToolPermissionContext.from_iterables(args.deny_tool, args.deny_prefix)
tools = get_tools(simple_mode=args.simple_mode, include_mcp=not args.no_mcp, permission_context=permission_context)
output_lines = [f'Tool entries: {len(tools)}', '']
output_lines.extend(f'- {module.name}{module.source_hint}' for module in tools[: args.limit])
print('\n'.join(output_lines))
return 0
if args.command == 'route':
matches = PortRuntime().route_prompt(args.prompt, limit=args.limit)
# #168: JSON envelope for machine parsing
if args.output_format == 'json':
import json
envelope = {
'prompt': args.prompt,
'limit': args.limit,
'match_count': len(matches),
'matches': [
{
'kind': m.kind,
'name': m.name,
'score': m.score,
'source_hint': m.source_hint,
}
for m in matches
],
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
return 0
if not matches:
print('No mirrored command/tool matches found.')
return 0
for match in matches:
print(f'{match.kind}\t{match.name}\t{match.score}\t{match.source_hint}')
return 0
if args.command == 'bootstrap':
session = PortRuntime().bootstrap_session(args.prompt, limit=args.limit)
# #168: JSON envelope for machine parsing
if args.output_format == 'json':
import json
envelope = {
'prompt': session.prompt,
'limit': args.limit,
'setup': {
'python_version': session.setup.python_version,
'implementation': session.setup.implementation,
'platform_name': session.setup.platform_name,
'test_command': session.setup.test_command,
},
'routed_matches': [
{
'kind': m.kind,
'name': m.name,
'score': m.score,
'source_hint': m.source_hint,
}
for m in session.routed_matches
],
'command_execution_messages': list(session.command_execution_messages),
'tool_execution_messages': list(session.tool_execution_messages),
'turn': {
'prompt': session.turn_result.prompt,
'output': session.turn_result.output,
'stop_reason': session.turn_result.stop_reason,
'cancel_observed': session.turn_result.cancel_observed,
},
'persisted_session_path': session.persisted_session_path,
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
return 0
print(session.as_markdown())
return 0
if args.command == 'turn-loop':
results = PortRuntime().run_turn_loop(
args.prompt,
limit=args.limit,
max_turns=args.max_turns,
structured_output=args.structured_output,
timeout_seconds=args.timeout_seconds,
continuation_prompt=args.continuation_prompt,
)
# Exit 2 when a timeout terminated the loop so claws can distinguish
# 'ran to completion' from 'hit wall-clock budget'.
loop_exit_code = 2 if results and results[-1].stop_reason == 'timeout' else 0
if args.output_format == 'json':
# #164 Stage B + #173: JSON envelope with per-turn cancel_observed
# Promotes turn-loop from OPT_OUT to CLAWABLE surface.
import json
envelope = {
'prompt': args.prompt,
'max_turns': args.max_turns,
'turns_completed': len(results),
'timeout_seconds': args.timeout_seconds,
'continuation_prompt': args.continuation_prompt,
'turns': [
{
'prompt': r.prompt,
'output': r.output,
'stop_reason': r.stop_reason,
'cancel_observed': r.cancel_observed,
'matched_commands': list(r.matched_commands),
'matched_tools': list(r.matched_tools),
}
for r in results
],
'final_stop_reason': results[-1].stop_reason if results else None,
'final_cancel_observed': results[-1].cancel_observed if results else False,
}
print(json.dumps(wrap_json_envelope(envelope, args.command, exit_code=loop_exit_code)))
return loop_exit_code
for idx, result in enumerate(results, start=1):
print(f'## Turn {idx}')
print(result.output)
print(f'stop_reason={result.stop_reason}')
return loop_exit_code
if args.command == 'flush-transcript':
from pathlib import Path as _Path
engine = QueryEnginePort.from_workspace()
# #166: allow deterministic session IDs for claw checkpointing/replay.
# When unset, the engine's auto-generated UUID is used (backward compat).
if args.session_id:
engine.session_id = args.session_id
engine.submit_message(args.prompt)
directory = _Path(args.directory) if args.directory else None
path = engine.persist_session(directory)
if args.output_format == 'json':
import json as _json
_env = {
'session_id': engine.session_id,
'path': path,
'flushed': engine.transcript_store.flushed,
'messages_count': len(engine.mutable_messages),
'input_tokens': engine.total_usage.input_tokens,
'output_tokens': engine.total_usage.output_tokens,
}
print(_json.dumps(wrap_json_envelope(_env, args.command)))
else:
# #166: legacy text output preserved byte-for-byte for backward compat.
print(path)
print(f'flushed={engine.transcript_store.flushed}')
return 0
if args.command == 'load-session':
from pathlib import Path as _Path
directory = _Path(args.directory) if args.directory else None
# #165: catch typed SessionNotFoundError + surface a JSON error envelope
# matching the delete-session contract shape. No more raw tracebacks.
try:
session = load_session(args.session_id, directory)
except SessionNotFoundError as exc:
if args.output_format == 'json':
import json as _json
resolved_dir = str(directory) if directory else '.port_sessions'
_env = {
'session_id': args.session_id,
'loaded': False,
'error': {
'kind': 'session_not_found',
'message': str(exc),
'directory': resolved_dir,
'retryable': False,
},
}
print(_json.dumps(wrap_json_envelope(_env, args.command, exit_code=1)))
else:
print(f'error: {exc}')
return 1
except (OSError, ValueError) as exc:
# Corrupted session file, IO error, JSON decode error — distinct
# from 'not found'. Callers may retry here (fs glitch).
if args.output_format == 'json':
import json as _json
resolved_dir = str(directory) if directory else '.port_sessions'
_env = {
'session_id': args.session_id,
'loaded': False,
'error': {
'kind': 'session_load_failed',
'message': str(exc),
'directory': resolved_dir,
'retryable': True,
},
}
print(_json.dumps(wrap_json_envelope(_env, args.command, exit_code=1)))
else:
print(f'error: {exc}')
return 1
if args.output_format == 'json':
import json as _json
_env = {
'session_id': session.session_id,
'loaded': True,
'messages_count': len(session.messages),
'input_tokens': session.input_tokens,
'output_tokens': session.output_tokens,
}
print(_json.dumps(wrap_json_envelope(_env, args.command)))
else:
print(f'{session.session_id}\n{len(session.messages)} messages\nin={session.input_tokens} out={session.output_tokens}')
return 0
if args.command == 'list-sessions':
from pathlib import Path as _Path
directory = _Path(args.directory) if args.directory else None
ids = list_sessions(directory)
if args.output_format == 'json':
import json as _json
_env = {'sessions': ids, 'count': len(ids)}
print(_json.dumps(wrap_json_envelope(_env, args.command)))
else:
if not ids:
print('(no sessions)')
else:
for sid in ids:
print(sid)
return 0
if args.command == 'delete-session':
from pathlib import Path as _Path
directory = _Path(args.directory) if args.directory else None
try:
deleted = delete_session(args.session_id, directory)
except SessionDeleteError as exc:
if args.output_format == 'json':
import json as _json
_env = {
'session_id': args.session_id,
'deleted': False,
'error': {
'kind': 'session_delete_failed',
'message': str(exc),
'retryable': True,
},
}
print(_json.dumps(wrap_json_envelope(_env, args.command, exit_code=1)))
else:
print(f'error: {exc}')
return 1
if args.output_format == 'json':
import json as _json
_env = {
'session_id': args.session_id,
'deleted': deleted,
'status': 'deleted' if deleted else 'not_found',
}
print(_json.dumps(wrap_json_envelope(_env, args.command)))
else:
if deleted:
print(f'deleted: {args.session_id}')
else:
print(f'not found: {args.session_id}')
# Exit 0 for both cases — delete_session is idempotent,
# not-found is success from a cleanup perspective
return 0
if args.command == 'remote-mode':
print(run_remote_mode(args.target).as_text())
return 0
if args.command == 'ssh-mode':
print(run_ssh_mode(args.target).as_text())
return 0
if args.command == 'teleport-mode':
print(run_teleport_mode(args.target).as_text())
return 0
if args.command == 'direct-connect-mode':
print(run_direct_connect(args.target).as_text())
return 0
if args.command == 'deep-link-mode':
print(run_deep_link(args.target).as_text())
return 0
if args.command == 'show-command':
module = get_command(args.name)
if module is None:
if args.output_format == 'json':
import json
error_envelope = {
'name': args.name,
'found': False,
'error': {
'kind': 'command_not_found',
'message': f'Unknown command: {args.name}',
'retryable': False,
},
}
print(json.dumps(wrap_json_envelope(error_envelope, args.command, exit_code=1)))
else:
print(f'Command not found: {args.name}')
return 1
if args.output_format == 'json':
import json
output = {
'name': module.name,
'found': True,
'source_hint': module.source_hint,
'responsibility': module.responsibility,
}
print(json.dumps(wrap_json_envelope(output, args.command)))
else:
print('\n'.join([module.name, module.source_hint, module.responsibility]))
return 0
if args.command == 'show-tool':
module = get_tool(args.name)
if module is None:
if args.output_format == 'json':
import json
error_envelope = {
'name': args.name,
'found': False,
'error': {
'kind': 'tool_not_found',
'message': f'Unknown tool: {args.name}',
'retryable': False,
},
}
print(json.dumps(wrap_json_envelope(error_envelope, args.command, exit_code=1)))
else:
print(f'Tool not found: {args.name}')
return 1
if args.output_format == 'json':
import json
output = {
'name': module.name,
'found': True,
'source_hint': module.source_hint,
'responsibility': module.responsibility,
}
print(json.dumps(wrap_json_envelope(output, args.command)))
else:
print('\n'.join([module.name, module.source_hint, module.responsibility]))
return 0
if args.command == 'exec-command':
result = execute_command(args.name, args.prompt)
# #168: JSON envelope with typed not-found error
if args.output_format == 'json':
import json
if not result.handled:
envelope = {
'name': args.name,
'prompt': args.prompt,
'handled': False,
'error': {
'kind': 'command_not_found',
'message': result.message,
'retryable': False,
},
}
else:
envelope = {
'name': result.name,
'prompt': result.prompt,
'source_hint': result.source_hint,
'handled': True,
'message': result.message,
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
else:
print(result.message)
return 0 if result.handled else 1
if args.command == 'exec-tool':
result = execute_tool(args.name, args.payload)
# #168: JSON envelope with typed not-found error
if args.output_format == 'json':
import json
if not result.handled:
envelope = {
'name': args.name,
'payload': args.payload,
'handled': False,
'error': {
'kind': 'tool_not_found',
'message': result.message,
'retryable': False,
},
}
else:
envelope = {
'name': result.name,
'payload': result.payload,
'source_hint': result.source_hint,
'handled': True,
'message': result.message,
}
print(json.dumps(wrap_json_envelope(envelope, args.command)))
else:
print(result.message)
return 0 if result.handled else 1
parser.error(f'unknown command: {args.command}')
return 2
if __name__ == '__main__':
raise SystemExit(main())