// threads_var12.c // Лабораторная №7: многопоточное программирование. [file:22] // Вариант 12: "Заменить на пробелы все символы, совпадающие с первым символом в строке, кроме первого". [file:22] // Для каждого входного файла создаётся отдельный поток, который выполняет обработку. [file:22] #include // printf, fprintf, perror #include // exit, EXIT_FAILURE, strtol #include // strlen, strerror, memset #include // errno #include // read, write, close #include // open, O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC #include // режимы доступа к файлам #include // pthread_t, pthread_create, pthread_join, pthread_mutex_t [file:21] #define RBUFSZ 4096 // Размер буфера чтения. [file:73] #define WBUFSZ 4096 // Размер буфера записи. [file:73] #define MAX_THREADS 100 // Ограничение на количество потоков. [file:22] // Структура для параметров потока: имена файлов и лимит замен. [file:22] typedef struct { const char *input_path; // Имя входного файла. [file:72] const char *output_path; // Имя выходного файла. [file:72] long long max_repl; // Максимальное количество замен для данного файла. [file:73] int thread_index; // Индекс потока для логирования. [file:22] int result; // Код завершения потока (0 или -1). [file:22] } ThreadTask; // Глобальный мьютекс для синхронизации вывода в терминал. [file:22] static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; // Потокобезопасная функция вывода сообщений об ошибках. [file:73] static void die_perror_thread(const char *what, const char *path, int exit_code) { int saved = errno; // Сохраняем errno, чтобы не потерять код ошибки. [file:73] char msg[512]; // Локальный буфер для сообщения. [file:73] if (path) { snprintf(msg, sizeof(msg), "%s: %s: %s\n", what, path, strerror(saved)); } else { snprintf(msg, sizeof(msg), "%s: %s\n", what, strerror(saved)); } pthread_mutex_lock(&log_mutex); // Захватываем мьютекс для безопасного вывода. [file:22] write(STDERR_FILENO, msg, strlen(msg)); // Пишем сообщение в stderr. [file:73] pthread_mutex_unlock(&log_mutex); // Освобождаем мьютекс. [file:22] (void) exit_code; // В потоке не выходим из процесса, код возврата ставится в структуре. [file:22] } // Разбор положительного long long из строки. [file:73] static long long parse_ll(const char *s) { char *end = NULL; errno = 0; long long v = strtoll(s, &end, 10); // Преобразование строки в 64-битное целое. [file:73] if (errno != 0 || end == s || *end != '\0' || v < 0) { errno = EINVAL; return -1; } return v; } // Функция, реализующая функциональность ЛР2 для одного файла. [file:73] static int process_file_variant12(const char *in_path, const char *out_path, long long cap) { int in_fd = open(in_path, O_RDONLY); // Открываем входной файл для чтения. [file:73] if (in_fd < 0) { die_perror_thread("open input failed", in_path, -1); return -1; } mode_t perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; // rw-rw-rw-. [file:73] int out_fd = open(out_path, O_CREAT | O_WRONLY | O_TRUNC, perms); // Создаём/очищаем выходной файл. [file:73] if (out_fd < 0) { die_perror_thread("open output failed", out_path, -1); close(in_fd); return -1; } char rbuf[RBUFSZ]; // Буфер для чтения. [file:73] char wbuf[WBUFSZ]; // Буфер для записи. [file:73] size_t wlen = 0; // Текущая длина буфера записи. [file:73] long long total_replacements = 0; // Общее число замен по всему файлу. [file:73] int at_line_start = 1; // Флаг: в начале строки. [file:73] unsigned char line_key = 0; // Первый символ текущей строки. [file:73] for (;;) { ssize_t n = read(in_fd, rbuf, sizeof(rbuf)); // Читаем блок из файла. [file:73] if (n > 0) { for (ssize_t i = 0; i < n; i++) { unsigned char c = (unsigned char) rbuf[i]; // Текущий символ. [file:73] if (at_line_start) { // Если это первый символ в строке. [file:73] line_key = c; // Запоминаем его как ключ. [file:73] at_line_start = 0; // Больше не начало строки. [file:73] } unsigned char outc = c; // По умолчанию символ не меняется. [file:73] if (c == '\n') { // Конец строки. [file:73] at_line_start = 1; // Следующий символ будет началом новой строки. [file:73] } else if (c == line_key) { if (total_replacements < cap) { outc = ' '; // Заменяем на пробел. [file:22] total_replacements++; } } if (wlen == sizeof(wbuf)) { ssize_t wrote = write(out_fd, wbuf, wlen); // Сбрасываем буфер в файл. [file:73] if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); close(in_fd); close(out_fd); return -1; } wlen = 0; } wbuf[wlen++] = (char) outc; // Добавляем символ в буфер записи. [file:73] } } else if (n == 0) { if (wlen > 0) { // Если остались несброшенные данные. [file:73] ssize_t wrote = write(out_fd, wbuf, wlen); if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); close(in_fd); close(out_fd); return -1; } wlen = 0; } break; // Достигнут конец файла. [file:73] } else { die_perror_thread("read failed", in_path, -1); close(in_fd); close(out_fd); return -1; } } if (close(in_fd) < 0) { die_perror_thread("close input failed", in_path, -1); close(out_fd); return -1; } if (close(out_fd) < 0) { die_perror_thread("close output failed", out_path, -1); return -1; } pthread_mutex_lock(&log_mutex); // Печать числа замен в stdout под мьютексом. [file:22] char res[64]; int m = snprintf(res, sizeof(res), "%lld\n", total_replacements); if (m > 0) { write(STDOUT_FILENO, res, (size_t) m); } pthread_mutex_unlock(&log_mutex); return 0; } // Функция потока: обёртка над process_file_variant12. [file:22] static void *thread_func(void *arg) { ThreadTask *task = (ThreadTask *) arg; pthread_mutex_lock(&log_mutex); printf("[thread %d] start: %s -> %s, max_repl=%lld\n", task->thread_index, task->input_path, task->output_path, task->max_repl); pthread_mutex_unlock(&log_mutex); int rc = process_file_variant12(task->input_path, task->output_path, task->max_repl); task->result = rc; pthread_mutex_lock(&log_mutex); printf("[thread %d] finished with code %d\n", task->thread_index, task->result); pthread_mutex_unlock(&log_mutex); return NULL; } static void print_usage(const char *progname) { fprintf(stderr, "Usage: %s [ ...]\n", progname); fprintf(stderr, "Example: %s 100 in1.txt out1.txt in2.txt out2.txt\n", progname); } // Головной поток: создаёт потоки, ждёт их завершения и выводит статистику. [file:22] int main(int argc, char *argv[]) { if (argc < 4 || ((argc - 2) % 2) != 0) { fprintf(stderr, "ERROR: Недостаточное или неверное количество аргументов\n"); print_usage(argv[0]); return -1; } long long cap = parse_ll(argv[1]); if (cap < 0) { die_perror_thread("invalid max_replacements", argv[1], -1); return -1; } int num_files = (argc - 2) / 2; if (num_files > MAX_THREADS) { fprintf(stderr, "ERROR: Слишком много файлов (максимум %d пар)\n", MAX_THREADS); return -1; } printf("=== Многопоточная обработка, вариант 12 ===\n"); printf("Главный поток TID: %lu\n", (unsigned long) pthread_self()); printf("Максимум замен на файл: %lld\n", cap); printf("Количество файловых пар: %d\n\n", num_files); pthread_t threads[MAX_THREADS]; ThreadTask tasks[MAX_THREADS]; for (int i = 0; i < num_files; i++) { const char *input_path = argv[2 + i * 2]; const char *output_path = argv[3 + i * 2]; tasks[i].input_path = input_path; tasks[i].output_path = output_path; tasks[i].max_repl = cap; tasks[i].thread_index = i + 1; tasks[i].result = -1; int rc = pthread_create(&threads[i], NULL, thread_func, &tasks[i]); if (rc != 0) { errno = rc; die_perror_thread("pthread_create failed", NULL, -1); tasks[i].result = -1; } } int success_count = 0; int error_count = 0; for (int i = 0; i < num_files; i++) { if (!threads[i]) { error_count++; continue; } int rc = pthread_join(threads[i], NULL); if (rc != 0) { errno = rc; die_perror_thread("pthread_join failed", NULL, -1); error_count++; continue; } if (tasks[i].result == 0) { success_count++; } else { error_count++; } } printf("\n=== Итоговая статистика ===\n"); printf("Всего потоков: %d\n", num_files); printf("Успешно завершено: %d\n", success_count); printf("С ошибкой: %d\n", error_count); if (error_count > 0) { printf("\nОБЩИЙ СТАТУС: Завершено с ошибками\n"); return -1; } else { printf("\nОБЩИЙ СТАТУС: Все потоки завершены успешно\n"); return 0; } }