#include #include #include #include #include #include #include #include #include #include #include #include #include #include // Лабораторная работа 3. Процессы. Неименованные каналы. // Вариант 13: 72-01. Сортировка массива рекурсивным разделением. // // Идея: родительский процесс делит массив на две части, создает двух потомков, // передает им части массива через pipe, потомки сортируют свои части тем же // алгоритмом до заданной глубины. При достижении max_depth или min_size сортировка // выполняется локально в одном процессе. Обратно потомки возвращают отсортированные // массивы тоже через pipe. Родитель выполняет слияние. using i32 = int32_t; using u64 = uint64_t; struct Options { size_t size = 100000; int max_depth = 2; size_t min_size = 4096; unsigned seed = 1337; bool print = false; bool log = false; }; struct SortResult { std::vector data; u64 processes = 1; // текущий процесс тоже считается }; static double now_seconds() { using clock = std::chrono::steady_clock; static const auto start = clock::now(); auto t = clock::now() - start; return std::chrono::duration(t).count(); } static void log_event(const char* type, int depth, size_t n) { // Одна строка короче PIPE_BUF, поэтому при выводе в общий файл обычно не рвется. std::ostringstream ss; ss << type << " PID=" << static_cast(getpid()) << " PPID=" << static_cast(getppid()) << " depth=" << depth << " size=" << n << " time=" << now_seconds() << "\n"; const std::string s = ss.str(); (void)!write(STDOUT_FILENO, s.data(), s.size()); } [[noreturn]] static void die_child(const std::string& msg) { std::cerr << "CHILD_ERROR pid=" << getpid() << " " << msg << "\n"; _exit(2); } static void throw_errno(const std::string& what) { throw std::runtime_error(what + ": " + std::strerror(errno)); } static void write_all(int fd, const void* ptr, size_t bytes) { const char* p = static_cast(ptr); while (bytes > 0) { ssize_t w = write(fd, p, bytes); if (w < 0) { if (errno == EINTR) continue; throw_errno("write"); } if (w == 0) throw std::runtime_error("write returned 0"); p += w; bytes -= static_cast(w); } } static void read_all(int fd, void* ptr, size_t bytes) { char* p = static_cast(ptr); while (bytes > 0) { ssize_t r = read(fd, p, bytes); if (r < 0) { if (errno == EINTR) continue; throw_errno("read"); } if (r == 0) throw std::runtime_error("unexpected EOF in pipe"); p += r; bytes -= static_cast(r); } } static void close_checked(int fd) { if (fd >= 0) { while (close(fd) < 0 && errno == EINTR) {} } } static void send_vector(int fd, const std::vector& a) { u64 n = static_cast(a.size()); write_all(fd, &n, sizeof(n)); if (!a.empty()) write_all(fd, a.data(), a.size() * sizeof(i32)); } static std::vector recv_vector(int fd) { u64 n = 0; read_all(fd, &n, sizeof(n)); if (n > static_cast(SIZE_MAX / sizeof(i32))) { throw std::runtime_error("too large vector in pipe"); } std::vector a(static_cast(n)); if (!a.empty()) read_all(fd, a.data(), a.size() * sizeof(i32)); return a; } static void send_result(int fd, const SortResult& result) { send_vector(fd, result.data); write_all(fd, &result.processes, sizeof(result.processes)); } static SortResult recv_result(int fd) { SortResult r; r.data = recv_vector(fd); read_all(fd, &r.processes, sizeof(r.processes)); return r; } static std::vector merge_sorted(const std::vector& left, const std::vector& right) { std::vector out; out.reserve(left.size() + right.size()); size_t i = 0, j = 0; while (i < left.size() && j < right.size()) { if (left[i] <= right[j]) out.push_back(left[i++]); else out.push_back(right[j++]); } out.insert(out.end(), left.begin() + static_cast(i), left.end()); out.insert(out.end(), right.begin() + static_cast(j), right.end()); return out; } static void sequential_recursive_sort(std::vector& a) { if (a.size() < 2) return; const size_t mid = a.size() / 2; std::vector left(a.begin(), a.begin() + static_cast(mid)); std::vector right(a.begin() + static_cast(mid), a.end()); sequential_recursive_sort(left); sequential_recursive_sort(right); a = merge_sorted(left, right); } static SortResult process_recursive_sort(std::vector a, int depth, const Options& opt); static pid_t spawn_sort_child(const std::vector& part, int child_depth, const Options& opt, int& result_read_fd) { int to_child[2] = {-1, -1}; int from_child[2] = {-1, -1}; if (pipe(to_child) < 0) throw_errno("pipe to_child"); if (pipe(from_child) < 0) throw_errno("pipe from_child"); pid_t pid = fork(); if (pid < 0) throw_errno("fork"); if (pid == 0) { try { close_checked(to_child[1]); close_checked(from_child[0]); std::vector input = recv_vector(to_child[0]); close_checked(to_child[0]); SortResult result = process_recursive_sort(std::move(input), child_depth, opt); send_result(from_child[1], result); close_checked(from_child[1]); _exit(0); } catch (const std::exception& e) { die_child(e.what()); } } close_checked(to_child[0]); close_checked(from_child[1]); send_vector(to_child[1], part); close_checked(to_child[1]); result_read_fd = from_child[0]; return pid; } static SortResult process_recursive_sort(std::vector a, int depth, const Options& opt) { if (opt.log) log_event("START", depth, a.size()); if (a.size() < 2 || depth >= opt.max_depth || a.size() <= opt.min_size) { sequential_recursive_sort(a); if (opt.log) log_event("END", depth, a.size()); return {std::move(a), 1}; } const size_t mid = a.size() / 2; std::vector left(a.begin(), a.begin() + static_cast(mid)); std::vector right(a.begin() + static_cast(mid), a.end()); int left_fd = -1, right_fd = -1; pid_t left_pid = spawn_sort_child(left, depth + 1, opt, left_fd); pid_t right_pid = spawn_sort_child(right, depth + 1, opt, right_fd); SortResult left_result = recv_result(left_fd); SortResult right_result = recv_result(right_fd); close_checked(left_fd); close_checked(right_fd); int status_left = 0, status_right = 0; while (waitpid(left_pid, &status_left, 0) < 0 && errno == EINTR) {} while (waitpid(right_pid, &status_right, 0) < 0 && errno == EINTR) {} if (!WIFEXITED(status_left) || WEXITSTATUS(status_left) != 0) { throw std::runtime_error("left child failed"); } if (!WIFEXITED(status_right) || WEXITSTATUS(status_right) != 0) { throw std::runtime_error("right child failed"); } SortResult result; result.data = merge_sorted(left_result.data, right_result.data); result.processes = 1 + left_result.processes + right_result.processes; if (opt.log) log_event("END", depth, result.data.size()); return result; } static Options parse_args(int argc, char** argv) { Options opt; for (int i = 1; i < argc; ++i) { std::string s = argv[i]; auto need_value = [&](const std::string& name) -> std::string { if (i + 1 >= argc) throw std::runtime_error("missing value for " + name); return argv[++i]; }; if (s == "--size" || s == "-n") opt.size = std::stoull(need_value(s)); else if (s == "--depth" || s == "-d") opt.max_depth = std::stoi(need_value(s)); else if (s == "--min-size" || s == "-m") opt.min_size = std::stoull(need_value(s)); else if (s == "--seed") opt.seed = static_cast(std::stoul(need_value(s))); else if (s == "--print") opt.print = true; else if (s == "--log") opt.log = true; else if (s == "--help" || s == "-h") { std::cout << "Usage: ./lab3 [--size N] [--depth D] [--min-size M] [--seed S] " << "[--print] [--log]\n"; std::exit(0); } else { throw std::runtime_error("unknown argument: " + s); } } if (opt.max_depth < 0) throw std::runtime_error("depth must be non-negative"); return opt; } static std::vector generate_data(const Options& opt) { // По условию этой версии лабораторной входные данные всегда считаются // полностью случайными. Для повторяемости экспериментов используется --seed. std::vector a(opt.size); std::mt19937 rng(opt.seed); std::uniform_int_distribution dist(-100000000, 100000000); for (auto& x : a) x = dist(rng); return a; } int main(int argc, char** argv) { try { Options opt = parse_args(argc, argv); std::vector data = generate_data(opt); const auto t1 = std::chrono::steady_clock::now(); SortResult result = process_recursive_sort(std::move(data), 0, opt); const auto t2 = std::chrono::steady_clock::now(); const double elapsed = std::chrono::duration(t2 - t1).count(); const bool ok = std::is_sorted(result.data.begin(), result.data.end()); if (opt.print) { for (size_t i = 0; i < result.data.size(); ++i) { if (i) std::cout << ' '; std::cout << result.data[i]; } std::cout << '\n'; } std::cerr << "STAT: size=" << opt.size << " depth=" << opt.max_depth << " min_size=" << opt.min_size << " processes=" << result.processes << " valid=" << (ok ? 1 : 0) << " time=" << elapsed << " sec\n"; return ok ? 0 : 3; } catch (const std::exception& e) { std::cerr << "ERROR: " << e.what() << "\n"; return 1; } }