// Многопоточная обработка файлов с использованием POSIX threads (pthread). // Для каждого входного файла создаётся отдельный поток. #include #include #include #include #include #include #include #include // POSIX-потоки: pthread_t, pthread_create, pthread_join, mutex #define RBUFSZ 4096 #define WBUFSZ 4096 #define MAX_THREADS 100 // Параметры, которые главный поток передаёт в поток pthread_create. typedef struct { const char *input_path; const char *output_path; long long max_repl; int thread_index; int result; // Код завершения работы потока (заполняется самим потоком). } ThreadTask; // Глобальный мьютекс для синхронизации вывода из нескольких потоков. static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; // Выводит сообщение об ошибке, защищая вывод мьютексом (чтобы строки не перемешивались). static void die_perror_thread(const char *what, const char *path, int exit_code) { int saved = errno; char msg[512]; if (path) { snprintf(msg, sizeof(msg), "%s: %s: %s\n", what, path, strerror(saved)); } else { snprintf(msg, sizeof(msg), "%s: %s\n", what, strerror(saved)); } pthread_mutex_lock(&log_mutex); write(STDERR_FILENO, msg, strlen(msg)); pthread_mutex_unlock(&log_mutex); (void) exit_code; } // Разбор положительного long long из строки. static long long parse_ll(const char *s) { char *end = NULL; errno = 0; long long v = strtoll(s, &end, 10); if (errno != 0 || end == s || *end != '\0' || v < 0) { errno = EINVAL; return -1; } return v; } // Однопоточная обработка одного файла (фактически «тело работы» для потока). static int process_file_variant12(const char *in_path, const char *out_path, long long cap) { int in_fd = open(in_path, O_RDONLY); if (in_fd < 0) { die_perror_thread("open input failed", in_path, -1); return -1; } mode_t perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int out_fd = open(out_path, O_CREAT | O_WRONLY | O_TRUNC, perms); if (out_fd < 0) { die_perror_thread("open output failed", out_path, -1); close(in_fd); return -1; } char rbuf[RBUFSZ]; char wbuf[WBUFSZ]; size_t wlen = 0; long long total_replacements = 0; int at_line_start = 1; unsigned char line_key = 0; for (;;) { ssize_t n = read(in_fd, rbuf, sizeof(rbuf)); if (n > 0) { for (ssize_t i = 0; i < n; i++) { unsigned char c = (unsigned char) rbuf[i]; if (at_line_start) { line_key = c; at_line_start = 0; } unsigned char outc = c; if (c == '\n') { at_line_start = 1; } else if (c == line_key) { if (total_replacements < cap) { outc = ' '; total_replacements++; } } if (wlen == sizeof(wbuf)) { ssize_t wrote = write(out_fd, wbuf, wlen); if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); close(in_fd); close(out_fd); return -1; } wlen = 0; } wbuf[wlen++] = (char) outc; } } else if (n == 0) { if (wlen > 0) { ssize_t wrote = write(out_fd, wbuf, wlen); if (wrote < 0 || (size_t) wrote != wlen) { die_perror_thread("write failed", out_path, -1); close(in_fd); close(out_fd); return -1; } wlen = 0; } break; } else { die_perror_thread("read failed", in_path, -1); close(in_fd); close(out_fd); return -1; } } if (close(in_fd) < 0) { die_perror_thread("close input failed", in_path, -1); close(out_fd); return -1; } if (close(out_fd) < 0) { die_perror_thread("close output failed", out_path, -1); return -1; } // Печать результата тоже защищена мьютексом, чтобы несколько потоков // не писали одновременно в stdout и строки не «рвали» друг друга. pthread_mutex_lock(&log_mutex); char res[64]; int m = snprintf(res, sizeof(res), "%lld\n", total_replacements); if (m > 0) { write(STDOUT_FILENO, res, (size_t) m); } pthread_mutex_unlock(&log_mutex); return 0; } // Функция, которую будет выполнять каждый поток, созданный через pthread_create. static void *thread_func(void *arg) { ThreadTask *task = (ThreadTask *) arg; pthread_mutex_lock(&log_mutex); printf("[thread %d] start: %s -> %s, max_repl=%lld\n", task->thread_index, task->input_path, task->output_path, task->max_repl); pthread_mutex_unlock(&log_mutex); int rc = process_file_variant12(task->input_path, task->output_path, task->max_repl); task->result = rc; // Поток записывает свой код завершения в структуру. pthread_mutex_lock(&log_mutex); printf("[thread %d] finished with code %d\n", task->thread_index, task->result); pthread_mutex_unlock(&log_mutex); return NULL; // Возврат из функции = завершение pthread. } static void print_usage(const char *progname) { fprintf(stderr, "Usage: %s [ ...]\n", progname); fprintf(stderr, "Example: %s 100 in1.txt out1.txt in2.txt out2.txt\n", progname); } // Главный поток: создаёт потоки pthread_create, затем ждёт их завершения через pthread_join. int main(int argc, char *argv[]) { if (argc < 4 || ((argc - 2) % 2) != 0) { fprintf(stderr, "ERROR: Недостаточное или неверное количество аргументов\n"); print_usage(argv[0]); return -1; } long long cap = parse_ll(argv[1]); if (cap < 0) { die_perror_thread("invalid max_replacements", argv[1], -1); return -1; } int num_files = (argc - 2) / 2; if (num_files > MAX_THREADS) { fprintf(stderr, "ERROR: Слишком много файлов (максимум %d пар)\n", MAX_THREADS); return -1; } printf("=== Многопоточная обработка, вариант 12 ===\n"); printf("Главный поток TID: %lu\n", (unsigned long) pthread_self()); printf("Максимум замен на файл: %lld\n", cap); printf("Количество файловых пар: %d\n\n", num_files); pthread_t threads[MAX_THREADS]; // Идентификаторы POSIX-потоков. ThreadTask tasks[MAX_THREADS]; // Массив задач, по одной на поток. // Создаём по одному потоку на каждую пару input/output. for (int i = 0; i < num_files; i++) { const char *input_path = argv[2 + i * 2]; const char *output_path = argv[3 + i * 2]; tasks[i].input_path = input_path; tasks[i].output_path = output_path; tasks[i].max_repl = cap; tasks[i].thread_index = i + 1; tasks[i].result = -1; int rc = pthread_create(&threads[i], NULL, // атрибуты по умолчанию thread_func, // функция, с которой стартует поток &tasks[i]); // аргумент, передаваемый функции if (rc != 0) { errno = rc; die_perror_thread("pthread_create failed", NULL, -1); tasks[i].result = -1; threads[i] = 0; // помечаем, что поток не был создан } } int success_count = 0; int error_count = 0; // Ждём завершения всех созданных потоков. for (int i = 0; i < num_files; i++) { if (!threads[i]) { error_count++; continue; } int rc = pthread_join(threads[i], NULL); if (rc != 0) { errno = rc; die_perror_thread("pthread_join failed", NULL, -1); error_count++; continue; } if (tasks[i].result == 0) { success_count++; } else { error_count++; } } printf("\n=== Итоговая статистика ===\n"); printf("Всего потоков: %d\n", num_files); printf("Успешно завершено: %d\n", success_count); printf("С ошибкой: %d\n", error_count); if (error_count > 0) { printf("\nОБЩИЙ СТАТУС: Завершено с ошибками\n"); return -1; } else { printf("\nОБЩИЙ СТАТУС: Все потоки завершены успешно\n"); return 0; } }