From f4bae3ed8ec8968f93c3d64461bd0f7fa282b29d Mon Sep 17 00:00:00 2001 From: Soxoj Date: Sun, 17 May 2026 20:08:09 +0200 Subject: [PATCH] Hacker-style CLI, parallel clones, fork-filter transparency --- gitcolombo.py | 413 +++++++++++++++++++++++++++++++++++++++------ test_gitcolombo.py | 395 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 749 insertions(+), 59 deletions(-) diff --git a/gitcolombo.py b/gitcolombo.py index 36e6592..9afdab9 100755 --- a/gitcolombo.py +++ b/gitcolombo.py @@ -14,6 +14,9 @@ import os import re import subprocess +import sys +import threading +import time import urllib.error import urllib.request from collections import defaultdict @@ -39,6 +42,7 @@ HTTP_TIMEOUT = 15 HTTP_USER_AGENT = "gitcolombo/0.2" RESOLVE_WORKERS = 8 +CLONE_WORKERS = 8 DEFAULT_REPOS_DIR = "repos" GITHUB_GPG_KEYS_URL = "https://api.github.com/users/{nickname}/gpg_keys" @@ -75,6 +79,74 @@ def is_system_email(email): return bool(email and SYSTEM_EMAIL_RE.search(email)) +# ---------- Terminal styling ---------- + +# ANSI 256-color palette, picked to mirror the web UI's green-on-black look. 
+NEON = "\033[38;5;46m" # primary bright green +LIME = "\033[38;5;82m" # highlight (slightly lighter) +GREEN_DIM = "\033[38;5;34m" # secondary green +GREY = "\033[38;5;240m" # faint borders / dot-leaders +RED = "\033[38;5;196m" # warnings / noreply tags +BOLD = "\033[1m" +RESET = "\033[0m" + +BANNER = r""" + ░██████╗░██╗████████╗░█████╗░░█████╗░██╗░░░░░░█████╗░███╗░░░███╗██████╗░░█████╗░ + ██╔════╝░██║╚══██╔══╝██╔══██╗██╔══██╗██║░░░░░██╔══██╗████╗░████║██╔══██╗██╔══██╗ + ██║░░██╗░██║░░░██║░░░██║░░╚═╝██║░░██║██║░░░░░██║░░██║██╔████╔██║██████╦╝██║░░██║ + ██║░░╚██╗██║░░░██║░░░██║░░██╗██║░░██║██║░░░░░██║░░██║██║╚██╔╝██║██╔══██╗██║░░██║ + ╚██████╔╝██║░░░██║░░░╚█████╔╝╚█████╔╝███████╗╚█████╔╝██║░╚═╝░██║██████╦╝╚█████╔╝ + ░╚═════╝░╚═╝░░░╚═╝░░░░╚════╝░░╚════╝░╚══════╝░╚════╝░╚═╝░░░░░╚═╝╚═════╝░░╚════╝░ + :: git commit osint :: +""" + +_COLOR_ENABLED = False +RULE_WIDTH = 80 + + +def _setup_colors(force_off: bool) -> None: + global _COLOR_ENABLED + if force_off or os.environ.get("NO_COLOR"): + _COLOR_ENABLED = False + return + try: + _COLOR_ENABLED = sys.stdout.isatty() + except Exception: + _COLOR_ENABLED = False + + +def _c(code: str, text: str) -> str: + return f"{code}{text}{RESET}" if _COLOR_ENABLED else text + + +def _rule(width: int = RULE_WIDTH) -> str: + return _c(GREY, "─" * width) + + +def _tag(text: str, color: str = GREEN_DIM) -> str: + return _c(color, f"[{text}]") + + +def _email_with_tag(email: str) -> str: + """Bare email, with a trailing [noreply] tag if it's a service address.""" + out = _c(NEON, email) + if is_system_email(email): + out += " " + _tag("noreply", RED) + return out + + +def _email_brackets(email: str) -> str: + """<email> [noreply]? 
— tag stays outside the angle brackets.""" + out = _c(GREEN_DIM, "<") + _c(NEON, email) + _c(GREEN_DIM, ">") + if is_system_email(email): + out += " " + _tag("noreply", RED) + return out + + +def _section(title: str) -> list[str]: + return ["", _rule(), _c(GREEN_DIM, f"[ {title} ]"), _rule(), ""] + + # ---------- HTTP helpers ---------- def _http_get(url: str) -> bytes | None: @@ -132,10 +204,18 @@ def get_public_repos_count(nickname: str) -> int: def get_github_repos( nickname: str, repos_count: int, include_forks: bool = False, ) -> set[str]: + """Return URLs of *nickname*'s repos. Forks dropped unless include_forks. + + Logs a per-call summary (seen / forks-skipped / failed-pages) at INFO so + the caller can explain a "245 found → only 31 cloned" gap. + """ if repos_count <= 0: return set() last_page = (repos_count + GITHUB_PER_PAGE - 1) // GITHUB_PER_PAGE repos: set[str] = set() + seen = 0 + forks_skipped = 0 + failed_pages = 0 for page in range(1, last_page + 1): data = _http_get_json( GITHUB_REPOS_URL.format( @@ -143,10 +223,26 @@ def get_github_repos( ) ) if not data: + failed_pages += 1 + logger.warning( + "repos listing page %d/%d returned no data (rate limit? 
" + "try GITHUB_TOKEN env var)", page, last_page, + ) continue for repo in data: - if include_forks or not repo.get("fork"): - repos.add(repo["html_url"]) + seen += 1 + if repo.get("fork") and not include_forks: + forks_skipped += 1 + continue + repos.add(repo["html_url"]) + logger.info( + "listing: %d seen, %d forks %s, %d kept%s", + seen, + forks_skipped, + "kept" if include_forks else "skipped", + len(repos), + f", {failed_pages} page(s) failed" if failed_pages else "", + ) return repos @@ -211,13 +307,21 @@ def print_gpg_results(results, ignore_noreply: bool = True) -> bool: ] if not rows: return False - print("PGP key UIDs (uploaded by the user, public via /users/{u}/gpg_keys):") - print(DELIMITER) + for line in _section("pgp key uids"): + print(line) + print(" " + _c(GREEN_DIM, "source: /users/{u}/gpg_keys (user-uploaded)")) + print() rows.sort(key=lambda r: (not r["verified"], r["email"])) for r in rows: - flag = "verified" if r["verified"] else "unverified" - print(" {:40} [{}] key_id={} ({})".format( - r["email"], flag, r["key_id"] or "?", r["source"], + flag_color = LIME if r["verified"] else GREEN_DIM + flag = _tag("verified" if r["verified"] else "unverified", flag_color) + print(" {arrow} {email:40} {flag} {kid}={key} {src}".format( + arrow=_c(LIME, "▶"), + email=_email_with_tag(r["email"]), + flag=flag, + kid=_c(GREEN_DIM, "key_id"), + key=_c(NEON, r["key_id"] or "?"), + src=_tag(r["source"]), )) print() return True @@ -292,22 +396,33 @@ def print_search_results(results, ignore_noreply: bool = True) -> None: groups.setdefault(key, []).append(r) if not groups: - print("No public commits found via /search/commits.") + print(_c(RED, "[!] 
no public commits found via /search/commits")) return - print(f"Found {len(groups)} unique (email, name) identities:") - print(DELIMITER) + for line in _section("commit search"): + print(line) + print(" " + _c(GREEN_DIM, "identities found: ") + _c(NEON, str(len(groups)))) + print() ordered = sorted(groups.items(), key=lambda kv: -len(kv[1])) for (email, name), rows in ordered: repos = sorted({r["repo"] for r in rows if r["repo"]}) roles = sorted({r["role"] for r in rows}) - print("{} <{}> x{} [{}]".format( - name or "", email, len(rows), ", ".join(roles), + print(" {arrow} {name} {brackets} {hits} {roles}".format( + arrow=_c(LIME, "▶"), + name=_c(BOLD + NEON, name or "?"), + brackets=_email_brackets(email), + hits=_c(LIME, f"×{len(rows)}"), + roles=_tag(", ".join(roles)), )) - for repo in repos[:5]: - print(f" repo: {repo}") + for i, repo in enumerate(repos[:5]): + last = i == min(4, len(repos) - 1) and len(repos) <= 5 + branch = "└─" if last else "├─" + print(" " + _c(GREEN_DIM, branch) + " " + + _c(GREEN_DIM, "repo ") + _c(NEON, repo)) if len(repos) > 5: - print(f" ... +{len(repos) - 5} more repos") + print(" " + _c(GREEN_DIM, "└─ ") + + _c(GREEN_DIM, f"... +{len(repos) - 5} more repos")) + print() # ---------- Filesystem helpers ---------- @@ -361,6 +476,106 @@ def git_clone(url: str, dest_dir: str) -> str | None: return target +def _short_url(url: str, width: int = 50) -> str: + """Trim URL for progress display: keep owner/repo tail.""" + if len(url) <= width: + return url + tail = "/".join(url.rstrip("/").split("/")[-2:]) + return ("…" + tail)[-width:] + + +def clone_many( + urls: list[str], + dest_dir: str, + workers: int = CLONE_WORKERS, +) -> dict[str, str | None]: + """Clone *urls* concurrently. Returns {url: local_path or None}. + + Prints a live progress line to stderr (overwritten on TTY, line-per-tick + otherwise) so the user can see what's happening during long clone batches. 
+ """ + total = len(urls) + if total == 0: + return {} + + results: dict[str, str | None] = {} + state = {"done": 0, "ok": 0, "fail": 0, "current": ""} + lock = threading.Lock() + started = time.monotonic() + is_tty = False + try: + is_tty = sys.stderr.isatty() + except Exception: + pass + + last_done = {"value": -1} + + def render(final: bool = False) -> None: + elapsed = time.monotonic() - started + fail_chunk = _c(RED, f"fail={state['fail']}") if state["fail"] else \ + _c(GREEN_DIM, "fail=0") + line = ( + _c(GREEN_DIM, "[*] ") + + _c(LIME, "cloning ") + + _c(NEON, f"{state['done']}/{total}") + + " " + _c(GREEN_DIM, f"ok={state['ok']}") + + " " + fail_chunk + + " " + _c(GREEN_DIM, f"{elapsed:>4.0f}s") + ) + if state["current"] and not final: + line += " " + _c(GREEN_DIM, "· ") + _c(NEON, state["current"]) + if is_tty: + # \r + clear-to-end-of-line keeps the progress on a single line. + sys.stderr.write("\r\033[K" + line) + if final: + sys.stderr.write("\n") + sys.stderr.flush() + else: + # Non-TTY: avoid a flood of identical "0/N" lines while threads + # pick up their first job. Only emit when the done counter ticks + # forward (or on the final summary). + if final or state["done"] != last_done["value"]: + last_done["value"] = state["done"] + sys.stderr.write(line + "\n") + + def worker(url: str) -> None: + with lock: + state["current"] = _short_url(url) + render() + path = git_clone(url, dest_dir) + with lock: + state["done"] += 1 + if path: + state["ok"] += 1 + else: + state["fail"] += 1 + # Don't keep stale "current" once this thread is done; the next + # worker that picks up a job will overwrite it. 
+ state["current"] = "" + render() + + render() # initial 0/total + try: + with ThreadPoolExecutor(max_workers=max(1, workers)) as pool: + futures = {pool.submit(worker, url): url for url in urls} + for fut in futures: + try: + fut.result() + except Exception as exc: # pragma: no cover - defensive + logger.debug("clone worker for %s raised: %s", futures[fut], exc) + finally: + with lock: + state["current"] = "" + render(final=True) + + # Map each URL to its deterministic target path so callers get a stable + # {url: path|None} contract regardless of completion order. + for url in urls: + target = os.path.join(dest_dir, _clone_target_dir(url)) + results[url] = target if os.path.isdir(os.path.join(target, ".git")) else None + return results + + # ---------- Data classes ---------- def _split_name_email(raw: str) -> tuple[str, str]: @@ -422,21 +637,30 @@ class Person: last_commit_hash: str | None = None def __str__(self) -> str: - lines = [ - f"Name:\t\t\t{self.name}", - f"Email:\t\t\t{self.email}", - ] + # Headline: ▶ name <email> [noreply]? 
+ header = " {arrow} {name} {brackets}".format( + arrow=_c(LIME, "▶"), + name=_c(BOLD + NEON, self.name or "?"), + brackets=_email_brackets(self.email), + ) + rows: list[tuple[str, str]] = [] if self.as_author: - lines.append(f"Appears as author:\t{self.as_author} times") + rows.append(("author", _c(LIME, f"×{self.as_author}"))) if self.as_committer: - lines.append(f"Appears as committer:\t{self.as_committer} times") + rows.append(("committer", _c(LIME, f"×{self.as_committer}"))) if self.github_login: + url = f"https://github.com/{self.github_login}" + rows.append(("github", _c(LIME, url) + " " + _tag("verified", LIME))) + for alias in self.also_known.values(): + alias_text = f"{alias.name} {_email_brackets(alias.email)}" + rows.append(("alias", alias_text)) + + lines = [header] + for i, (label, value) in enumerate(rows): + branch = "└─" if i == len(rows) - 1 else "├─" lines.append( - f"Verified account:\n\t\t\thttps://github.com/{self.github_login}" - ) - if self.also_known: - lines.append( - "Also appears with:" + "".join(f"\n\t\t\t{k}" for k in self.also_known) + " " + _c(GREEN_DIM, branch) + " " + + _c(GREEN_DIM, f"{label:<10}") + " " + value ) return "\n".join(lines) @@ -452,8 +676,10 @@ def __init__(self, repos_dir: str = DEFAULT_REPOS_DIR) -> None: self.repos: list[str] = [] self.same_emails_persons: dict[str, tuple[list[str], set[str]]] = {} - def append(self, source: str) -> None: - if "://" in source: + def append(self, source: str, *, cloned_path: str | None = None) -> None: + if cloned_path is not None: + repo_dir = cloned_path + elif "://" in source: repo_dir = git_clone(source, self.repos_dir) if repo_dir is None: return @@ -537,34 +763,67 @@ def _analyze(self, new_commits: Iterable[Commit], repo_url: str) -> None: } def __str__(self) -> str: - parts: list[str] = [ - f'Analyze of the git repo(s) "{", ".join(self.repos)}"', - "", - "Verbose persons info:", - ] - for _, person in self.sorted_persons: - parts.append(DELIMITER) - parts.append(str(person)) 
+ parts: list[str] = [] + + # 1. Stats — top-level summary of what was scanned and what was found. + parts.extend(_section("stats")) + for label, value in ( + ("repos", len(self.repos)), + ("commits", len(self.commits)), + ("persons", len(self.persons)), + ): + dots = "." * (16 - len(label)) + parts.append(" " + _c(GREEN_DIM, label) + " " + + _c(GREY, dots) + " " + _c(NEON, str(value))) + parts.append("") + parts.append(" " + _c(GREEN_DIM, "targets")) + for i, repo in enumerate(self.repos): + branch = "└─" if i == len(self.repos) - 1 else "├─" + parts.append(" " + _c(GREEN_DIM, branch) + " " + _c(NEON, repo)) + # 2. Correlation — shared names with multiple emails + same-person clusters. matching: list[str] = [] for name, emails in self.name_to_emails.items(): - if len(emails) > 1: - emails_block = "\n\t\t\t".join(sorted(emails)) - matching.append( - f"\n{name} is the owner of emails:\n\t\t\t{emails_block}" + if len(emails) <= 1: + continue + sorted_emails = sorted(emails) + block = [ + " {bang} {name} {arrow} {n} emails".format( + bang=_c(RED, "[!]"), + name=_c(BOLD + NEON, name), + arrow=_c(GREEN_DIM, "→"), + n=_c(LIME, str(len(sorted_emails))), ) - if matching: - parts.append("") - parts.append("Matching info:") - parts.append(DELIMITER + "".join(matching)) + ] + for i, e in enumerate(sorted_emails): + branch = "└─" if i == len(sorted_emails) - 1 else "├─" + block.append(" " + _c(GREEN_DIM, branch) + " " + + _email_with_tag(e)) + matching.append("\n".join(block)) + + same_person: list[str] = [] + for names, _emails in self.same_emails_persons.values(): + joined = _c(BOLD + NEON, (" " + _c(GREEN_DIM, "≡") + " ").join(names)) + same_person.append( + " " + _c(RED, "[!]") + " " + _c(GREEN_DIM, "same person:") + " " + + joined + ) - for names, _ in self.same_emails_persons.values(): - parts.append(f"\n{' and '.join(names)} are the same person") + if matching or same_person: + parts.extend(_section("correlation")) + if matching: + parts.append("\n\n".join(matching)) + 
parts.append("") + if same_person: + parts.extend(same_person) + parts.append("") + + # 3. Identities — per-person breakdown. + parts.extend(_section("identities")) + for _, person in self.sorted_persons: + parts.append(str(person)) + parts.append("") - parts.append("") - parts.append("Statistics info:") - parts.append(DELIMITER) - parts.append(f"Total persons: {len(self.persons)}") return "\n".join(parts) @@ -601,7 +860,20 @@ def _parse_args() -> argparse.Namespace: "--repos-dir", default=DEFAULT_REPOS_DIR, help=f"directory to clone remote repositories into (default: {DEFAULT_REPOS_DIR})", ) + parser.add_argument( + "--clone-workers", type=int, default=CLONE_WORKERS, + help=f"parallel git-clone workers (default: {CLONE_WORKERS})", + ) + parser.add_argument( + "--include-forks", action="store_true", + help="include forked repositories (default: skipped — forks add upstream " + "history that is not the target user's work)", + ) parser.add_argument("--debug", action="store_true", help="print debug information") + parser.add_argument( + "--no-color", action="store_true", + help="disable ANSI colors (also honored via NO_COLOR env var or non-TTY stdout)", + ) return parser.parse_args() @@ -616,18 +888,24 @@ def _collect_sources(args: argparse.Namespace) -> list[str]: if args.nickname: count = get_public_repos_count(args.nickname) if count: - print(f"found {count} repos") - sources.extend(get_github_repos(args.nickname, repos_count=count)) + logger.info("found %d public repos for %s", count, args.nickname) + sources.extend(get_github_repos( + args.nickname, repos_count=count, + include_forks=args.include_forks, + )) return sources def main() -> None: args = _parse_args() + _setup_colors(force_off=args.no_color) logging.basicConfig( level=logging.DEBUG if args.debug else logging.INFO, - format="-" * 40 + "\n%(levelname)s: %(message)s", + format=_c(GREEN_DIM, "[*] ") + _c(LIME, "%(levelname)s") + " %(message)s", ) + print(_c(NEON, BANNER), flush=True) + if args.search: 
token = os.environ.get("GITHUB_TOKEN") ignore = not args.no_ignore_noreply @@ -645,10 +923,33 @@ def main() -> None: return analyst = GitAnalyst(repos_dir=args.repos_dir) - for source in sources: - analyst.append(source) - logger.info("Resolving GitHub usernames, please wait...") + url_sources = [s for s in sources if "://" in s] + local_sources = [s for s in sources if "://" not in s] + + cloned: dict[str, str | None] = {} + if url_sources: + logger.info( + "cloning %d repo(s) into %s with %d workers", + len(url_sources), args.repos_dir, args.clone_workers, + ) + cloned = clone_many(url_sources, args.repos_dir, workers=args.clone_workers) + failed = [u for u, p in cloned.items() if p is None] + if failed: + logger.warning("%d clone(s) failed (see --debug for reasons)", len(failed)) + + to_analyze = len(local_sources) + sum(1 for p in cloned.values() if p) + if to_analyze: + logger.info("analyzing %d repo(s)...", to_analyze) + for src in local_sources: + analyst.append(src) + for url, path in cloned.items(): + if path: + analyst.append(url, cloned_path=path) + + if analyst.persons: + logger.info("resolving GitHub usernames for %d identities...", + len(analyst.persons)) analyst.resolve_persons() if analyst.repos: diff --git a/test_gitcolombo.py b/test_gitcolombo.py index 3ddc696..000572e 100644 --- a/test_gitcolombo.py +++ b/test_gitcolombo.py @@ -311,7 +311,11 @@ def test_author_committer_link(self): class TestCollectSources(unittest.TestCase): def _args(self, **kw): - defaults = dict(url=None, dir=None, recursive=False, nickname=None, github=False, debug=False) + defaults = dict( + url=None, dir=None, recursive=False, nickname=None, + github=False, debug=False, include_forks=False, + clone_workers=gc.CLONE_WORKERS, no_color=False, + ) defaults.update(kw) return types.SimpleNamespace(**defaults) @@ -450,11 +454,396 @@ def test_str_render_contains_summary(self): analyst = gc.GitAnalyst() analyst.append(self.tmp) rendered = str(analyst) - self.assertIn("Verbose 
persons info", rendered) - self.assertIn("Total persons: 2", rendered) + self.assertIn("[ identities ]", rendered) + self.assertIn("[ stats ]", rendered) + self.assertRegex(rendered, r"persons\s+\.+\s+2") self.assertIn("alice@example.com", rendered) self.assertIn("bob@example.com", rendered) +# ---------- Console styling helpers ---------- + +class TestStylingHelpers(unittest.TestCase): + """Covers _setup_colors / _c / _email_with_tag / _email_brackets / _section. + + Color state is module-level; save and restore it around each test so + one test cannot leak ANSI codes into another's assertions. + """ + + def setUp(self): + self._prev = gc._COLOR_ENABLED + gc._COLOR_ENABLED = False + + def tearDown(self): + gc._COLOR_ENABLED = self._prev + + def test_c_returns_plain_when_colors_disabled(self): + # Defensive: even when called with an ANSI code, plain text is emitted + # when colors are off — otherwise piping to a file dumps escape codes. + self.assertEqual(gc._c(gc.NEON, "hello"), "hello") + self.assertNotIn("\033", gc._c(gc.NEON, "hello")) + + def test_c_wraps_with_reset_when_colors_enabled(self): + gc._COLOR_ENABLED = True + out = gc._c(gc.NEON, "hello") + self.assertTrue(out.startswith(gc.NEON)) + self.assertTrue(out.endswith(gc.RESET)) + self.assertIn("hello", out) + + def test_setup_colors_force_off_overrides_tty(self): + gc._setup_colors(force_off=True) + self.assertFalse(gc._COLOR_ENABLED) + + def test_setup_colors_respects_no_color_env(self): + with mock.patch.dict(os.environ, {"NO_COLOR": "1"}): + gc._setup_colors(force_off=False) + self.assertFalse(gc._COLOR_ENABLED) + + def test_setup_colors_off_for_non_tty(self): + # Plain stdout (e.g. captured by unittest) is not a TTY → no colors. 
+ with mock.patch.dict(os.environ, {}, clear=True): + gc._setup_colors(force_off=False) + self.assertFalse(gc._COLOR_ENABLED) + + def test_email_with_tag_marks_noreply(self): + out = gc._email_with_tag("noreply@github.com") + self.assertIn("noreply@github.com", out) + self.assertIn("[noreply]", out) + + def test_email_with_tag_skips_normal_email(self): + self.assertEqual(gc._email_with_tag("alice@example.com"), + "alice@example.com") + + def test_email_brackets_keeps_tag_outside_angle_brackets(self): + # Regression: an earlier version stuck [noreply] INSIDE the <...> + # which produced things like ``<noreply@github.com [noreply]>``. + out = gc._email_brackets("noreply@github.com") + self.assertTrue(out.startswith("<")) + bracket_close = out.index(">") + self.assertLess(bracket_close, out.index("[noreply]")) + + def test_email_brackets_plain_email_has_no_tag(self): + self.assertEqual(gc._email_brackets("alice@example.com"), + "<alice@example.com>") + + def test_section_returns_header_with_two_rules(self): + block = gc._section("identities") + # _section yields: blank, rule, "[ identities ]", rule, blank + self.assertEqual(len(block), 5) + self.assertIn("[ identities ]", block[2]) + # The two rule lines on either side of the title are identical. 
+ self.assertEqual(block[1], block[3]) + + +# ---------- Person.__str__ rendering ---------- + +class TestPersonRender(unittest.TestCase): + def setUp(self): + self._prev = gc._COLOR_ENABLED + gc._COLOR_ENABLED = False + + def tearDown(self): + gc._COLOR_ENABLED = self._prev + + def test_header_uses_arrow_and_brackets(self): + p = gc.Person(key="Alice a@x.io", name="Alice", email="a@x.io", + as_author=3) + out = str(p) + first = out.splitlines()[0] + self.assertIn("▶", first) + self.assertIn("Alice", first) + self.assertIn("<a@x.io>", first) + + def test_counts_use_times_n_notation(self): + p = gc.Person(key="A a@x", name="A", email="a@x", + as_author=11, as_committer=7) + out = str(p) + self.assertIn("×11", out) + self.assertIn("×7", out) + + def test_last_row_uses_l_branch(self): + # Single row → final branch char `└─`, not `├─`. + p = gc.Person(key="A a@x", name="A", email="a@x", as_author=1) + rows = [line for line in str(p).splitlines() if "author" in line] + self.assertEqual(len(rows), 1) + self.assertIn("└─", rows[0]) + + def test_two_rows_use_tee_then_l(self): + p = gc.Person(key="A a@x", name="A", email="a@x", + as_author=1, as_committer=1) + body = str(p).splitlines()[1:] + self.assertIn("├─", body[0]) + self.assertIn("└─", body[-1]) + + def test_noreply_tag_on_alias_email_outside_brackets(self): + # Alias whose email is a noreply service address should render + # `<noreply@github.com> [noreply]` — tag after the closing `>`. + alias = gc.Person(key="b", name="GitHub", email="noreply@github.com") + p = gc.Person(key="a", name="Alice", email="a@x.io", as_author=1) + p.also_known[alias.key] = alias + out = str(p) + # The alias row must contain both `>` and `[noreply]` with the tag + # appearing after the closing angle bracket. 
+ alias_line = next(l for l in out.splitlines() if "GitHub" in l) + self.assertLess(alias_line.index(">"), alias_line.index("[noreply]")) + + def test_verified_tag_appears_for_resolved_login(self): + p = gc.Person(key="A a@x", name="A", email="a@x", as_author=1, + github_login="alice42") + out = str(p) + self.assertIn("https://github.com/alice42", out) + self.assertIn("[verified]", out) + + +# ---------- GitAnalyst.__str__ rendering ---------- + +class TestAnalystRender(unittest.TestCase): + def setUp(self): + self._prev = gc._COLOR_ENABLED + gc._COLOR_ENABLED = False + + def tearDown(self): + gc._COLOR_ENABLED = self._prev + + def _commit(self, h, a_name, a_email, c_name=None, c_email=None): + c_name = c_name or a_name + c_email = c_email or a_email + line = f'{h};"{a_name} {a_email}";"{c_name} {c_email}"' + return gc.Commit.parse(line) + + def _filled(self): + analyst = gc.GitAnalyst() + analyst.repos = ["/tmp/r1", "/tmp/r2"] + analyst.commits = [self._commit("h1", "Alice", "a@x.io"), + self._commit("h2", "Bob", "b@x.io")] + analyst._analyze(analyst.commits, "/tmp/r1") + return analyst + + def test_section_ordering_stats_first(self): + # User-facing requirement: stats above correlation above identities. + a = self._filled() + # Force a correlation block to exist via shared email set. + a._analyze( + [self._commit("h3", "Alice", "shared@x.io"), + self._commit("h4", "AliceB", "shared@x.io")], + "/tmp/r1", + ) + out = str(a) + i_stats = out.index("[ stats ]") + i_corr = out.index("[ correlation ]") + i_idents = out.index("[ identities ]") + self.assertLess(i_stats, i_corr) + self.assertLess(i_corr, i_idents) + + def test_stats_uses_dot_leaders(self): + out = str(self._filled()) + # `persons ........ N` with dots between label and value. 
+ self.assertRegex(out, r"persons\s+\.+\s+\d+") + self.assertRegex(out, r"commits\s+\.+\s+\d+") + self.assertRegex(out, r"repos\s+\.+\s+\d+") + + def test_targets_listed_as_tree(self): + out = str(self._filled()) + # The last target uses `└─`, earlier ones `├─`. + target_lines = [l for l in out.splitlines() if "/tmp/r" in l] + self.assertEqual(len(target_lines), 2) + self.assertIn("├─", target_lines[0]) + self.assertIn("└─", target_lines[1]) + + def test_correlation_absent_when_no_overlap(self): + # Pure disjoint identities → no correlation section. + out = str(self._filled()) + self.assertNotIn("[ correlation ]", out) + + def test_correlation_present_for_shared_name_two_emails(self): + analyst = gc.GitAnalyst() + analyst.repos = ["/tmp/r"] + commits = [ + self._commit("h1", "Alice", "a1@x.io"), + self._commit("h2", "Alice", "a2@x.io"), + ] + analyst.commits = commits + analyst._analyze(commits, "/tmp/r") + out = str(analyst) + self.assertIn("[ correlation ]", out) + # The N→emails summary uses `→` and the count. + self.assertRegex(out, r"Alice\s*→\s*2 emails") + + def test_correlation_lists_same_person_clusters(self): + analyst = gc.GitAnalyst() + analyst.repos = ["/tmp/r"] + commits = [ + self._commit("h1", "Alice", "shared@x.io"), + self._commit("h2", "AliceB", "shared@x.io"), + ] + analyst.commits = commits + analyst._analyze(commits, "/tmp/r") + out = str(analyst) + self.assertIn("same person", out) + self.assertIn("Alice", out) + self.assertIn("AliceB", out) + # The cluster line uses ≡ to join names. 
+ self.assertIn("≡", out) + + +# ---------- clone_many parallel cloning ---------- + +class TestCloneMany(unittest.TestCase): + def test_empty_input_returns_empty(self): + with mock.patch("gitcolombo.subprocess.run") as run: + self.assertEqual(gc.clone_many([], "/tmp/x"), {}) + run.assert_not_called() + + def _stub_clone_factory(self, dest_dir, *, fail_urls=()): + """subprocess.run replacement that creates a fake repo dir on success.""" + def fake_run(argv, **kw): + url = argv[2] + target = argv[3] + if url in fail_urls: + return types.SimpleNamespace(returncode=128, stdout="", + stderr="boom") + os.makedirs(os.path.join(target, ".git"), exist_ok=True) + return types.SimpleNamespace(returncode=0, stdout="", stderr="") + return fake_run + + def test_returns_url_to_path_mapping_on_success(self): + with tempfile.TemporaryDirectory() as tmp: + urls = [ + "https://github.com/u/a", + "https://github.com/u/b", + "https://github.com/u/c", + ] + with mock.patch("gitcolombo.subprocess.run", + side_effect=self._stub_clone_factory(tmp)): + results = gc.clone_many(urls, tmp, workers=4) + self.assertEqual(set(results.keys()), set(urls)) + for url, path in results.items(): + self.assertIsNotNone(path, f"{url} should have a path") + self.assertTrue(os.path.isdir(os.path.join(path, ".git"))) + + def test_failed_clones_map_to_none(self): + with tempfile.TemporaryDirectory() as tmp: + urls = ["https://github.com/u/ok", "https://github.com/u/bad"] + fake = self._stub_clone_factory( + tmp, fail_urls={"https://github.com/u/bad"}, + ) + with mock.patch("gitcolombo.subprocess.run", side_effect=fake): + results = gc.clone_many(urls, tmp, workers=2) + self.assertIsNotNone(results["https://github.com/u/ok"]) + self.assertIsNone(results["https://github.com/u/bad"]) + + def test_runs_one_subprocess_per_url(self): + # Sanity: parallelization must not change the total work performed. 
+ with tempfile.TemporaryDirectory() as tmp: + urls = [f"https://github.com/u/r{i}" for i in range(5)] + with mock.patch("gitcolombo.subprocess.run", + side_effect=self._stub_clone_factory(tmp)) as run: + gc.clone_many(urls, tmp, workers=3) + # 5 URLs => 5 `git clone` invocations, regardless of worker count. + self.assertEqual(run.call_count, 5) + + def test_progress_written_to_stderr_not_stdout(self): + with tempfile.TemporaryDirectory() as tmp: + urls = ["https://github.com/u/a"] + import io + err = io.StringIO() + with mock.patch("gitcolombo.subprocess.run", + side_effect=self._stub_clone_factory(tmp)), \ + mock.patch("sys.stderr", err): + gc.clone_many(urls, tmp, workers=1) + self.assertIn("cloning", err.getvalue()) + self.assertIn("1/1", err.getvalue()) + + +# ---------- GitAnalyst.append with pre-cloned path ---------- + +class TestAppendClonedPath(unittest.TestCase): + def test_cloned_path_bypasses_git_clone(self): + # Regression: clone_many pre-clones URLs in parallel and feeds the + # local path back into append(); we must not re-clone. + with tempfile.TemporaryDirectory() as tmp: + analyst = gc.GitAnalyst(repos_dir=tmp) + with mock.patch("gitcolombo.git_clone") as clone, \ + mock.patch("gitcolombo.git_log", return_value=""): + analyst.append("https://github.com/u/r", cloned_path=tmp) + clone.assert_not_called() + self.assertEqual(analyst.repos, [tmp]) + + def test_source_url_preserved_for_login_resolution(self): + # When append is called with cloned_path, the original URL is still + # passed to _analyze as repo_url so resolve_github_username can use it. 
+ with tempfile.TemporaryDirectory() as tmp: + analyst = gc.GitAnalyst(repos_dir=tmp) + captured = {} + + def fake_analyze(commits, repo_url): + captured["repo_url"] = repo_url + + with mock.patch.object(analyst, "_analyze", side_effect=fake_analyze), \ + mock.patch("gitcolombo.git_log", return_value=""): + analyst.append("https://github.com/u/r", cloned_path=tmp) + self.assertEqual(captured["repo_url"], "https://github.com/u/r") + + +# ---------- get_github_repos transparency ---------- + +class TestGetGithubReposLogging(unittest.TestCase): + """The 245→31 confusion was caused by silent fork-skipping and silent + page-fetch failures. The function now logs a summary so the user can + explain the gap without --debug. + """ + + def test_logs_listing_summary_with_fork_count(self): + page = [ + {"html_url": "https://github.com/u/keep1", "fork": False}, + {"html_url": "https://github.com/u/keep2", "fork": False}, + {"html_url": "https://github.com/u/fork1", "fork": True}, + {"html_url": "https://github.com/u/fork2", "fork": True}, + {"html_url": "https://github.com/u/fork3", "fork": True}, + ] + with mock.patch("gitcolombo._http_get_json", return_value=page), \ + self.assertLogs("gitcolombo", level="INFO") as cm: + kept = gc.get_github_repos("u", repos_count=5) + joined = "\n".join(cm.output) + self.assertIn("5 seen", joined) + self.assertIn("3 forks skipped", joined) + self.assertEqual(len(kept), 2) + + def test_logs_kept_wording_when_include_forks(self): + page = [ + {"html_url": "https://github.com/u/r1", "fork": False}, + {"html_url": "https://github.com/u/r2", "fork": True}, + ] + with mock.patch("gitcolombo._http_get_json", return_value=page), \ + self.assertLogs("gitcolombo", level="INFO") as cm: + gc.get_github_repos("u", repos_count=2, include_forks=True) + joined = "\n".join(cm.output) + self.assertIn("forks kept", joined) + + def test_warns_on_failed_page(self): + # _http_get_json returns None when the API call fails (rate limit, + # network error, etc.). 
The user must see this, not have repos + # silently disappear. + with mock.patch("gitcolombo._http_get_json", return_value=None), \ + self.assertLogs("gitcolombo", level="WARNING") as cm: + gc.get_github_repos("u", repos_count=100) + joined = "\n".join(cm.output) + self.assertIn("returned no data", joined) + self.assertIn("rate limit", joined) + + def test_collect_sources_passes_include_forks_flag(self): + args = types.SimpleNamespace( + url=None, dir=None, recursive=False, nickname="u", + github=False, debug=False, include_forks=True, + clone_workers=gc.CLONE_WORKERS, no_color=False, + ) + with mock.patch("gitcolombo.get_public_repos_count", return_value=1), \ + mock.patch("gitcolombo.get_github_repos") as gh: + gh.return_value = set() + gc._collect_sources(args) + gh.assert_called_once() + self.assertTrue(gh.call_args.kwargs["include_forks"]) + + if __name__ == "__main__": unittest.main()