Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 133 additions & 64 deletions agent_go/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@

__all__ = ["run_subtask"]

def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=None, headless=False, issue_ref="", active_pids=None, active_pids_lock=None):
sub_id = subtask["id"]

def _create_worktree(task_id, sub_id, repo, task_dir, logger):
"""Create worktree for a subtask. Returns (worktree_path, create_time_ms)."""
sub_dir = task_dir / sub_id
sub_dir.mkdir(parents=True, exist_ok=True)
worktree = sub_dir / "work"

logger.info(f"─── {sub_id} START: {subtask['title']} ───")
log_event(logger, "subtask_start", {"id": sub_id, "title": subtask["title"],
"depends_on": subtask.get("depends_on", []), "headless": headless, "issue": issue_ref})

clone_start = time.time()
worktree_create_ms = 0
if (worktree / ".git").exists():
logger.info(f"worktree 已存在,跳过创建")
Expand All @@ -44,30 +40,11 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
worktree.mkdir(parents=True, exist_ok=True)
shutil.copytree(str(repo), str(worktree), dirs_exist_ok=True)

# 产物传递:通过 git merge 将上游代码合并到当前 worktree
# Tag 使用完整路径 task_id/sub_id 避免跨任务冲突
merge_conflicts = {}
merge_results = []
merge_upstream_ms = 0
if upstream_worktrees:
for up_id, up_path in upstream_worktrees.items():
if up_path.exists():
upstream_tag = f"{task_id}/{up_id}"
logger.info(f"产物传递 (git merge): {up_id} → {sub_id} (tag={upstream_tag})")
m_start = time.time()
_git_merge_upstream(up_path, worktree, upstream_tag, logger, headless=headless)
merge_upstream_ms += (time.time() - m_start) * 1000
# 检测上游 merge 是否产生冲突
conflict_file = worktree / ".MERGE_CONFLICT"
has_conflict = conflict_file.exists()
if has_conflict:
merge_conflicts[up_id] = conflict_file.read_text(encoding="utf-8")
conflict_file.unlink()
merge_results.append(collect_merge_result(up_id, not has_conflict,
merge_conflicts.get(up_id, "").split("\n") if has_conflict else None))
clone_time = time.time() - clone_start
return worktree, worktree_create_ms


# 构建 TASK.md:包含完整 Agent Prompt、资源清单、上游上下文
def _build_task_md(subtask, repo, task_dir, worktree, logger, headless, merge_conflicts=None):
"""Build TASK.md content. Returns (task_md, verification, skill_names, unresolved_skills)."""
task_md_parts = [f"# 子任务: {subtask['title']}", ""]

# 注入 git merge 冲突信息(如有)
Expand Down Expand Up @@ -140,7 +117,6 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
logger.warning(f"Skill 未找到: \"{sn}\",已跳过。已安装: {installed_names[:10]}")

# 将 Agent Prompt 中的源项目路径替换为 worktree 路径,确保隔离
# 边界字符包含: 空白、引号、括号、冒号(含全角)、路径分隔符、中文标点
task_md_text = "\n".join(task_md_parts)
_boundary_chars = r'\s"\'\(\):/:,。、'
_before = rf'(?<![^{_boundary_chars}])'
Expand All @@ -150,36 +126,12 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
str(worktree),
task_md_text
)
(sub_dir / "TASK.md").write_text(task_md, encoding="utf-8")

# 重写验证命令中的路径
verification = subtask.get("verification", "")
if verification and str(repo) in verification:
_boundary_chars = r'\s"\'\(\):/:,。、'
_before = rf'(?<![^{_boundary_chars}])'
_after = rf'(?![^{_boundary_chars}])'
verification = re.sub(
rf'{_before}{re.escape(str(repo))}{_after}',
str(worktree),
verification
)

print(f"\n🚀 {sub_id}: {subtask['title']}")
env = os.environ.copy()
loaded_skill_names = [sn for sn in skill_names if sn not in unresolved_skills]
env.update({"AGENT_GO_TASK_ID": task_id, "AGENT_GO_SUBTASK_ID": sub_id, "AGENT_GO_WORKTREE": str(worktree), "AGENT_GO_SKILLS": ",".join(loaded_skill_names)})
return task_md, verification, skill_names, unresolved_skills

