#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // Лабораторная работа 4. Сетевые соединения. Сокеты. // Вариант из лабораторной 1: рекурсивная сортировка разделением. // // В отличие от lab1/lab3, данные между родителем и потомками передаются // не через общую память и не через pipe(), а через TCP-сокеты localhost. // Каждый процесс, которому нужно разделить задачу, открывает серверный сокет // на 127.0.0.1:port_base + pid, порождает двух потомков и передает им части // массива в формате: depth / size / array. Потомки возвращают: // counter / size / sorted_array / process_count. using i32 = int32_t; using u32 = uint32_t; using u64 = uint64_t; struct Options { size_t size = 10000; int max_depth = 3; size_t min_size = 1; unsigned seed = 1337; int port_base = 20000; bool print = false; bool log = false; }; struct SortResult { std::vector data; u64 counter = 0; // счетчик операций слияния/сравнений u64 processes = 1; // текущее поддерево процессов, включая текущий процесс }; static double now_seconds() { using clock = std::chrono::steady_clock; static const auto start = clock::now(); return std::chrono::duration(clock::now() - start).count(); } static void log_event(const char* type, int depth, size_t n, int port = -1) { std::ostringstream ss; ss << type << " PID=" << static_cast(getpid()) << " PPID=" << static_cast(getppid()) << " depth=" << depth << " size=" << n; if (port >= 0) ss << " port=" << port; ss << " time=" << now_seconds() << '\n'; const std::string s = ss.str(); (void)!write(STDOUT_FILENO, s.data(), s.size()); } [[noreturn]] static void die_child(const std::string& msg) { std::cerr << "CHILD_ERROR pid=" << getpid() << " " << msg << "\n"; _exit(2); } static void throw_errno(const std::string& what) { throw std::runtime_error(what + ": " + std::strerror(errno)); } static void close_checked(int fd) { if (fd >= 0) { while (close(fd) < 0 && errno == EINTR) {} } } static void set_common_socket_options(int fd) { int yes = 1; (void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); #ifdef SO_REUSEPORT (void)setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); #endif (void)setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); // Блокирующий режим чтения оставляем стандартным, но задаем таймаут, // чтобы ошибка соединения не превращалась в бесконечное зависание. timeval tv{}; tv.tv_sec = 30; tv.tv_usec = 0; (void)setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); (void)setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); } static int port_for_pid(pid_t pid, int port_base) { if (port_base < 1024 || port_base > 65000) { throw std::runtime_error("port-base must be in range 1024..65000"); } const int span = 65535 - port_base; return port_base + static_cast(static_cast(pid) % span); } static void write_all(int fd, const void* ptr, size_t bytes) { const char* p = static_cast(ptr); while (bytes > 0) { ssize_t w = send(fd, p, bytes, MSG_NOSIGNAL); if (w < 0) { if (errno == EINTR) continue; throw_errno("send"); } if (w == 0) throw std::runtime_error("send returned 0"); p += w; bytes -= static_cast(w); } } static void read_all(int fd, void* ptr, size_t bytes) { char* p = static_cast(ptr); while (bytes > 0) { ssize_t r = recv(fd, p, bytes, MSG_WAITALL); if (r < 0) { if (errno == EINTR) continue; throw_errno("recv"); } if (r == 0) throw std::runtime_error("unexpected EOF in socket"); p += r; bytes -= static_cast(r); } } static void send_u32(int fd, u32 v) { write_all(fd, &v, sizeof(v)); } static void send_u64(int fd, u64 v) { write_all(fd, &v, sizeof(v)); } static u32 recv_u32(int fd) { u32 v = 0; read_all(fd, &v, sizeof(v)); return v; } static u64 recv_u64(int fd) { u64 v = 0; read_all(fd, &v, sizeof(v)); return v; } static void send_task(int fd, int depth, const std::vector& a) { send_u32(fd, static_cast(depth)); send_u64(fd, static_cast(a.size())); if (!a.empty()) write_all(fd, a.data(), a.size() * sizeof(i32)); } static std::pair> recv_task(int fd) { int depth = static_cast(recv_u32(fd)); u64 n = recv_u64(fd); if (n > static_cast(SIZE_MAX / sizeof(i32))) { throw std::runtime_error("too large task array"); } std::vector a(static_cast(n)); if (!a.empty()) read_all(fd, a.data(), a.size() * sizeof(i32)); return {depth, std::move(a)}; } static void send_result(int fd, const SortResult& result) { send_u64(fd, result.counter); send_u64(fd, static_cast(result.data.size())); if (!result.data.empty()) write_all(fd, result.data.data(), result.data.size() * sizeof(i32)); // Расширение протокола для статистики. Первые три поля соответствуют заданию: // counter / size / array. send_u64(fd, result.processes); } static SortResult recv_result(int fd) { SortResult r; r.counter = recv_u64(fd); u64 n = recv_u64(fd); if (n > static_cast(SIZE_MAX / sizeof(i32))) { throw std::runtime_error("too large result array"); } r.data.resize(static_cast(n)); if (!r.data.empty()) read_all(fd, r.data.data(), r.data.size() * sizeof(i32)); r.processes = recv_u64(fd); return r; } static int create_server_socket(int preferred_port, int& actual_port) { // Основной вариант соответствует методичке: порт вычисляется от pid. // Во время длинных серий benchmark порт иногда может быть еще занят ядром // или совпасть по modulo. Поэтому при EADDRINUSE берем ближайший свободный // порт и передаем именно его потомкам. std::string last_error; for (int shift = 0; shift < 2000; ++shift) { int port = preferred_port + shift; if (port > 65535) port = 1024 + (port - 65536); int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) throw_errno("socket"); set_common_socket_options(fd); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_port = htons(static_cast(port)); if (bind(fd, reinterpret_cast(&addr), sizeof(addr)) == 0) { if (listen(fd, 2) < 0) { close_checked(fd); throw_errno("listen"); } actual_port = port; return fd; } last_error = std::strerror(errno); close_checked(fd); if (errno != EADDRINUSE && errno != EACCES) { throw std::runtime_error("bind 127.0.0.1:" + std::to_string(port) + ": " + last_error); } } throw std::runtime_error("cannot bind server socket near port " + std::to_string(preferred_port) + ": " + last_error); } static int accept_client(int server_fd) { for (;;) { int fd = accept(server_fd, nullptr, nullptr); if (fd < 0) { if (errno == EINTR) continue; throw_errno("accept"); } set_common_socket_options(fd); return fd; } } static int connect_to_parent(int port) { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) throw_errno("socket"); set_common_socket_options(fd); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_port = htons(static_cast(port)); // После fork сервер обычно уже слушает, но короткий retry делает запуск стабильнее. for (int attempt = 0; attempt < 200; ++attempt) { if (connect(fd, reinterpret_cast(&addr), sizeof(addr)) == 0) return fd; if (errno != ECONNREFUSED && errno != EINTR) break; usleep(1000); } close_checked(fd); throw_errno("connect 127.0.0.1:" + std::to_string(port)); throw std::runtime_error("unreachable connect failure"); } static std::vector merge_sorted(const std::vector& left, const std::vector& right, u64& counter) { std::vector out; out.reserve(left.size() + right.size()); size_t i = 0, j = 0; while (i < left.size() && j < right.size()) { ++counter; if (left[i] <= right[j]) out.push_back(left[i++]); else out.push_back(right[j++]); } out.insert(out.end(), left.begin() + static_cast(i), left.end()); out.insert(out.end(), right.begin() + static_cast(j), right.end()); return out; } static SortResult local_sort(std::vector a) { SortResult r; if (a.size() < 2) { r.data = std::move(a); return r; } const size_t mid = a.size() / 2; std::vector left(a.begin(), a.begin() + static_cast(mid)); std::vector right(a.begin() + static_cast(mid), a.end()); SortResult l = local_sort(std::move(left)); SortResult rr = local_sort(std::move(right)); r.counter = l.counter + rr.counter; r.data = merge_sorted(l.data, rr.data, r.counter); return r; } static SortResult socket_recursive_sort(std::vector a, int depth, const Options& opt); static pid_t spawn_child_and_send(int server_fd, int parent_port, const std::vector& part, int child_depth, const Options& opt, int& child_socket) { pid_t pid = fork(); if (pid < 0) throw_errno("fork"); if (pid == 0) { try { close_checked(server_fd); int fd = connect_to_parent(parent_port); auto task = recv_task(fd); SortResult result = socket_recursive_sort(std::move(task.second), task.first, opt); send_result(fd, result); close_checked(fd); _exit(0); } catch (const std::exception& e) { die_child(e.what()); } } child_socket = accept_client(server_fd); send_task(child_socket, child_depth, part); return pid; } static SortResult socket_recursive_sort(std::vector a, int depth, const Options& opt) { int my_port = port_for_pid(getpid(), opt.port_base); if (opt.log) log_event("START", depth, a.size(), my_port); if (a.size() < 2 || depth >= opt.max_depth || a.size() <= opt.min_size) { SortResult r = local_sort(std::move(a)); r.processes = 1; if (opt.log) log_event("END", depth, r.data.size(), my_port); return r; } const size_t mid = a.size() / 2; std::vector left(a.begin(), a.begin() + static_cast(mid)); std::vector right(a.begin() + static_cast(mid), a.end()); int actual_port = my_port; int server_fd = create_server_socket(my_port, actual_port); int left_sock = -1; pid_t left_pid = spawn_child_and_send(server_fd, actual_port, left, depth + 1, opt, left_sock); int right_sock = -1; pid_t right_pid = spawn_child_and_send(server_fd, actual_port, right, depth + 1, opt, right_sock); close_checked(server_fd); SortResult left_result = recv_result(left_sock); SortResult right_result = recv_result(right_sock); close_checked(left_sock); close_checked(right_sock); int status_left = 0, status_right = 0; while (waitpid(left_pid, &status_left, 0) < 0 && errno == EINTR) {} while (waitpid(right_pid, &status_right, 0) < 0 && errno == EINTR) {} if (!WIFEXITED(status_left) || WEXITSTATUS(status_left) != 0) { throw std::runtime_error("left child failed"); } if (!WIFEXITED(status_right) || WEXITSTATUS(status_right) != 0) { throw std::runtime_error("right child failed"); } SortResult result; result.counter = left_result.counter + right_result.counter; result.data = merge_sorted(left_result.data, right_result.data, result.counter); result.processes = 1 + left_result.processes + right_result.processes; if (opt.log) log_event("END", depth, result.data.size(), my_port); return result; } static Options parse_args(int argc, char** argv) { Options opt; for (int i = 1; i < argc; ++i) { std::string s = argv[i]; auto need_value = [&](const std::string& name) -> std::string { if (i + 1 >= argc) throw std::runtime_error("missing value for " + name); return argv[++i]; }; if (s == "--size" || s == "-n") opt.size = std::stoull(need_value(s)); else if (s == "--depth" || s == "-d") opt.max_depth = std::stoi(need_value(s)); else if (s == "--min-size" || s == "-m") opt.min_size = std::stoull(need_value(s)); else if (s == "--seed") opt.seed = static_cast(std::stoul(need_value(s))); else if (s == "--port-base") opt.port_base = std::stoi(need_value(s)); else if (s == "--print") opt.print = true; else if (s == "--log") opt.log = true; else if (s == "--help" || s == "-h") { std::cout << "Usage: ./lab4 [--size N] [--depth D] [--min-size M] [--seed S] " << "[--port-base P] [--print] [--log]\n"; std::exit(0); } else { throw std::runtime_error("unknown argument: " + s); } } if (opt.max_depth < 0) throw std::runtime_error("depth must be non-negative"); if (opt.port_base < 1024 || opt.port_base > 65000) { throw std::runtime_error("port-base must be in range 1024..65000"); } return opt; } static std::vector generate_data(const Options& opt) { std::vector a(opt.size); std::mt19937 rng(opt.seed); std::uniform_int_distribution dist(-100000000, 100000000); for (auto& x : a) x = dist(rng); return a; } int main(int argc, char** argv) { try { Options opt = parse_args(argc, argv); std::vector data = generate_data(opt); const auto t1 = std::chrono::steady_clock::now(); SortResult result = socket_recursive_sort(std::move(data), 0, opt); const auto t2 = std::chrono::steady_clock::now(); const double elapsed = std::chrono::duration(t2 - t1).count(); const bool ok = std::is_sorted(result.data.begin(), result.data.end()); if (opt.print) { for (size_t i = 0; i < result.data.size(); ++i) { if (i) std::cout << ' '; std::cout << result.data[i]; } std::cout << '\n'; } else { std::cout << "Sorted list/array first 20 elements: "; const size_t limit = std::min(20, result.data.size()); for (size_t i = 0; i < limit; ++i) std::cout << result.data[i] << ' '; std::cout << '\n'; } std::cerr << "STAT: size=" << opt.size << " depth=" << opt.max_depth << " min_size=" << opt.min_size << " processes=" << result.processes << " counter=" << result.counter << " valid=" << (ok ? 1 : 0) << " time=" << elapsed << " sec\n"; return ok ? 0 : 3; } catch (const std::exception& e) { std::cerr << "ERROR: " << e.what() << "\n"; return 1; } }