#!/usr/bin/env python3
"""Convert a process START/END log into a ptree-like Excel report.

Each recognised log line has the shape::

    START PID=<n> PPID=<n> depth=<n> range=[<n>,<n>] time=<seconds>
    END   PID=<n> PPID=<n> depth=<n> range=[<n>,<n>] time=<seconds>

Processes that have both a START and an END record are written to a
worksheet in parent/child (pre-order) order together with a Gantt-style
stacked bar chart.
"""

import argparse
import re
import sys
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from typing import Dict, Iterable, List, Optional, Set, Tuple

# NOTE: openpyxl is imported inside export_excel() so that ``--help`` and the
# pure parsing/tree helpers keep working when the package is not installed.

LINE_RE = re.compile(
    r"^(START|END)\s+PID=(\d+)\s+PPID=(\d+)\s+depth=(\d+)\s+range=\[(\d+),(\d+)\]\s+time=([0-9.]+)$"
)

# Resolution used for every derived time value (microseconds).
_MICRO = Decimal("0.000001")
# Excel number format that renders a numeric cell with six decimals.
_SIX_DECIMALS = "0.000000"


@dataclass
class ProcessLog:
    """START/END information collected for a single PID."""

    pid: int
    ppid: int
    start: Decimal
    start_text: str  # timestamp exactly as it appeared in the START line
    finish: Optional[Decimal] = None  # None until an END line is seen
    finish_text: Optional[str] = None  # timestamp exactly as in the END line
    first_seen_order: int = 0  # 0-based index among START records


def parse_args() -> argparse.Namespace:
    """Parse the command line (``-i/--input`` and ``-o/--output``)."""
    parser = argparse.ArgumentParser(
        description="Convert process START/END log to a ptree-like Excel report."
    )
    parser.add_argument(
        "-i",
        "--input",
        default="-",
        help="Input log file path (default: stdin)",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="process_log.xlsx",
        help="Output Excel file path (default: process_log.xlsx)",
    )
    return parser.parse_args()