# ── Agent 类型配置 ──
agent_type_name = subtask.get("agent_type", "developer")
agent = load_agent_type(agent_type_name, repo)
if agent:
env.update(get_agent_env(agent))
logger.info(f"Agent: {agent.type_name}")
else:
from .agents import list_agent_types
available = [a["type"] for a in list_agent_types()]
logger.warning(f"Agent 类型 \"{agent_type_name}\" 未注册,降级为 developer。可用: {available}")

def _run_claude(task_md, worktree, env, headless, agent, sub_id, active_pids, active_pids_lock, logger):
"""Run Claude in headless or interactive mode. Returns (result, sandbox_type, claude_time)."""
claude_start = time.time()

if headless:
Expand Down Expand Up @@ -208,6 +160,12 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non

claude_time = time.time() - claude_start

return result, sandbox_type, claude_time


def _verify_changes(task_id, subtask, worktree, headless, task_md, env, tag_name,
active_pids, active_pids_lock, logger, issue_ref=""):
"""Verify changes, commit if needed, run verification commands. Returns verification dict."""
# 记录变更摘要(使用 git status --porcelain 检测所有变更,包括新文件)
status_result = subprocess.run(["git", "status", "--porcelain"], cwd=str(worktree), capture_output=True, text=True)
has_changes = bool(status_result.stdout.strip())
Expand All @@ -234,10 +192,9 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non

# Git 提交 + tag(Conventional Commits 格式),供下游子任务 merge
# Tag 包含 task_id 前缀,避免跨任务冲突
tag_name = f"{task_id}/{sub_id}"
git_start = time.time()
if has_changes:
commit_msg = _format_commit(subtask['title'], issue_ref, sub_id)
commit_msg = _format_commit(subtask['title'], issue_ref, subtask["id"])
add_result = subprocess.run(["git", "add", "-A"], cwd=str(worktree), capture_output=True)
if add_result.returncode != 0:
logger.warning(f"git add 失败: {add_result.stderr.strip()}")
Expand Down Expand Up @@ -302,10 +259,10 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
f"错误输出:\n{vr.stderr[-500:]}\n"
"请修复上述问题,确保所有验证命令通过。直接修改文件,不要询问。"
)
_run_headless(fix_prompt, worktree, env, logger, f"{sub_id}-fix", active_pids=active_pids, active_pids_lock=active_pids_lock)
_run_headless(fix_prompt, worktree, env, logger, f"{subtask['id']}-fix", active_pids=active_pids, active_pids_lock=active_pids_lock)
subprocess.run(["git", "add", "-A"], cwd=str(worktree), capture_output=True)
subprocess.run(["git", "commit", "-m",
f"{sub_id} (fix): 验证修复"], cwd=str(worktree),
f"{subtask['id']} (fix): 验证修复"], cwd=str(worktree),
capture_output=True)
subprocess.run(["git", "tag", "-f", tag_name], cwd=str(worktree),
capture_output=True)
Expand Down Expand Up @@ -345,7 +302,21 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
else:
logger.info(f"验证 [{vi+1}/{len(cmds)}] 通过")

# 生成共享上下文(供下游子任务使用)
return {
"has_changes": has_changes,
"summary": summary,
"metrics_changes": metrics_changes,
"git_commit_ms": git_commit_ms,
"verification_ms": verification_ms,
"verification": verification,
"verify_ok": verify_ok,
"retry_count": retry_count,
"verification_results": verification_results,
}


