Files
OS-LABS/export.py
T
2026-04-23 10:36:51 +07:00

204 lines
5.5 KiB
Python

#!/usr/bin/env python3
import argparse
import re
import sys
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, Iterable, List, Tuple
from openpyxl import Workbook
from openpyxl.chart import BarChart, Reference
from openpyxl.styles import Font
# Matches one whole log record, for example:
#   START PID=1234 PPID=1 depth=0 range=[0,99] time=1713843411.123456
# Capture groups, in order: event (START or END), pid, ppid, depth,
# the two range bounds, and the timestamp text.
# The timestamp group only accepts a well-formed non-negative decimal
# (digits with at most one dot), so malformed values such as "1.2.3" or "."
# simply fail to match instead of blowing up later when the captured text
# is converted with Decimal().
LINE_RE = re.compile(
    r"^(START|END)\s+PID=(\d+)\s+PPID=(\d+)\s+depth=(\d+)\s+range=\[(\d+),(\d+)\]"
    r"\s+time=([0-9]+(?:\.[0-9]*)?|\.[0-9]+)$"
)
@dataclass
class ProcessLog:
pid: int
ppid: int
start: Decimal
start_text: str
finish: Decimal | None = None
finish_text: str | None = None
first_seen_order: int = 0
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with ``input`` (log path, ``"-"`` by default, which the
        help text describes as stdin) and ``output`` (workbook path,
        ``"process_log.xlsx"`` by default).
    """
    cli = argparse.ArgumentParser(
        description="Convert process START/END log to a ptree-like Excel report."
    )
    cli.add_argument(
        "-i", "--input", default="-", help="Input log file path (default: stdin)"
    )
    cli.add_argument(
        "-o",
        "--output",
        default="process_log.xlsx",
        help="Output Excel file path (default: process_log.xlsx)",
    )
    namespace = cli.parse_args()
    return namespace
def read_lines(input_path: str) -> Iterable[str]:
    """Lazily yield the lines of the input with the trailing newline removed.

    Args:
        input_path: Path of a UTF-8 text file, or ``"-"`` to read from
            ``sys.stdin``.

    Yields:
        Each line without its trailing ``"\\n"``; other whitespace is kept.
    """
    if input_path != "-":
        # The file stays open only while the generator is being consumed.
        with open(input_path, "r", encoding="utf-8") as handle:
            yield from (text.rstrip("\n") for text in handle)
        return
    yield from (text.rstrip("\n") for text in sys.stdin)
def parse_processes(lines: Iterable[str]) -> Dict[int, ProcessLog]:
    """Build one ProcessLog per pid from raw log lines.

    Lines that do not match ``LINE_RE`` (after stripping surrounding
    whitespace) are skipped. A START line creates the record for its pid,
    replacing any earlier record with the same pid. An END line fills in the
    finish time of an existing record and is ignored when no START for that
    pid has been seen.

    Args:
        lines: Raw log lines.

    Returns:
        Mapping from pid to its record, in order of first insertion.
    """
    by_pid: Dict[int, ProcessLog] = {}
    start_count = 0
    for text in lines:
        match = LINE_RE.match(text.strip())
        if match is None:
            continue
        # depth and the range bounds are captured by the pattern but unused.
        kind, pid_text, ppid_text, _depth, _low, _high, stamp_text = match.groups()
        pid = int(pid_text)
        stamp = Decimal(stamp_text)
        if kind == "START":
            by_pid[pid] = ProcessLog(
                pid=pid,
                ppid=int(ppid_text),
                start=stamp,
                start_text=stamp_text,
                first_seen_order=start_count,
            )
            start_count += 1
            continue
        record = by_pid.get(pid)
        if record is not None:
            record.finish = stamp
            record.finish_text = stamp_text
    return by_pid
def tree_rows(processes: Dict[int, ProcessLog]) -> List[Tuple[int, ProcessLog]]:
    """Flatten the finished processes into pre-order tree rows.

    Only processes whose ``finish`` is set take part. A process whose parent
    is not among them is a root (depth 0); children follow their parent at
    ``depth + 1``. Roots and siblings are ordered by start time.

    Every process is emitted exactly once. Processes that cannot be reached
    from any root (a parent cycle, or a process that names itself as parent)
    are emitted afterwards, each unvisited one starting a new depth-0 subtree
    in start-time order.

    Args:
        processes: Mapping from pid to its record.

    Returns:
        List of ``(depth, process)`` tuples in display order.
    """
    complete = {pid: p for pid, p in processes.items() if p.finish is not None}

    def start_of(pid: int) -> Decimal:
        return complete[pid].start

    children: Dict[int, List[int]] = {pid: [] for pid in complete}
    roots: List[int] = []
    for pid, node in complete.items():
        if node.ppid in complete:
            children[node.ppid].append(pid)
        else:
            roots.append(pid)
    for siblings in children.values():
        siblings.sort(key=start_of)
    roots.sort(key=start_of)

    ordered: List[Tuple[int, ProcessLog]] = []
    visited: set[int] = set()

    def walk(start_pid: int) -> None:
        # Iterative pre-order walk: an explicit stack avoids hitting the
        # recursion limit on deep trees, and the shared ``visited`` set stops
        # parent cycles from looping forever or emitting a row twice.
        stack = [(start_pid, 0)]
        while stack:
            pid, depth = stack.pop()
            if pid in visited:
                continue
            visited.add(pid)
            ordered.append((depth, complete[pid]))
            # Push in reverse so the earliest-started child is popped first.
            stack.extend((child, depth + 1) for child in reversed(children[pid]))

    for root_pid in roots:
        walk(root_pid)

    # Anything still unvisited belongs to a cycle with no root.
    leftovers = sorted((pid for pid in complete if pid not in visited), key=start_of)
    for pid in leftovers:
        walk(pid)
    return ordered
def fmt6(value: Decimal) -> str:
    """Return *value* as text rounded to exactly six decimal places."""
    micro = Decimal("0.000001")
    rounded = value.quantize(micro)
    return str(rounded)
def export_excel(rows: List[Tuple[int, ProcessLog]], output_path: str) -> None:
    """Write the process rows and a Gantt-style bar chart to an Excel workbook.

    The sheet "process_logs" gets one header row plus one row per finished
    process: PPID, PID (indented by one space per tree depth), START,
    TIME_FROM_START, FINISH and DURATION. START and FINISH keep the original
    log text. TIME_FROM_START and DURATION are stored as numbers displayed
    with six decimals, because the chart takes its series from those two
    columns and text cells would be plotted as zero.

    Args:
        rows: ``(depth, process)`` tuples in display order.
        output_path: Destination path of the ``.xlsx`` file.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "process_logs"
    ws.append(["PPID", "PID", "START", "TIME_FROM_START", "FINISH", "DURATION"])
    for cell in ws[1]:
        cell.font = Font(bold=True)

    # All offsets are measured from the earliest start among the rows.
    base_start = min((node.start for _, node in rows), default=Decimal("0"))
    for depth, node in rows:
        if node.finish is None or node.finish_text is None:
            continue
        ws.append(
            [
                node.ppid,
                f"{' ' * depth}{node.pid}",
                node.start_text,
                float(node.start - base_start),
                node.finish_text,
                float(node.finish - node.start),
            ]
        )
        # Numeric cells, rendered with the same six decimals as before.
        for column in (4, 6):
            ws.cell(row=ws.max_row, column=column).number_format = "0.000000"

    column_widths = {"A": 12, "B": 20, "C": 18, "D": 18, "E": 18, "F": 14}
    for letter, width in column_widths.items():
        ws.column_dimensions[letter].width = width
    ws.freeze_panes = "A2"

    last_row = ws.max_row
    if last_row >= 2:
        chart = BarChart()
        chart.type = "bar"
        chart.grouping = "stacked"
        chart.overlap = 100
        chart.gapWidth = 150
        chart.legend.position = "b"
        chart.width = 15
        chart.height = 7.5
        # Reverse the category axis so the first row is drawn at the top.
        chart.x_axis.scaling.orientation = "maxMin"
        offsets = Reference(ws, min_col=4, max_col=4, min_row=2, max_row=last_row)
        durations = Reference(ws, min_col=6, max_col=6, min_row=2, max_row=last_row)
        chart.add_data(offsets, titles_from_data=False)
        chart.add_data(durations, titles_from_data=False)
        # The offset series is an invisible spacer that pushes each duration
        # bar to its start position.
        chart.series[0].graphicalProperties.noFill = True
        ws.add_chart(chart, "D21")

    wb.save(output_path)
def main() -> int:
    """Run the tool end to end: read the log, build rows, write the workbook.

    Returns:
        Always 0; it is used as the process exit status.
    """
    options = parse_args()
    log_lines = read_lines(options.input)
    rows = tree_rows(parse_processes(log_lines))
    export_excel(rows, options.output)
    print(f"Saved to {options.output}")
    return 0
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())