#!/usr/bin/env python3
"""Convert a process START/END log into a ptree-like Excel report.

Each recognised log line has the form::

    START PID=<int> PPID=<int> depth=<int> range=[<int>,<int>] time=<float>
    END   PID=<int> PPID=<int> depth=<int> range=[<int>,<int>] time=<float>

Only the event type, PID, PPID and time are used; lines that do not match
are ignored.
"""
import argparse
import re
import sys
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set, Tuple

LINE_RE = re.compile(
    r"^(START|END)\s+PID=(\d+)\s+PPID=(\d+)\s+depth=(\d+)\s+range=\[(\d+),(\d+)\]\s+time=([0-9.]+)$"
)


@dataclass
class ProcessLog:
    """Start/finish record of one process reconstructed from the log."""

    pid: int
    ppid: int
    start: float
    # Stays None until a matching END line is seen.
    # Optional[...] rather than ``float | None``: dataclass annotations are
    # evaluated when the class is created, and ``|`` on types needs 3.10+.
    finish: Optional[float] = None
    # Zero-based index of this process' START event among all START events.
    first_seen_order: int = 0


def parse_args() -> argparse.Namespace:
    """Parse command-line options.

    Returns:
        Namespace with ``input`` (log path, ``"-"`` meaning stdin) and
        ``output`` (Excel file path, default ``process_log.xlsx``).
    """
    parser = argparse.ArgumentParser(
        description="Convert process START/END log to a ptree-like Excel report."
    )
    parser.add_argument(
        "-i",
        "--input",
        default="-",
        help="Input log file path (default: stdin)",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="process_log.xlsx",
        help="Output Excel file path (default: process_log.xlsx)",
    )
    return parser.parse_args()


def read_lines(input_path: str) -> Iterable[str]:
    """Yield the lines of the log with the trailing newline removed.

    Args:
        input_path: Path of a UTF-8 text file, or ``"-"`` to read stdin.
    """
    if input_path == "-":
        for line in sys.stdin:
            yield line.rstrip("\n")
        return

    with open(input_path, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def parse_processes(lines: Iterable[str]) -> Dict[int, ProcessLog]:
    """Build one ``ProcessLog`` per PID from START/END log lines.

    A START line creates (or replaces) the record for its PID. An END line
    sets ``finish`` on an already-known PID and is ignored otherwise. Lines
    that do not match ``LINE_RE`` or whose time is not a valid float are
    skipped.

    Args:
        lines: Raw log lines; surrounding whitespace is stripped.

    Returns:
        Mapping of PID to its record, including processes that never ended.
    """
    processes: Dict[int, ProcessLog] = {}
    order = 0

    for raw in lines:
        m = LINE_RE.match(raw.strip())
        if not m:
            continue

        event, pid_s, ppid_s, _depth_s, _l_s, _r_s, ts_s = m.groups()
        try:
            # The pattern [0-9.]+ also admits "1.2.3" or "."; treat those
            # like any other malformed line instead of aborting the run.
            ts = float(ts_s)
        except ValueError:
            continue
        pid = int(pid_s)
        ppid = int(ppid_s)

        if event == "START":
            processes[pid] = ProcessLog(
                pid=pid, ppid=ppid, start=ts, finish=None, first_seen_order=order
            )
            order += 1
        elif pid in processes:
            processes[pid].finish = ts

    return processes


def tree_rows(processes: Dict[int, ProcessLog]) -> List[Tuple[int, ProcessLog]]:
    """Flatten finished processes into pre-order ``(depth, process)`` rows.

    Only processes with a ``finish`` time take part. A process whose parent
    is not among them is a root (depth 0). Roots and siblings are ordered by
    start time. Processes that are unreachable from any root (their parent
    links form a cycle) are emitted afterwards, starting a new depth-0
    subtree at the earliest-started one still unvisited.

    Args:
        processes: Mapping of PID to record, as built by ``parse_processes``.

    Returns:
        Rows in display order; every finished process appears exactly once.
    """
    complete = {pid: p for pid, p in processes.items() if p.finish is not None}

    children: Dict[int, List[int]] = {pid: [] for pid in complete}
    roots: List[int] = []
    for pid, node in complete.items():
        if node.ppid in complete:
            children[node.ppid].append(pid)
        else:
            roots.append(pid)

    def start_of(pid: int) -> float:
        return complete[pid].start

    for kids in children.values():
        kids.sort(key=start_of)
    roots.sort(key=start_of)

    ordered: List[Tuple[int, ProcessLog]] = []
    visited: Set[int] = set()

    def walk(first_pid: int) -> None:
        # Explicit stack instead of recursion: a long fork chain must not hit
        # the interpreter recursion limit. The visited check guarantees
        # termination when parent links form a cycle.
        stack: List[Tuple[int, int]] = [(first_pid, 0)]
        while stack:
            pid, depth = stack.pop()
            if pid in visited:
                continue
            visited.add(pid)
            ordered.append((depth, complete[pid]))
            # Push in reverse so the earliest-started child is popped first.
            for child in reversed(children[pid]):
                stack.append((child, depth + 1))

    for root_pid in roots:
        walk(root_pid)

    # Anything still unvisited sits on (or below) a parent cycle.
    remaining = sorted((pid for pid in complete if pid not in visited), key=start_of)
    for pid in remaining:
        walk(pid)

    return ordered


def export_excel(rows: List[Tuple[int, ProcessLog]], output_path: str) -> None:
    """Write the rows to an Excel workbook with one sheet, ``process_logs``.

    Columns are PPID, PID (prefixed with one space per depth level), START,
    FINISH and DURATION (finish - start). Rows without a finish time are
    skipped. The header row is bold and frozen.

    Args:
        rows: ``(depth, process)`` pairs in display order.
        output_path: Destination ``.xlsx`` path.
    """
    # Imported here so that parsing and tree building remain usable in
    # environments where openpyxl is not installed.
    from openpyxl import Workbook
    from openpyxl.styles import Font

    wb = Workbook()
    ws = wb.active
    ws.title = "process_logs"

    ws.append(["PPID", "PID", "START", "FINISH", "DURATION"])
    for cell in ws[1]:
        cell.font = Font(bold=True)

    for depth, node in rows:
        if node.finish is None:
            continue
        duration = node.finish - node.start
        pid_display = f"{' ' * depth}{node.pid}"
        ws.append([node.ppid, pid_display, node.start, node.finish, duration])

    for column, width in {"A": 12, "B": 20, "C": 18, "D": 18, "E": 14}.items():
        ws.column_dimensions[column].width = width

    ws.freeze_panes = "A2"
    wb.save(output_path)


def main() -> int:
    """Run the conversion end to end and return the process exit code."""
    args = parse_args()
    processes = parse_processes(read_lines(args.input))
    rows = tree_rows(processes)
    export_excel(rows, args.output)
    print(f"Saved to {args.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())