def _generate_context(subtask, task_dir, sub_id, logger, headless, result, verify_ok, summary, verification):
"""Generate shared context file for downstream subtasks. Writes to context.md."""
ctx_parts = [
f"### {sub_id}: {subtask['title']}",
f"- 状态: {'通过' if verify_ok else '需关注'}",
Expand All @@ -364,11 +335,109 @@ def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=Non
ctx_parts.append("")
# 线程安全地追加共享上下文
# 写入独立上下文文件(仅被直接下游子任务读取)
ctx_file = sub_dir / "context.md"
ctx_file = task_dir / sub_id / "context.md"
ctx_file.write_text("\n".join(ctx_parts) + "\n", encoding="utf-8")
line_count = len("\n".join(ctx_parts).splitlines())
logger.info(f"上下文已写入: {line_count} 行")


def run_subtask(task_id, subtask, repo, task_dir, logger, upstream_worktrees=None, headless=False, issue_ref="", active_pids=None, active_pids_lock=None):
sub_id = subtask["id"]
sub_dir = task_dir / sub_id
sub_dir.mkdir(parents=True, exist_ok=True)

logger.info(f"─── {sub_id} START: {subtask['title']} ───")
log_event(logger, "subtask_start", {"id": sub_id, "title": subtask["title"],
"depends_on": subtask.get("depends_on", []), "headless": headless, "issue": issue_ref})

clone_start = time.time()

# 1. Create worktree
worktree, worktree_create_ms = _create_worktree(task_id, sub_id, repo, task_dir, logger)

# 2. Upstream merge (artifact passing)
merge_conflicts = {}
merge_results = []
merge_upstream_ms = 0
if upstream_worktrees:
for up_id, up_path in upstream_worktrees.items():
if up_path.exists():
upstream_tag = f"{task_id}/{up_id}"
logger.info(f"产物传递 (git merge): {up_id} → {sub_id} (tag={upstream_tag})")
m_start = time.time()
_git_merge_upstream(up_path, worktree, upstream_tag, logger, headless=headless)
merge_upstream_ms += (time.time() - m_start) * 1000
# 检测上游 merge 是否产生冲突
conflict_file = worktree / ".MERGE_CONFLICT"
has_conflict = conflict_file.exists()
if has_conflict:
merge_conflicts[up_id] = conflict_file.read_text(encoding="utf-8")
conflict_file.unlink()
merge_results.append(collect_merge_result(up_id, not has_conflict,
merge_conflicts.get(up_id, "").split("\n") if has_conflict else None))
clone_time = time.time() - clone_start

# 3. Build TASK.md
task_md, verification, skill_names, unresolved_skills = _build_task_md(
subtask, repo, task_dir, worktree, logger, headless,
merge_conflicts=merge_conflicts
)

# Write TASK.md to disk
(sub_dir / "TASK.md").write_text(task_md, encoding="utf-8")

# Save original verification before path rewriting (for context.md)
original_verification = verification
# Rewrite verification command paths
if verification and str(repo) in verification:
_boundary_chars = r'\s"\'\(\):/:,。、'
_before = rf'(?<![^{_boundary_chars}])'
_after = rf'(?![^{_boundary_chars}])'
verification = re.sub(
rf'{_before}{re.escape(str(repo))}{_after}',
str(worktree),
verification
)

print(f"\n🚀 {sub_id}: {subtask['title']}")
env = os.environ.copy()
loaded_skill_names = [sn for sn in skill_names if sn not in unresolved_skills]
env.update({"AGENT_GO_TASK_ID": task_id, "AGENT_GO_SUBTASK_ID": sub_id, "AGENT_GO_WORKTREE": str(worktree), "AGENT_GO_SKILLS": ",".join(loaded_skill_names)})

# 4. Agent type configuration
agent_type_name = subtask.get("agent_type", "developer")
agent = load_agent_type(agent_type_name, repo)
if agent:
env.update(get_agent_env(agent))
logger.info(f"Agent: {agent.type_name}")
else:
from .agents import list_agent_types
available = [a["type"] for a in list_agent_types()]
logger.warning(f"Agent 类型 \"{agent_type_name}\" 未注册,降级为 developer。可用: {available}")

# 5. Run Claude
result, sandbox_type, claude_time = _run_claude(
task_md, worktree, env, headless, agent, sub_id, active_pids, active_pids_lock, logger
)

# 6. Verify changes
tag_name = f"{task_id}/{sub_id}"
verify_results = _verify_changes(
task_id, subtask, worktree, headless, task_md, env, tag_name,
active_pids, active_pids_lock, logger, issue_ref=issue_ref
)
has_changes = verify_results["has_changes"]
summary = verify_results["summary"]
metrics_changes = verify_results["metrics_changes"]
git_commit_ms = verify_results["git_commit_ms"]
verification_ms = verify_results["verification_ms"]
verify_ok = verify_results["verify_ok"]
retry_count = verify_results["retry_count"]
verification_results = verify_results["verification_results"]

# 7. Generate context (use original verification, not path-rewritten)
_generate_context(subtask, task_dir, sub_id, logger, headless, result, verify_ok, summary, original_verification)

# 状态判定: completed(有变更) / no_changes(完成但无变更) / failed(异常)
if result.returncode == 0 and verify_ok:
status = "no_changes" if summary == "无文件变更" else "completed"
Expand Down
Loading