fup zeroes from pipes

master
Сергей Маринкевич 2 months ago
parent 8284a36883
commit 0334b9a42b

@ -5,6 +5,8 @@
#include <rpc/RpcInvoker.h> #include <rpc/RpcInvoker.h>
#include <string> #include <string>
#include <iostream>
#include <unistd.h>
// Серверный диспетчер, который получает IpcMessage с канала, // Серверный диспетчер, который получает IpcMessage с канала,
// декодирует его в RPC-вызов, вызывает RpcInvoker и шлёт ответ. // декодирует его в RPC-вызов, вызывает RpcInvoker и шлёт ответ.

@ -5,6 +5,8 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <iostream>
// IPCканал поверх именованных pipe. // IPCканал поверх именованных pipe.
// Инкапсулирует работу с файловыми дескрипторами и обмен сообщениями IpcMessage. // Инкапсулирует работу с файловыми дескрипторами и обмен сообщениями IpcMessage.
@ -18,6 +20,31 @@ public:
// При этом логически читаем только из readPipe, а пишем только в writePipe. // При этом логически читаем только из readPipe, а пишем только в writePipe.
fdIn_ = ::open(readPipe, O_RDWR); fdIn_ = ::open(readPipe, O_RDWR);
fdOut_ = ::open(writePipe, O_RDWR); fdOut_ = ::open(writePipe, O_RDWR);
// Проверяем, что каналы открыты успешно
if (fdIn_ < 0) {
std::cerr << "Failed to open read pipe: " << readPipe << " (errno: " << errno << ")" << std::endl;
}
if (fdOut_ < 0) {
std::cerr << "Failed to open write pipe: " << writePipe << " (errno: " << errno << ")" << std::endl;
}
// Очищаем канал для чтения от остаточных данных
// Это важно, чтобы избежать чтения старых сообщений при повторном открытии канала
if (fdIn_ >= 0) {
char buf[4096];
// Устанавливаем неблокирующий режим для очистки
int flags = ::fcntl(fdIn_, F_GETFL);
::fcntl(fdIn_, F_SETFL, flags | O_NONBLOCK);
// Читаем все доступные данные (старые сообщения)
while (::read(fdIn_, buf, sizeof(buf)) > 0) {
// Просто выбрасываем старые данные
}
// Возвращаем блокирующий режим
::fcntl(fdIn_, F_SETFL, flags);
}
} }
~IpcPipeChannel() override { ~IpcPipeChannel() override {
@ -34,7 +61,7 @@ public:
return; return;
} }
auto raw = msg.serialize(); // тип определяется сериализатором auto raw = msg.serialize(); // тип определяется сериализатором
// Для текстовых форматов (std::string): // Для текстовых форматов (std::string):
if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) { if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) {
const std::string& data = raw; const std::string& data = raw;
@ -43,21 +70,51 @@ public:
} }
// Можно добавить другие форматы по мере необходимости // Можно добавить другие форматы по мере необходимости
} }
IpcMessage receive() override { IpcMessage receive() override {
if (fdIn_ < 0) { if (fdIn_ < 0) {
return IpcMessage{}; return IpcMessage{};
} }
// Для текстовых форматов: // Для текстовых форматов:
if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) { if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) {
char buf[4096]; std::string line;
const int n = ::read(fdIn_, buf, sizeof(buf) - 1); char c;
if (n <= 0) {
// Читаем построчно до символа новой строки
while (true) {
const int n = ::read(fdIn_, &c, 1);
if (n < 0) {
// Ошибка чтения
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Неблокирующий режим и нет данных - это нормально, возвращаем пустое сообщение
return IpcMessage{};
}
// Другая ошибка - логируем и возвращаем пустое сообщение
std::cerr << "read() error: " << errno << std::endl;
return IpcMessage{};
}
if (n == 0) {
// EOF - канал закрыт на стороне записи или нет открытых дескрипторов записи
// Для именованных каналов это может означать, что клиент еще не открыл канал для записи
// В этом случае read() должен блокироваться, но если канал открыт в O_RDWR,
// то read() может вернуть 0 немедленно
// Если мы уже прочитали что-то, возвращаем это, иначе пустое сообщение
if (line.empty()) {
return IpcMessage{};
}
break; // EOF, но есть данные - возвращаем их
}
if (c == '\n') {
break; // Достигли конца строки
}
line += c;
}
if (line.empty()) {
return IpcMessage{}; return IpcMessage{};
} }
buf[n] = 0; return IpcMessage(line);
return IpcMessage(std::string(buf));
} }
// Можно добавить другие форматы по мере необходимости // Можно добавить другие форматы по мере необходимости
return IpcMessage{}; return IpcMessage{};

Loading…
Cancel
Save