def read_lines(input_path: str) -> Iterable[str]:
    """Yield the lines of *input_path* without their trailing newline.

    Args:
        input_path: Path of a UTF-8 text file, or ``"-"`` to read stdin.
    """
    if input_path == "-":
        for line in sys.stdin:
            yield line.rstrip("\n")
        return

    with open(input_path, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def parse_processes(lines: Iterable[str]) -> Dict[int, ProcessLog]:
    """Build a ``pid -> ProcessLog`` mapping from raw log lines.

    Lines that do not match ``LINE_RE`` are ignored, and so are lines whose
    timestamp passes the regex but is not a valid decimal number (for
    example ``1.2.3``).  A START record for a PID that was already seen
    replaces the earlier record; an END record for an unknown PID is
    ignored.

    Args:
        lines: Log lines; surrounding whitespace is stripped before matching.

    Returns:
        Mapping keyed by PID, in order of first START occurrence.
    """
    processes: Dict[int, ProcessLog] = {}
    order = 0

    for raw in lines:
        m = LINE_RE.match(raw.strip())
        if not m:
            continue

        event, pid_s, ppid_s, _depth_s, _l_s, _r_s, ts_s = m.groups()
        try:
            ts = Decimal(ts_s)
        except InvalidOperation:
            # "[0-9.]+" also accepts text such as "1.2.3" or "."; treat it
            # like any other unparseable line instead of aborting the run.
            continue

        pid = int(pid_s)
        if event == "START":
            processes[pid] = ProcessLog(
                pid=pid,
                ppid=int(ppid_s),
                start=ts,
                start_text=ts_s,
                finish=None,
                finish_text=None,
                first_seen_order=order,
            )
            order += 1
        elif pid in processes:
            processes[pid].finish = ts
            processes[pid].finish_text = ts_s

    return processes


def tree_rows(processes: Dict[int, ProcessLog]) -> List[Tuple[int, ProcessLog]]:
    """Return finished processes in pre-order as ``(depth, process)`` pairs.

    Only processes with an END record take part.  A process whose parent is
    not among them is a root (depth 0).  Siblings and roots are ordered by
    start time.  Processes that cannot be reached from any root (a parent
    cycle caused by PID reuse, or ``PID == PPID``) are emitted afterwards,
    each unvisited one starting a new depth-0 subtree.  Every process
    appears exactly once.

    The traversal is iterative, so neither cycles nor very deep process
    chains can exhaust the interpreter's recursion limit.
    """
    complete = {pid: p for pid, p in processes.items() if p.finish is not None}

    children: Dict[int, List[int]] = {pid: [] for pid in complete}
    for pid, node in complete.items():
        if node.ppid in complete:
            children[node.ppid].append(pid)

    def by_start(pid: int) -> Decimal:
        return complete[pid].start

    for kids in children.values():
        kids.sort(key=by_start)

    ordered: List[Tuple[int, ProcessLog]] = []
    visited: Set[int] = set()

    def walk(top_pid: int) -> None:
        """Append the not-yet-visited subtree under *top_pid* in pre-order."""
        stack: List[Tuple[int, int]] = [(top_pid, 0)]
        while stack:
            pid, depth = stack.pop()
            if pid in visited:
                continue
            visited.add(pid)
            ordered.append((depth, complete[pid]))
            # Push in reverse so the earliest-started child is popped first.
            for child in reversed(children[pid]):
                if child not in visited:
                    stack.append((child, depth + 1))

    roots = sorted(
        (pid for pid, node in complete.items() if node.ppid not in complete),
        key=by_start,
    )
    for root_pid in roots:
        walk(root_pid)

    # Whatever is left belongs to a parent cycle and has no natural root.
    remaining = sorted((pid for pid in complete if pid not in visited), key=by_start)
    for pid in remaining:
        walk(pid)  # no-op if an earlier walk already reached this pid

    return ordered


def fmt6(value: Decimal) -> str:
    """Format *value* rounded to exactly six decimal places."""
    return str(value.quantize(_MICRO))


def export_excel(rows: List[Tuple[int, ProcessLog]], output_path: str) -> None:
    """Write *rows* to an Excel workbook with a Gantt-style bar chart.

    START and FINISH keep the timestamp text from the log.  TIME_FROM_START
    and DURATION are stored as numbers (rounded to six decimals and shown
    with a ``0.000000`` number format) because the chart reads those two
    columns and Excel plots text cells as zero.

    Args:
        rows: ``(depth, process)`` pairs, typically from :func:`tree_rows`.
            Entries without a finish time are skipped.
        output_path: Destination ``.xlsx`` path.
    """
    from openpyxl import Workbook
    from openpyxl.chart import BarChart, Reference
    from openpyxl.styles import Font

    wb = Workbook()
    ws = wb.active
    ws.title = "process_logs"

    ws.append(["PPID", "PID", "START", "TIME_FROM_START", "FINISH", "DURATION"])
    for cell in ws[1]:
        cell.font = Font(bold=True)

    base_start = min((node.start for _, node in rows), default=Decimal("0"))

    for depth, node in rows:
        if node.finish is None or node.finish_text is None:
            continue

        time_from_start = node.start - base_start
        duration = node.finish - node.start
        pid_display = f"{' ' * depth}{node.pid}"  # indentation shows tree depth
        ws.append(
            [
                node.ppid,
                pid_display,
                node.start_text,
                float(fmt6(time_from_start)),
                node.finish_text,
                float(fmt6(duration)),
            ]
        )
        for column in (4, 6):
            ws.cell(row=ws.max_row, column=column).number_format = _SIX_DECIMALS

    ws.column_dimensions["A"].width = 12
    ws.column_dimensions["B"].width = 20
    ws.column_dimensions["C"].width = 18
    ws.column_dimensions["D"].width = 18
    ws.column_dimensions["E"].width = 18
    ws.column_dimensions["F"].width = 14
    ws.freeze_panes = "A2"

    last_row = ws.max_row
    if last_row >= 2:
        chart = BarChart()
        chart.type = "bar"
        chart.grouping = "stacked"
        chart.overlap = 100
        chart.gapWidth = 150
        chart.legend.position = "b"
        chart.width = 15
        chart.height = 7.5
        chart.x_axis.scaling.orientation = "maxMin"

        offset_ref = Reference(ws, min_col=4, max_col=4, min_row=2, max_row=last_row)
        duration_ref = Reference(ws, min_col=6, max_col=6, min_row=2, max_row=last_row)
        chart.add_data(offset_ref, titles_from_data=False)
        chart.add_data(duration_ref, titles_from_data=False)
        # The first series only shifts each bar to its start offset; hiding
        # its fill leaves the duration bars floating, Gantt-style.
        chart.series[0].graphicalProperties.noFill = True
        ws.add_chart(chart, "D21")

    wb.save(output_path)


def main() -> int:
    """Run the converter and return the process exit code."""
    args = parse_args()
    processes = parse_processes(read_lines(args.input))
    rows = tree_rows(processes)
    export_excel(rows, args.output)
    print(f"Saved to {args.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())