Files
CS-LABS/mine/lab_7/threads_var12.c
2025-12-11 09:22:54 +07:00

302 lines
10 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Многопоточная обработка файлов с использованием POSIX threads (pthread).
// Для каждого входного файла создаётся отдельный поток.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h> // POSIX-потоки: pthread_t, pthread_create, pthread_join, mutex
#define RBUFSZ 4096
#define WBUFSZ 4096
#define MAX_THREADS 100
// Параметры, которые главный поток передаёт в поток pthread_create.
typedef struct {
const char *input_path;
const char *output_path;
long long max_repl;
int thread_index;
int result; // Код завершения работы потока (заполняется самим потоком).
} ThreadTask;
// Глобальный мьютекс для синхронизации вывода из нескольких потоков.
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
// Выводит сообщение об ошибке, защищая вывод мьютексом (чтобы строки не перемешивались).
static void die_perror_thread(const char *what,
const char *path,
int exit_code) {
int saved = errno;
char msg[512];
if (path) {
snprintf(msg, sizeof(msg),
"%s: %s: %s\n", what, path, strerror(saved));
} else {
snprintf(msg, sizeof(msg),
"%s: %s\n", what, strerror(saved));
}
pthread_mutex_lock(&log_mutex);
write(STDERR_FILENO, msg, strlen(msg));
pthread_mutex_unlock(&log_mutex);
(void) exit_code;
}
// Разбор положительного long long из строки.
static long long parse_ll(const char *s) {
char *end = NULL;
errno = 0;
long long v = strtoll(s, &end, 10);
if (errno != 0 || end == s || *end != '\0' || v < 0) {
errno = EINVAL;
return -1;
}
return v;
}
// Однопоточная обработка одного файла (фактически «тело работы» для потока).
static int process_file_variant12(const char *in_path,
const char *out_path,
long long cap) {
int in_fd = open(in_path, O_RDONLY);
if (in_fd < 0) {
die_perror_thread("open input failed", in_path, -1);
return -1;
}
mode_t perms = S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH;
int out_fd = open(out_path,
O_CREAT | O_WRONLY | O_TRUNC,
perms);
if (out_fd < 0) {
die_perror_thread("open output failed", out_path, -1);
close(in_fd);
return -1;
}
char rbuf[RBUFSZ];
char wbuf[WBUFSZ];
size_t wlen = 0;
long long total_replacements = 0;
int at_line_start = 1;
unsigned char line_key = 0;
for (;;) {
ssize_t n = read(in_fd, rbuf, sizeof(rbuf));
if (n > 0) {
for (ssize_t i = 0; i < n; i++) {
unsigned char c = (unsigned char) rbuf[i];
if (at_line_start) {
line_key = c;
at_line_start = 0;
}
unsigned char outc = c;
if (c == '\n') {
at_line_start = 1;
} else if (c == line_key) {
if (total_replacements < cap) {
outc = ' ';
total_replacements++;
}
}
if (wlen == sizeof(wbuf)) {
ssize_t wrote = write(out_fd, wbuf, wlen);
if (wrote < 0 || (size_t) wrote != wlen) {
die_perror_thread("write failed", out_path, -1);
close(in_fd);
close(out_fd);
return -1;
}
wlen = 0;
}
wbuf[wlen++] = (char) outc;
}
} else if (n == 0) {
if (wlen > 0) {
ssize_t wrote = write(out_fd, wbuf, wlen);
if (wrote < 0 || (size_t) wrote != wlen) {
die_perror_thread("write failed", out_path, -1);
close(in_fd);
close(out_fd);
return -1;
}
wlen = 0;
}
break;
} else {
die_perror_thread("read failed", in_path, -1);
close(in_fd);
close(out_fd);
return -1;
}
}
if (close(in_fd) < 0) {
die_perror_thread("close input failed", in_path, -1);
close(out_fd);
return -1;
}
if (close(out_fd) < 0) {
die_perror_thread("close output failed", out_path, -1);
return -1;
}
// Печать результата тоже защищена мьютексом, чтобы несколько потоков
// не писали одновременно в stdout и строки не «рвали» друг друга.
pthread_mutex_lock(&log_mutex);
char res[64];
int m = snprintf(res, sizeof(res), "%lld\n", total_replacements);
if (m > 0) {
write(STDOUT_FILENO, res, (size_t) m);
}
pthread_mutex_unlock(&log_mutex);
return 0;
}
// Функция, которую будет выполнять каждый поток, созданный через pthread_create.
static void *thread_func(void *arg) {
ThreadTask *task = (ThreadTask *) arg;
pthread_mutex_lock(&log_mutex);
printf("[thread %d] start: %s -> %s, max_repl=%lld\n",
task->thread_index,
task->input_path,
task->output_path,
task->max_repl);
pthread_mutex_unlock(&log_mutex);
int rc = process_file_variant12(task->input_path,
task->output_path,
task->max_repl);
task->result = rc; // Поток записывает свой код завершения в структуру.
pthread_mutex_lock(&log_mutex);
printf("[thread %d] finished with code %d\n",
task->thread_index,
task->result);
pthread_mutex_unlock(&log_mutex);
return NULL; // Возврат из функции = завершение pthread.
}
static void print_usage(const char *progname) {
fprintf(stderr,
"Usage: %s <max_replacements> <input1> <output1> [<input2> <output2> ...]\n",
progname);
fprintf(stderr,
"Example: %s 100 in1.txt out1.txt in2.txt out2.txt\n",
progname);
}
// Главный поток: создаёт потоки pthread_create, затем ждёт их завершения через pthread_join.
int main(int argc, char *argv[]) {
if (argc < 4 || ((argc - 2) % 2) != 0) {
fprintf(stderr,
"ERROR: Недостаточное или неверное количество аргументов\n");
print_usage(argv[0]);
return -1;
}
long long cap = parse_ll(argv[1]);
if (cap < 0) {
die_perror_thread("invalid max_replacements", argv[1], -1);
return -1;
}
int num_files = (argc - 2) / 2;
if (num_files > MAX_THREADS) {
fprintf(stderr,
"ERROR: Слишком много файлов (максимум %d пар)\n",
MAX_THREADS);
return -1;
}
printf("=== Многопоточная обработка, вариант 12 ===\n");
printf("Главный поток TID: %lu\n", (unsigned long) pthread_self());
printf("Максимум замен на файл: %lld\n", cap);
printf("Количество файловых пар: %d\n\n", num_files);
pthread_t threads[MAX_THREADS]; // Идентификаторы POSIX-потоков.
ThreadTask tasks[MAX_THREADS]; // Массив задач, по одной на поток.
// Создаём по одному потоку на каждую пару input/output.
for (int i = 0; i < num_files; i++) {
const char *input_path = argv[2 + i * 2];
const char *output_path = argv[3 + i * 2];
tasks[i].input_path = input_path;
tasks[i].output_path = output_path;
tasks[i].max_repl = cap;
tasks[i].thread_index = i + 1;
tasks[i].result = -1;
int rc = pthread_create(&threads[i],
NULL, // атрибуты по умолчанию
thread_func, // функция, с которой стартует поток
&tasks[i]); // аргумент, передаваемый функции
if (rc != 0) {
errno = rc;
die_perror_thread("pthread_create failed", NULL, -1);
tasks[i].result = -1;
threads[i] = 0; // помечаем, что поток не был создан
}
}
int success_count = 0;
int error_count = 0;
// Ждём завершения всех созданных потоков.
for (int i = 0; i < num_files; i++) {
if (!threads[i]) {
error_count++;
continue;
}
int rc = pthread_join(threads[i], NULL);
if (rc != 0) {
errno = rc;
die_perror_thread("pthread_join failed", NULL, -1);
error_count++;
continue;
}
if (tasks[i].result == 0) {
success_count++;
} else {
error_count++;
}
}
printf("\n=== Итоговая статистика ===\n");
printf("Всего потоков: %d\n", num_files);
printf("Успешно завершено: %d\n", success_count);
printf("С ошибкой: %d\n", error_count);
if (error_count > 0) {
printf("\nОБЩИЙ СТАТУС: Завершено с ошибками\n");
return -1;
} else {
printf("\nОБЩИЙ СТАТУС: Все потоки завершены успешно\n");
return 0;
}
}