From 0334b9a42b9e5f292cfbf9674ab594beae635026 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9C=D0=B0=D1=80?= =?UTF-8?q?=D0=B8=D0=BD=D0=BA=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 4 Dec 2025 13:25:48 +0700 Subject: [PATCH] fup zeroes from pipes --- include/ipc/IpcDispatcher.h | 2 ++ include/ipc/IpcPipeChannel.h | 73 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/include/ipc/IpcDispatcher.h b/include/ipc/IpcDispatcher.h index 02b12c9..b506ef7 100644 --- a/include/ipc/IpcDispatcher.h +++ b/include/ipc/IpcDispatcher.h @@ -5,6 +5,8 @@ #include #include +#include +#include // Серверный диспетчер, который получает IpcMessage с канала, // декодирует его в RPC-вызов, вызывает RpcInvoker и шлёт ответ. diff --git a/include/ipc/IpcPipeChannel.h b/include/ipc/IpcPipeChannel.h index 34e92ed..acfd360 100644 --- a/include/ipc/IpcPipeChannel.h +++ b/include/ipc/IpcPipeChannel.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include // IPC‑канал поверх именованных pipe. // Инкапсулирует работу с файловыми дескрипторами и обмен сообщениями IpcMessage. @@ -18,6 +20,31 @@ public: // При этом логически читаем только из readPipe, а пишем только в writePipe. fdIn_ = ::open(readPipe, O_RDWR); fdOut_ = ::open(writePipe, O_RDWR); + + // Проверяем, что каналы открыты успешно + if (fdIn_ < 0) { + std::cerr << "Failed to open read pipe: " << readPipe << " (errno: " << errno << ")" << std::endl; + } + if (fdOut_ < 0) { + std::cerr << "Failed to open write pipe: " << writePipe << " (errno: " << errno << ")" << std::endl; + } + + // Очищаем канал для чтения от остаточных данных + // Это важно, чтобы избежать чтения старых сообщений при повторном открытии канала + if (fdIn_ >= 0) { + char buf[4096]; + // Устанавливаем неблокирующий режим для очистки + int flags = ::fcntl(fdIn_, F_GETFL); + ::fcntl(fdIn_, F_SETFL, flags | O_NONBLOCK); + + // Читаем все доступные данные (старые сообщения) + while (::read(fdIn_, buf, sizeof(buf)) > 0) { + // Просто выбрасываем старые данные + } + + // Возвращаем блокирующий режим + ::fcntl(fdIn_, F_SETFL, flags); + } } ~IpcPipeChannel() override { @@ -34,7 +61,7 @@ public: return; } auto raw = msg.serialize(); // тип определяется сериализатором - + // Для текстовых форматов (std::string): if constexpr (std::is_same_v) { const std::string& data = raw; @@ -43,21 +70,51 @@ public: } // Можно добавить другие форматы по мере необходимости } - + IpcMessage receive() override { if (fdIn_ < 0) { return IpcMessage{}; } - + // Для текстовых форматов: if constexpr (std::is_same_v) { - char buf[4096]; - const int n = ::read(fdIn_, buf, sizeof(buf) - 1); - if (n <= 0) { + std::string line; + char c; + + // Читаем построчно до символа новой строки + while (true) { + const int n = ::read(fdIn_, &c, 1); + if (n < 0) { + // Ошибка чтения + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Неблокирующий режим и нет данных - это нормально, возвращаем пустое сообщение + return IpcMessage{}; + } + // Другая ошибка - логируем и возвращаем пустое сообщение + std::cerr << "read() error: " << errno << std::endl; + return IpcMessage{}; + } + if (n == 0) { + // EOF - канал закрыт на стороне записи или нет открытых дескрипторов записи + // Для именованных каналов это может означать, что клиент еще не открыл канал для записи + // В этом случае read() должен блокироваться, но если канал открыт в O_RDWR, + // то read() может вернуть 0 немедленно + // Если мы уже прочитали что-то, возвращаем это, иначе пустое сообщение + if (line.empty()) { + return IpcMessage{}; + } + break; // EOF, но есть данные - возвращаем их + } + if (c == '\n') { + break; // Достигли конца строки + } + line += c; + } + + if (line.empty()) { return IpcMessage{}; } - buf[n] = 0; - return IpcMessage(std::string(buf)); + return IpcMessage(line); } // Можно добавить другие форматы по мере необходимости return IpcMessage{};