diff --git a/mine/lab_7/threads_var12.c b/mine/lab_7/threads_var12.c index c32c7b4..91f0498 100644 --- a/mine/lab_7/threads_var12.c +++ b/mine/lab_7/threads_var12.c @@ -1,39 +1,37 @@ -// threads_var12.c -// Лабораторная №7: многопоточное программирование. [file:22] -// Вариант 12: "Заменить на пробелы все символы, совпадающие с первым символом в строке, кроме первого". [file:22] -// Для каждого входного файла создаётся отдельный поток, который выполняет обработку. [file:22] +// Многопоточная обработка файлов с использованием POSIX threads (pthread). +// Для каждого входного файла создаётся отдельный поток. -#include // printf, fprintf, perror -#include // exit, EXIT_FAILURE, strtol -#include // strlen, strerror, memset -#include // errno -#include // read, write, close -#include // open, O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC -#include // режимы доступа к файлам -#include // pthread_t, pthread_create, pthread_join, pthread_mutex_t [file:21] +#include +#include +#include +#include +#include +#include +#include +#include // POSIX-потоки: pthread_t, pthread_create, pthread_join, mutex -#define RBUFSZ 4096 // Размер буфера чтения. [file:73] -#define WBUFSZ 4096 // Размер буфера записи. [file:73] -#define MAX_THREADS 100 // Ограничение на количество потоков. [file:22] +#define RBUFSZ 4096 +#define WBUFSZ 4096 +#define MAX_THREADS 100 -// Структура для параметров потока: имена файлов и лимит замен. [file:22] +// Параметры, которые главный поток передаёт в поток pthread_create. typedef struct { - const char *input_path; // Имя входного файла. [file:72] - const char *output_path; // Имя выходного файла. [file:72] - long long max_repl; // Максимальное количество замен для данного файла. [file:73] - int thread_index; // Индекс потока для логирования. [file:22] - int result; // Код завершения потока (0 или -1). [file:22] + const char *input_path; + const char *output_path; + long long max_repl; + int thread_index; + int result; // Код завершения работы потока (заполняется самим потоком). } ThreadTask; -// Глобальный мьютекс для синхронизации вывода в терминал. [file:22] +// Глобальный мьютекс для синхронизации вывода из нескольких потоков. static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; -// Потокобезопасная функция вывода сообщений об ошибках. [file:73] +// Выводит сообщение об ошибке, защищая вывод мьютексом (чтобы строки не перемешивались). static void die_perror_thread(const char *what, const char *path, int exit_code) { - int saved = errno; // Сохраняем errno, чтобы не потерять код ошибки. [file:73] - char msg[512]; // Локальный буфер для сообщения. [file:73] + int saved = errno; + char msg[512]; if (path) { snprintf(msg, sizeof(msg), @@ -43,18 +41,18 @@ static void die_perror_thread(const char *what, "%s: %s\n", what, strerror(saved)); } - pthread_mutex_lock(&log_mutex); // Захватываем мьютекс для безопасного вывода. [file:22] - write(STDERR_FILENO, msg, strlen(msg)); // Пишем сообщение в stderr. [file:73] - pthread_mutex_unlock(&log_mutex); // Освобождаем мьютекс. [file:22] + pthread_mutex_lock(&log_mutex); + write(STDERR_FILENO, msg, strlen(msg)); + pthread_mutex_unlock(&log_mutex); - (void) exit_code; // В потоке не выходим из процесса, код возврата ставится в структуре. [file:22] + (void) exit_code; } -// Разбор положительного long long из строки. [file:73] +// Разбор положительного long long из строки. static long long parse_ll(const char *s) { char *end = NULL; errno = 0; - long long v = strtoll(s, &end, 10); // Преобразование строки в 64-битное целое. [file:73] + long long v = strtoll(s, &end, 10); if (errno != 0 || end == s || *end != '\0' || v < 0) { errno = EINVAL; return -1; @@ -62,11 +60,11 @@ static long long parse_ll(const char *s) { return v; } -// Функция, реализующая функциональность ЛР2 для одного файла. [file:73] +// Однопоточная обработка одного файла (фактически «тело работы» для потока). static int process_file_variant12(const char *in_path, const char *out_path, long long cap) { - int in_fd = open(in_path, O_RDONLY); // Открываем входной файл для чтения. [file:73] + int in_fd = open(in_path, O_RDONLY); if (in_fd < 0) { die_perror_thread("open input failed", in_path, -1); return -1; @@ -74,52 +72,50 @@ static int process_file_variant12(const char *in_path, mode_t perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | - S_IROTH | S_IWOTH; // rw-rw-rw-. [file:73] + S_IROTH | S_IWOTH; int out_fd = open(out_path, O_CREAT | O_WRONLY | O_TRUNC, - perms); // Создаём/очищаем выходной файл. [file:73] + perms); if (out_fd < 0) { die_perror_thread("open output failed", out_path, -1); close(in_fd); return -1; } - char rbuf[RBUFSZ]; // Буфер для чтения. [file:73] - char wbuf[WBUFSZ]; // Буфер для записи. [file:73] - size_t wlen = 0; // Текущая длина буфера записи. [file:73] + char rbuf[RBUFSZ]; + char wbuf[WBUFSZ]; + size_t wlen = 0; - long long total_replacements = 0; // Общее число замен по всему файлу. [file:73] + long long total_replacements = 0; - int at_line_start = 1; // Флаг: в начале строки. [file:73] - unsigned char line_key = 0; // Первый символ текущей строки. [file:73] + int at_line_start = 1; + unsigned char line_key = 0; for (;;) { - ssize_t n = read(in_fd, rbuf, sizeof(rbuf)); // Читаем блок из файла. [file:73] + ssize_t n = read(in_fd, rbuf, sizeof(rbuf)); if (n > 0) { for (ssize_t i = 0; i < n; i++) { - unsigned char c = (unsigned char) rbuf[i]; // Текущий символ. [file:73] + unsigned char c = (unsigned char) rbuf[i]; if (at_line_start) { - // Если это первый символ в строке. [file:73] - line_key = c; // Запоминаем его как ключ. [file:73] - at_line_start = 0; // Больше не начало строки. [file:73] + line_key = c; + at_line_start = 0; } - unsigned char outc = c; // По умолчанию символ не меняется. [file:73] + unsigned char outc = c; if (c == '\n') { - // Конец строки. [file:73] - at_line_start = 1; // Следующий символ будет началом новой строки. [file:73] + at_line_start = 1; } else if (c == line_key) { if (total_replacements < cap) { - outc = ' '; // Заменяем на пробел. [file:22] + outc = ' '; total_replacements++; } } if (wlen == sizeof(wbuf)) { - ssize_t wrote = write(out_fd, wbuf, wlen); // Сбрасываем буфер в файл. [file:73] + ssize_t wrote = write(out_fd, wbuf, wlen); if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); close(in_fd); @@ -129,11 +125,10 @@ static int process_file_variant12(const char *in_path, wlen = 0; } - wbuf[wlen++] = (char) outc; // Добавляем символ в буфер записи. [file:73] + wbuf[wlen++] = (char) outc; } } else if (n == 0) { if (wlen > 0) { - // Если остались несброшенные данные. [file:73] ssize_t wrote = write(out_fd, wbuf, wlen); if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); @@ -143,7 +138,7 @@ static int process_file_variant12(const char *in_path, } wlen = 0; } - break; // Достигнут конец файла. [file:73] + break; } else { die_perror_thread("read failed", in_path, -1); close(in_fd); @@ -163,7 +158,9 @@ static int process_file_variant12(const char *in_path, return -1; } - pthread_mutex_lock(&log_mutex); // Печать числа замен в stdout под мьютексом. [file:22] + // Печать результата тоже защищена мьютексом, чтобы несколько потоков + // не писали одновременно в stdout и строки не «рвали» друг друга. + pthread_mutex_lock(&log_mutex); char res[64]; int m = snprintf(res, sizeof(res), "%lld\n", total_replacements); if (m > 0) { @@ -174,7 +171,7 @@ static int process_file_variant12(const char *in_path, return 0; } -// Функция потока: обёртка над process_file_variant12. [file:22] +// Функция, которую будет выполнять каждый поток, созданный через pthread_create. static void *thread_func(void *arg) { ThreadTask *task = (ThreadTask *) arg; @@ -190,7 +187,7 @@ static void *thread_func(void *arg) { task->output_path, task->max_repl); - task->result = rc; + task->result = rc; // Поток записывает свой код завершения в структуру. pthread_mutex_lock(&log_mutex); printf("[thread %d] finished with code %d\n", @@ -198,7 +195,7 @@ static void *thread_func(void *arg) { task->result); pthread_mutex_unlock(&log_mutex); - return NULL; + return NULL; // Возврат из функции = завершение pthread. } static void print_usage(const char *progname) { @@ -210,7 +207,7 @@ static void print_usage(const char *progname) { progname); } -// Головной поток: создаёт потоки, ждёт их завершения и выводит статистику. [file:22] +// Главный поток: создаёт потоки pthread_create, затем ждёт их завершения через pthread_join. int main(int argc, char *argv[]) { if (argc < 4 || ((argc - 2) % 2) != 0) { fprintf(stderr, @@ -238,9 +235,10 @@ int main(int argc, char *argv[]) { printf("Максимум замен на файл: %lld\n", cap); printf("Количество файловых пар: %d\n\n", num_files); - pthread_t threads[MAX_THREADS]; - ThreadTask tasks[MAX_THREADS]; + pthread_t threads[MAX_THREADS]; // Идентификаторы POSIX-потоков. + ThreadTask tasks[MAX_THREADS]; // Массив задач, по одной на поток. + // Создаём по одному потоку на каждую пару input/output. for (int i = 0; i < num_files; i++) { const char *input_path = argv[2 + i * 2]; const char *output_path = argv[3 + i * 2]; @@ -252,19 +250,21 @@ int main(int argc, char *argv[]) { tasks[i].result = -1; int rc = pthread_create(&threads[i], - NULL, - thread_func, - &tasks[i]); + NULL, // атрибуты по умолчанию + thread_func, // функция, с которой стартует поток + &tasks[i]); // аргумент, передаваемый функции if (rc != 0) { errno = rc; die_perror_thread("pthread_create failed", NULL, -1); tasks[i].result = -1; + threads[i] = 0; // помечаем, что поток не был создан } } int success_count = 0; int error_count = 0; + // Ждём завершения всех созданных потоков. for (int i = 0; i < num_files; i++) { if (!threads[i]) { error_count++;