Files
hunfabric/tools/skill-turn-harness/harness.py
Ludovico Magnocavallo 81f72e8068 Add FAST Prerequisites Skill and Gemini Skill Test Harness (#3979)
* initial version of a FAST pre-install skill

* first round of testing

* Update fast-0-org-setup-prereqs skill with improved UX and local path handling

- Add explicit lockout warning and stop condition if the user is not a member of the provided Admin Principal group.
- Streamline bootstrap project selection to only prompt for an override if the active gcloud project is rejected.
- Restrict dataset discovery strictly to the `fast/stages/0-org-setup/datasets/` directory.
- Improve location handling by referencing `defaults.schema.json` for Standard GCP and auto-configuring fixed regions for GCD.
- Add comprehensive `local_path` management: prompt for customization, create directories, move `defaults.yaml` to the local data folder, and symlink `0-org-setup.auto.tfvars` back to the stage directory.

* add testing scenarios, implement initial changes for scenario 2

* move skills

* move to a skills/fast subfolder

* Refactor fast-0-org-setup prereqs skill

* Add skill-turn-harness utility tool

* Use relative markdown links for skill references

* Use descriptive titles for markdown links in skill references

* Add descriptions to each phase in the prerequisites workflow map

* Use backslash for markdown line breaks in skill map

* Update README security warning to mention default .gitignore

* shebang

* Update fast prereqs skill rules to force sequential question flow and refine harness tool with proper ctrl+c handling and slugified log paths

* Move playbook-gcp-dev.yaml to fast/prerequisites/gcp-dev.yaml and update fast prerequisites

* docs(skill-turn-harness): detail autonomous pond testing approach

* docs(skill-turn-harness): add final_state_checks to pond architecture and update toc

* Refine fast prereqs SKILL and gcp-dev playbook to strictly align with one-question-at-a-time rule

* feat(skill-turn-harness): update playbook schema for autonomous persona mode

* feat(skill-turn-harness): implement autonomous persona testing mode and fallback logic

* docs(skill-turn-harness): document the three modes of testing and update ToC

* implement timeout, schema validation, configurable cli

* chore: remove accidentally committed log files

* chore: ignore logs directory

* feat(skill-harness): implement tool execution interception, configurable workspace, and modularized validation

* feat(skill-harness): add model configuration and update README

* fix(skill-harness): automatically inject -y flag to gemini commands

* docs(skill-harness): add TODO.md with analysis for skill environment dependencies

* feat(skill-harness): add working_dir support and clean up fixtures

- Implement working_dir in harness to run tests in specific directories.
- Rename test fixtures and playbooks to be more descriptive.
- Add E2E test for working_dir.
- Apply code quality improvements to harness.py (imports, linting).
- Update README with working directory considerations and usage notes.
- Update phase3-bootstrap-and-iam.md skill doc to add execution rule against creating temp scripts.

* fix: capture customer_id and respect relative paths

* Implement isolated temp workspace sandboxing with symlinks in test harness

* Configure GCD manual autonomous playbook and align Phase 3/4 steps order

* Fix linting and schema tests failures

- Add missing license headers to tools/skill-turn-harness files.

- Fix trailing spaces and newlines in playbooks.

- Ignore tools directory in schema tests workflow.

TAG=agy

CONV=1bb75453-c3e2-448b-bae9-8e332a068012

* Fix Python formatting with yapf

TAG=agy

CONV=1bb75453-c3e2-448b-bae9-8e332a068012

* Refactor skill-turn-harness to use Antigravity SDK

- Migrated harness from gemini-cli subprocesses to Antigravity SDK.
- Implemented real-time step streaming and console logging.
- Added color-coded terminal output (dark gray headers, blue inputs, pink outputs).
- Collapsed excessive newlines in streamed thoughts.
- Excluded harness codebase from workspace copy to prevent agent cheating.
- Enabled skills folder copy to resolve agent lookup loops.
- Added key validation and CLI --debug flag.

* Fix autonomous turn layout: print Turn ID before execution

- Moved the [Autonomous Turn X] header print to before running the agent turn.
- This groups the real-time thinking and tool calls under the correct Turn ID block, instead of displaying them before the label.

* Remove obsolete .log.md from prerequisites skill directory
2026-05-22 17:16:54 +00:00

1045 lines
35 KiB
Python
Executable File

#!/usr/bin/env python3
# /// script
# dependencies = [
# "google-antigravity",
# "google-genai",
# "pydantic",
# "pyyaml",
# "click",
# "jsonschema",
# ]
# ///
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Hybrid Python/CLI Test Harness for Gemini Skills.
This module provides a testing framework that executes Gemini CLI skills in an
isolated subprocess and evaluates the interactions using the Gemini API and
structured Pydantic schemas.
'''
# Standard library imports
import glob
import json
import logging
import os
import re
import shutil
import string
import subprocess
import sys
import tempfile
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict
# Third-party imports
import click
import jsonschema
import yaml
from google import genai
from google.genai import types
from pydantic import BaseModel
from google.antigravity import Agent, LocalAgentConfig
from google.antigravity import types as agy_types
from google.antigravity.hooks import policy
import asyncio
def load_env_file(env_file_path: str):
'''Loads a .env file and injects its key-value pairs into os.environ.
Args:
env_file_path: The path to the .env file.
'''
if not os.path.exists(env_file_path):
raise FileNotFoundError(f'Environment file not found: {env_file_path}')
with open(env_file_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip()
def validate_playbook(playbook: dict):
'''Validates a playbook dictionary against the JSON schema.
Args:
playbook: The loaded playbook dictionary.
'''
schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'playbooks', 'playbook.schema.json')
if os.path.exists(schema_path):
with open(schema_path, 'r') as f:
schema = json.load(f)
try:
jsonschema.validate(instance=playbook, schema=schema)
except jsonschema.exceptions.ValidationError as e:
print(f"❌ [VALIDATION ERROR] Playbook is invalid: {e.message}",
file=sys.stderr)
sys.exit(1)
else:
print(
f"⚠️ [WARNING] Schema file not found at {schema_path}. Skipping validation."
)
@dataclass
class StepData:
step_index: int
user_input: str
expected_outcome: str
skill_response: str = ''
parsed_eval: Optional[dict] = None
is_system_error: bool = False
# 1. Define Strict Evaluator Schema
class EvaluationResult(BaseModel):
passed: bool
reasoning: str
class AutonomousTurnResult(BaseModel):
agent_followed_skill_rules: bool
reasoning: str
test_completed_successfully: bool
next_user_input: str
# Ensure GEMINI_API_KEY is available. If not, try to load from ~/.gemini/key.env
if 'GEMINI_API_KEY' not in os.environ:
key_file = os.path.expanduser('~/.gemini/key.env')
if os.path.exists(key_file):
with open(key_file, 'r') as f:
for line in f:
if line.startswith('GEMINI_API_KEY='):
os.environ['GEMINI_API_KEY'] = line.strip().split('=', 1)[1]
break
C_GRAY = '\033[90m'
C_BLUE = '\033[94m'
C_PINK = '\033[95m'
C_GREEN = '\033[92m'
C_RED = '\033[91m'
C_YELLOW = '\033[93m'
C_LIGHT_GRAY = '\033[37m'
C_BOLD_WHITE = '\033[1;37m'
def format_color(text: str, color: str) -> str:
"""Formats text with ANSI color if stdout is a TTY."""
if sys.stdout.isatty():
return f"{color}{text}\033[0m"
return text
class StreamingTrimmer:
"""Buffers streaming deltas to strip leading/trailing whitespace dynamically."""
def __init__(self):
self.started = False
self.whitespace_buffer = ""
def process_delta(self, delta: str) -> str:
if not delta:
return ""
out = ""
if not self.started:
stripped = delta.lstrip()
if not stripped:
return ""
self.started = True
delta = stripped
rstripped = delta.rstrip()
trailing_ws = delta[len(rstripped):]
if rstripped:
out = self.whitespace_buffer + rstripped
self.whitespace_buffer = trailing_ws
else:
self.whitespace_buffer += trailing_ws
if out:
out = re.sub(r'\n{3,}', '\n\n', out)
return out
def flush_remaining(self) -> None:
self.started = False
self.whitespace_buffer = ""
def init_markdown_log(md_log_path: str, playbook_name: str):
'''Initializes the markdown log file with a header.
Args:
md_log_path: The file path where the markdown log will be written.
playbook_name: The name of the playbook being executed.
'''
with open(md_log_path, 'w') as md_file:
md_file.write(f'# Interaction Log: {playbook_name}\n\n')
def log_step_to_markdown(
md_log_path: str,
step_index: int,
user_input: str,
expected_outcome: str,
skill_response: str,
parsed_eval: dict = None,
is_system_error: bool = False,
):
'''Appends a single interaction step to the markdown log.
Args:
md_log_path: The file path of the markdown log.
step_index: The zero-based index of the current step.
user_input: The simulated user input.
expected_outcome: The expected outcome defined in the playbook.
skill_response: The actual response from the agent.
parsed_eval: The parsed JSON evaluation result from the LLM evaluator.
is_system_error: True if the CLI execution failed with a system error.
'''
with open(md_log_path, 'a') as md_file:
md_file.write(f'## Step {step_index + 1}\n\n')
md_file.write(f'**User:**\n\n{user_input}\n\n')
md_file.write(f'**Expected Outcome:**\n\n{expected_outcome}\n\n')
md_file.write(f'**Agent:**\n\n{skill_response}\n\n')
if is_system_error:
md_file.write('*❌ FAIL: System Error*\n\n---\n\n')
elif parsed_eval:
status = '✅ PASS' if parsed_eval['passed'] else '❌ FAIL'
md_file.write(f'*{status}: {parsed_eval["reasoning"]}*\n\n')
md_file.write('---\n\n')
def generate_log_prefix(playbook_path: str) -> str:
'''Generates a slugified prefix for log files using the parent directory and filename.
Args:
playbook_path: The file path to the YAML playbook.
Returns:
A slugified string with a timestamp.
'''
dir_name = os.path.basename(os.path.dirname(playbook_path))
file_name = os.path.splitext(os.path.basename(playbook_path))[0]
combined = f'{dir_name}_{file_name}' if dir_name else file_name
slug = re.sub(r'[^a-zA-Z0-9]+', '-', combined).strip('-').lower()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
return f'{slug}-{timestamp}'
def dump_failed_log(log_dir: str, log_prefix: str, interaction_log: list):
'''Dumps the full interaction log to a JSON file upon failure.
Args:
log_dir: The directory where the failed log should be saved.
log_prefix: The prefix string generated for this playbook run.
interaction_log: The list of step data dictionaries recorded so far.
'''
failed_json_path = os.path.join(log_dir, f'{log_prefix}_failed.json')
with open(failed_json_path, 'w') as f:
json.dump(interaction_log, f, indent=2)
def get_skill_name(skill_src: str) -> str:
'''Extracts the skill name from the SKILL.md YAML frontmatter.
Args:
skill_src: The directory path containing the SKILL.md file.
Returns:
The name of the skill.
'''
skill_md_path = os.path.join(skill_src, 'SKILL.md')
if not os.path.exists(skill_md_path):
raise ValueError(f'SKILL.md not found in {skill_src}')
with open(skill_md_path, 'r') as f:
content = f.read()
if content.startswith('---'):
end_idx = content.find('---', 3)
if end_idx != -1:
frontmatter = content[3:end_idx]
metadata = yaml.safe_load(frontmatter)
if metadata and 'name' in metadata:
return metadata['name']
raise ValueError('Could not parse skill name from SKILL.md frontmatter')
def parse_and_validate_env(playbook: dict) -> Dict[str, str]:
'''Validates required environment variables and returns a substitution dict.
Args:
playbook: The loaded playbook dictionary.
Returns:
A dictionary mapping the required environment keys to their values.
Raises:
ValueError: If a required environment variable is missing.
'''
required_envs = playbook.get('env', [])
env_context = {}
missing_keys = []
for key in required_envs:
if key in os.environ:
env_context[key] = os.environ[key]
else:
missing_keys.append(key)
if missing_keys:
raise ValueError(
f'Missing required environment variables: {", ".join(missing_keys)}')
return env_context
def check_flow_contains(flow_contains: list, full_stdout: str) -> bool:
'''Checks if literal strings are present in the combined stdout.
Args:
flow_contains: A list of literal strings to search for.
full_stdout: The combined stdout of all CLI invocations.
Returns:
True if all strings are found, False otherwise.
'''
passed = True
for literal in flow_contains:
if literal not in full_stdout:
print(
f"❌ [CHECK FAILED]: Expected literal '{literal}' not found in output flow."
)
passed = False
return passed
def check_files_exist(files_exist: list, workspace_dir: str) -> bool:
'''Checks if specified files exist within the workspace.
Args:
files_exist: A list of relative file paths.
workspace_dir: The temporary workspace directory path.
Returns:
True if all files exist, False otherwise.
'''
passed = True
for file_path in files_exist:
full_path = os.path.join(workspace_dir, file_path)
if not os.path.exists(full_path):
print(f"❌ [CHECK FAILED]: Expected file '{file_path}' does not exist.")
passed = False
return passed
def check_files_contain(files_contain: dict, workspace_dir: str) -> bool:
'''Checks if specified files contain expected literal strings.
Args:
files_contain: A dictionary mapping relative file paths to a list of expected strings.
workspace_dir: The temporary workspace directory path.
Returns:
True if all files contain their expected strings, False otherwise.
'''
passed = True
for file_path, expected_content in files_contain.items():
full_path = os.path.join(workspace_dir, file_path)
if not os.path.exists(full_path):
print(
f"❌ [CHECK FAILED]: Expected file '{file_path}' does not exist for content check."
)
passed = False
else:
with open(full_path, 'r') as f:
content = f.read()
for expected in expected_content:
if expected not in content:
print(
f"❌ [CHECK FAILED]: Expected content '{expected}' not found in '{file_path}'."
)
passed = False
return passed
def check_tool_calls_contain(tool_calls_criteria: dict,
workspace_dir: str) -> bool:
'''Checks if the agent's tool calls contain expected literal strings in their arguments.
Args:
tool_calls_criteria: A dictionary mapping tool names to lists of expected strings.
workspace_dir: The temporary workspace directory path.
Returns:
True if all tool calls contain their expected strings, False otherwise.
'''
if not tool_calls_criteria:
return True
passed = True
workspace_name = os.path.basename(workspace_dir)
slugified_name = re.sub(r'[^a-zA-Z0-9]+', '-',
workspace_name).strip('-').lower()
session_files = glob.glob(
os.path.expanduser(
f'~/.gemini/tmp/{slugified_name}/chats/session-*.json'))
if not session_files:
print(
"❌ [CHECK FAILED]: Expected session JSON file not found in workspace for tool validation."
)
return False
session_files.sort(key=os.path.getmtime, reverse=True)
try:
with open(session_files[0], 'r') as f:
session_data = json.load(f)
extracted_calls: Dict[str, str] = {}
for m in session_data.get('messages', []):
for tc in m.get('toolCalls', []):
name = tc.get('name')
args_str = json.dumps(tc.get('args', {}))
if name not in extracted_calls:
extracted_calls[name] = ""
extracted_calls[name] += args_str + "\n"
for tool_name, expected_strings in tool_calls_criteria.items():
if tool_name not in extracted_calls:
print(
f"❌ [CHECK FAILED]: Expected tool '{tool_name}' was never called.")
passed = False
continue
tool_args_str = extracted_calls[tool_name]
for expected_str in expected_strings:
if expected_str not in tool_args_str:
print(
f"❌ [CHECK FAILED]: Expected string '{expected_str}' not found in arguments of tool '{tool_name}'."
)
passed = False
except Exception as e:
print(f"❌ [CHECK FAILED]: Failed to parse session JSON: {e}")
passed = False
return passed
def perform_deterministic_checks(success_criteria: dict, workspace_dir: str,
full_stdout: str) -> bool:
'''Evaluates the deterministic checks defined in the persona success_criteria.
Args:
success_criteria: The success_criteria dictionary from the playbook.
workspace_dir: The temporary workspace directory path.
full_stdout: The combined stdout of all CLI invocations.
Returns:
True if all checks pass, False otherwise.
'''
passed = True
if not check_flow_contains(success_criteria.get('flow_contains', []),
full_stdout):
passed = False
if not check_tool_calls_contain(
success_criteria.get('tool_calls_contain', {}), workspace_dir):
passed = False
if not check_files_exist(success_criteria.get('files_exist', []),
workspace_dir):
passed = False
if not check_files_contain(success_criteria.get('files_contain', {}),
workspace_dir):
passed = False
return passed
def _view_file_directory_check(args: dict) -> bool:
"""Predicate to check if the target of view_file is actually a directory."""
path = args.get('AbsolutePath') or args.get('file_path') or args.get('path')
if path:
return os.path.isdir(path)
return False
async def run_turn(agent: Agent, user_input: str) -> None:
"""Sends user input and streams steps in real-time, logging tool calls and errors."""
await agent.conversation.send(user_input)
printed_calls = set()
need_newline = False
at_start_of_line = True
trimmer = StreamingTrimmer()
async for step_obj in agent.conversation.receive_steps():
if step_obj.thinking_delta:
to_print = trimmer.process_delta(step_obj.thinking_delta)
if to_print:
if not need_newline:
print(f" {format_color('🧠 Thinking:', C_GRAY)}", flush=True)
need_newline = True
at_start_of_line = True
parts = to_print.split('\n')
for i, part in enumerate(parts):
if i > 0:
print('\n', end='')
at_start_of_line = True
if part:
if at_start_of_line:
print(' ', end='')
at_start_of_line = False
print(format_color(part, C_GRAY), end='', flush=True)
if step_obj.type == agy_types.StepType.TOOL_CALL:
for tc in step_obj.tool_calls:
if tc.id not in printed_calls:
printed_calls.add(tc.id)
if need_newline:
trimmer.flush_remaining()
print()
need_newline = False
cleaned_args = {
k: v for k, v in tc.args.items() if k not in {
"output",
"results",
"num_results",
"diff_block",
"exit_code",
"combined_output",
"image_name",
}
}
args_str = ", ".join(f"{k}={v}" for k, v in cleaned_args.items())
print(
f" 🛠️ {format_color(f'[Tool Call]: {tc.name}({args_str})', C_GRAY)}"
)
if step_obj.status == agy_types.StepStatus.ERROR:
if need_newline:
trimmer.flush_remaining()
print()
need_newline = False
error_msg = step_obj.error or "Unknown step error"
print(f" ❌ [Error]: {error_msg}")
if need_newline:
trimmer.flush_remaining()
print()
async def run_hybrid_tuning_loop(playbook_path: str, log_dir: str,
skill_src: str = None,
keep_workspace: bool = False,
cli_agent_model: str = None,
cli_evaluator_model: str = None):
'''Executes the test playbook and evaluates the agent's responses.
Args:
playbook_path: The file path to the YAML playbook.
log_dir: The directory where logs and failed JSON dumps should be written.
skill_src: Optional path to a local unpacked skill to run.
keep_workspace: Preserve the temporary workspace directory.
cli_agent_model: Override for the agent model.
cli_evaluator_model: Override for the evaluator model.
Returns:
True if the playbook passes completely, False if any step fails.
'''
evaluator_client = genai.Client()
log_dir = os.path.abspath(log_dir)
os.makedirs(log_dir, exist_ok=True)
with open(playbook_path, 'r') as f:
playbook = yaml.safe_load(f)
validate_playbook(playbook)
playbook_timeout = playbook.get('timeout', 60)
# Determine models (CLI override > Playbook > Default)
agent_model = cli_agent_model or playbook.get('agent_model')
evaluator_model = cli_evaluator_model or playbook.get('evaluator_model',
'gemini-2.5-flash')
playbook_name = playbook.get('name', 'Unknown Playbook')
playbook_steps = playbook.get('steps', [])
persona = playbook.get('persona')
env_context = parse_and_validate_env(playbook)
tmpdir_config = playbook.get('tmpdir')
is_tmpdir = tmpdir_config is not None
original_cwd = os.getcwd()
if is_tmpdir:
workspace_dir = tempfile.mkdtemp(prefix='gemini_harness_')
open(os.path.join(workspace_dir, '.project_root'), 'w').close()
link_paths = tmpdir_config.get('link_paths', [])
for path in link_paths:
src_abs = os.path.abspath(os.path.join(original_cwd, path))
dst_abs = os.path.join(workspace_dir, path)
os.makedirs(os.path.dirname(dst_abs), exist_ok=True)
try:
if os.path.isdir(src_abs):
shutil.copytree(
src_abs, dst_abs,
ignore=shutil.ignore_patterns('.terraform', '.git', '.venv',
'venv', '__pycache__',
'.pytest_cache',
'skill-turn-harness'))
print(f'📁 Copied directory: {path} -> {dst_abs}')
else:
shutil.copy2(src_abs, dst_abs)
print(f'📄 Copied file: {path} -> {dst_abs}')
except Exception as e:
print(f'❌ [SETUP ERROR]: Failed to copy {path}: {e}', file=sys.stderr)
shutil.rmtree(workspace_dir)
sys.exit(1)
os.chdir(workspace_dir)
else:
workspace_dir = original_cwd
print(f'--- Tuning: {playbook_name} | Workspace: {workspace_dir} ---')
interaction_log = []
log_prefix = generate_log_prefix(playbook_path)
md_log_path = os.path.join(log_dir, f'{log_prefix}_log.md')
init_markdown_log(md_log_path, playbook_name)
full_stdout = ""
conversation_history = []
step_index = 0
fallback_to_persona = False
# Configure SDK Agent
skills_paths = []
if skill_src:
skills_paths.append(os.path.abspath(skill_src))
# Allow all tools to emulate CLI -y/--dangerously-skip-permissions
policies = [
policy.deny(
agy_types.BuiltinTools.VIEW_FILE.value,
when=_view_file_directory_check, name=
"Rejection: Path is a directory, not a file. Use list_directory to inspect it."
),
policy.allow_all()
]
standard_instructions = (
"GUIDELINES:\n"
"- Always check if a path is a directory before trying to view it. "
"Use list_directory to inspect directories, never view_file.")
config = LocalAgentConfig(
model=agent_model,
api_key=os.environ.get('GEMINI_API_KEY'),
skills_paths=skills_paths,
policies=policies,
workspaces=[workspace_dir],
save_dir=log_dir, # Use log_dir for raw state too
system_instructions=standard_instructions,
)
try:
async with Agent(config) as agent:
# --- PHASE 1: SCRIPTED STEPS ---
for step_dict in playbook_steps:
raw_user_input = step_dict['user_input']
raw_expected_outcome = step_dict['expected_outcome']
subbed_user_input = string.Template(raw_user_input).safe_substitute(
env_context)
subbed_expected_outcome = string.Template(
raw_expected_outcome).safe_substitute(env_context)
step = StepData(step_index=step_index, user_input=subbed_user_input,
expected_outcome=subbed_expected_outcome)
turn_str = format_color(f'[Step {step.step_index + 1}]', C_BOLD_WHITE)
print(
f"\n{turn_str}\n{format_color('Tester:', C_BLUE)}\n{step.user_input.rstrip()}"
)
try:
await asyncio.wait_for(run_turn(agent, step.user_input),
timeout=playbook_timeout)
step.skill_response = agent.conversation.last_response
except asyncio.TimeoutError:
print(f'⚠️ [TIMEOUT] ({playbook_timeout}s)', file=sys.stderr)
step.skill_response = 'SYSTEM_ERROR: Timeout'
except Exception as e:
print(f'⚠️ [ERROR]: {e}', file=sys.stderr)
step.skill_response = f'SYSTEM_ERROR: {e}'
full_stdout += step.skill_response + "\n"
turn_str = format_color(f'[Step {step.step_index + 1}]', C_BOLD_WHITE)
print(
f"\n{turn_str}\n\n{format_color('Agent:', C_PINK)}\n{step.skill_response.rstrip()}"
)
conversation_history.append({
"user": step.user_input,
"agent": step.skill_response
})
if step.skill_response.startswith('SYSTEM_ERROR'):
label = format_color(f'[FAILURE Step {step.step_index + 1}]', C_GRAY)
msg = format_color('System Error.', C_RED)
print(f'{label}: {msg}')
step.is_system_error = True
log_step_to_markdown(md_log_path, **asdict(step))
interaction_log.append(asdict(step))
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
eval_prompt = f'''
OBJECTIVE: {step.expected_outcome}
ACTUAL RESPONSE: {step.skill_response}
Evaluate if the agent fulfilled the objective.
'''
eval_response = evaluator_client.models.generate_content(
model=evaluator_model,
contents=eval_prompt,
config=types.GenerateContentConfig(
response_mime_type='application/json',
response_schema=EvaluationResult,
temperature=0.0,
),
)
step.parsed_eval = json.loads(eval_response.text)
interaction_log.append(asdict(step))
log_step_to_markdown(md_log_path, **asdict(step))
if not step.parsed_eval['passed']:
if persona:
label = format_color(f'[WARNING Step {step.step_index + 1}]',
C_GRAY)
msg = format_color(step.parsed_eval["reasoning"], C_YELLOW)
print(f'⚠️ {label}: {msg}')
print(
'🔄 Scripted step failed. Falling back to autonomous persona...')
fallback_to_persona = True
break
else:
label = format_color(f'[FAILURE Step {step.step_index + 1}]',
C_GRAY)
msg = format_color(step.parsed_eval["reasoning"], C_RED)
print(f'{label}: {msg}')
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
else:
label = format_color(f'[PASS Step {step.step_index + 1}]', C_GRAY)
msg = format_color(step.parsed_eval["reasoning"], C_GREEN)
print(f'{label}: {msg}')
step_index += 1
# If steps succeeded completely and no persona exists, we're done.
if not persona and not fallback_to_persona:
label = format_color('[SUCCESS]', C_GRAY)
msg = format_color(
f"Scripted Playbook '{playbook_name}' completed successfully.",
C_GREEN)
print(f'\n{label} {msg}')
print(f'📄 Markdown log saved to: {md_log_path}')
return True
# --- PHASE 2: AUTONOMOUS PERSONA ---
print("\n--- Entering Autonomous Persona Mode ---")
persona_context = string.Template(
persona['context']).safe_substitute(env_context)
# Interpolate success criteria string values
success_criteria = persona.get('success_criteria', {})
interpolated_success_criteria = {
'llm_checks': [
string.Template(c).safe_substitute(env_context)
for c in success_criteria.get('llm_checks', [])
],
'flow_contains': [
string.Template(c).safe_substitute(env_context)
for c in success_criteria.get('flow_contains', [])
],
'files_exist': [
string.Template(c).safe_substitute(env_context)
for c in success_criteria.get('files_exist', [])
],
'tool_calls_contain': {
k: [
string.Template(item).safe_substitute(env_context)
for item in v
] for k, v in success_criteria.get('tool_calls_contain',
{}).items()
},
'files_contain': {
string.Template(k).safe_substitute(env_context): [
string.Template(item).safe_substitute(env_context)
for item in v
] for k, v in success_criteria.get('files_contain', {}).items()
}
}
# Determine next input
next_input = None
if len(conversation_history) == 0:
# Pure autonomous start
next_input = string.Template(
persona['initial_user_input']).safe_substitute(env_context)
max_turns = persona.get('max_turns', 10)
if next_input:
print(f"{format_color('Tester:', C_BLUE)}\n{next_input.rstrip()}")
for turn in range(max_turns):
if next_input:
turn_display = len(conversation_history) + 1
turn_str = format_color(f'[Autonomous Turn {turn_display}]',
C_BOLD_WHITE)
print(f"\n{turn_str}")
try:
await asyncio.wait_for(run_turn(agent, next_input),
timeout=playbook_timeout)
agent_response = agent.conversation.last_response
except asyncio.TimeoutError:
print(f'⚠️ [TIMEOUT] ({playbook_timeout}s)', file=sys.stderr)
agent_response = 'SYSTEM_ERROR: Timeout'
except Exception as e:
print(f'⚠️ [ERROR]: {e}', file=sys.stderr)
agent_response = f'SYSTEM_ERROR: {e}'
full_stdout += agent_response + "\n"
print(
f"\n{format_color('Agent:', C_PINK)}\n{agent_response.rstrip()}")
if agent_response.startswith('SYSTEM_ERROR'):
print(f'❌ [FAILURE Turn {turn_display}]: System Error.')
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
conversation_history.append({
"user": next_input,
"agent": agent_response
})
# Log to markdown for autonomous turns (reusing StepData for structure)
step_log = StepData(step_index=turn_display - 1,
user_input=next_input,
expected_outcome="Autonomous Persona Turn",
skill_response=agent_response)
interaction_log.append(asdict(step_log))
with open(md_log_path, 'a') as md_file:
md_file.write(
f'## Autonomous Turn {turn_display}\n\n**User:**\n\n{next_input}\n\n**Agent:**\n\n{agent_response}\n\n---\n\n'
)
# Evaluate state and generate next input
llm_checks = "\n".join([
f"- {check}"
for check in interpolated_success_criteria.get('llm_checks', [])
])
history_text = "\n".join([
f"USER: {h['user']}\nAGENT: {h['agent']}\n"
for h in conversation_history
])
eval_prompt = f"""
You are simulating a user interacting with an AI agent.
YOUR PERSONA AND RULES:
{persona_context}
GOAL (SEMANTIC CHECKS TO VERIFY COMPLETION):
{llm_checks}
CONVERSATION HISTORY:
{history_text}
TASK:
1. Evaluate if the agent's latest response was helpful, followed the rules, and is moving towards the goal. Set `agent_followed_skill_rules` to false if the agent hallucinated or broke rules.
2. Check if the GOAL has been fully achieved based on the conversation so far. If yes, set `test_completed_successfully` to true.
3. If the GOAL is not achieved, formulate the exact text you will reply with to advance the conversation, using only your persona knowledge. Put this in `next_user_input`.
"""
eval_response = evaluator_client.models.generate_content(
model=evaluator_model,
contents=eval_prompt,
config=types.GenerateContentConfig(
response_mime_type='application/json',
response_schema=AutonomousTurnResult,
temperature=0.0,
),
)
parsed_eval = json.loads(eval_response.text)
if not parsed_eval['agent_followed_skill_rules']:
label = format_color('[AUTONOMOUS FAIL]', C_GRAY)
msg = format_color(parsed_eval['reasoning'], C_RED)
print(f"{label}: {msg}")
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
if parsed_eval['test_completed_successfully']:
label = format_color('[AUTONOMOUS SEMANTIC SUCCESS]', C_GRAY)
msg = format_color(parsed_eval['reasoning'], C_GREEN)
print(f"{label}: {msg}")
print("🔍 Performing deterministic checks...")
if perform_deterministic_checks(interpolated_success_criteria,
workspace_dir, full_stdout):
if fallback_to_persona:
label = format_color('[PASS WITH WARNINGS]', C_GRAY)
msg = format_color(
f"Playbook '{playbook_name}' completed via autonomous recovery.",
C_YELLOW)
print(f"\n{label} {msg}")
else:
label = format_color('[SUCCESS]', C_GRAY)
msg = format_color(
f"Autonomous Playbook '{playbook_name}' completed successfully.",
C_GREEN)
print(f"\n{label} {msg}")
print(f'📄 Markdown log saved to: {md_log_path}')
return True
else:
label = format_color('[AUTONOMOUS FAIL]', C_GRAY)
msg = format_color("Deterministic checks failed.", C_RED)
print(f"{label}: {msg}")
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
next_input = parsed_eval['next_user_input']
if next_input:
print(f"\n{format_color('Tester:', C_BLUE)}\n{next_input.rstrip()}")
label = format_color('[AUTONOMOUS FAIL]', C_GRAY)
msg = format_color(
f"Reached max turns ({max_turns}) without completing the goal.",
C_RED)
print(f"{label}: {msg}")
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
except KeyboardInterrupt:
print('\n🛑 [INTERRUPTED] Shutting down cleanly...')
dump_failed_log(log_dir, log_prefix, interaction_log)
return False
finally:
# Locate and copy the session json to the logs directory
# The SDK saves it in save_dir/chats/session-*.json
session_files = glob.glob(os.path.join(log_dir, 'chats', 'session-*.json'))
if session_files:
session_files.sort(key=os.path.getmtime, reverse=True)
session_log_path = os.path.join(log_dir, f'{log_prefix}_session.json')
shutil.copy2(session_files[0], session_log_path)
print(f'📄 Session JSON saved to: {session_log_path}')
if is_tmpdir:
os.chdir(original_cwd)
if not keep_workspace:
shutil.rmtree(workspace_dir)
else:
print(f'📁 Workspace preserved at: {workspace_dir}')
@click.command()
@click.argument('playbook', type=click.Path(exists=True))
@click.option(
'--log-dir',
type=click.Path(),
default='logs',
help='Directory to store logs and json dumps.',
)
@click.option(
'--skill-src',
type=click.Path(exists=True),
default=None,
help='Path to a local unpacked skill directory to run.',
)
@click.option(
'--env-file',
type=click.Path(exists=True),
default=None,
help='Path to a .env file containing secrets to substitute in the playbook.',
)
@click.option(
'--keep-workspace',
is_flag=True,
help='Preserve the temporary workspace directory after execution.',
)
@click.option(
'--agent-model',
type=str,
default=None,
help='Override the model the agent uses (e.g., gemini-2.5-pro).',
)
@click.option(
'--evaluator-model',
type=str,
default=None,
help=
'Override the model the test harness uses to grade (e.g., gemini-2.5-flash).',
)
@click.option(
'--debug',
is_flag=True,
help='Enable debug logging for the SDK.',
)
def main(playbook, log_dir, skill_src, env_file, keep_workspace, agent_model,
evaluator_model, debug):
'''Hybrid Python SDK Test Harness.
Executes a YAML playbook using the Antigravity SDK and evaluates the
responses using the Gemini API.
'''
if env_file:
load_env_file(env_file)
if 'GEMINI_API_KEY' not in os.environ:
print(
'❌ [ERROR]: GEMINI_API_KEY environment variable is not set.',
file=sys.stderr,
)
print(
'Please set it in your environment, provide it in a .env file, or store it in ~/.gemini/key.env',
file=sys.stderr,
)
sys.exit(1)
if debug:
logging.basicConfig(level=logging.DEBUG)
asyncio.run(
run_hybrid_tuning_loop(playbook, log_dir, skill_src, keep_workspace,
agent_model, evaluator_model))
if __name__ == '__main__':
main()