You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

127 lines
5.5 KiB
C++

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#pragma once
#include "IpcChannel.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <iostream>
// IPCканал поверх именованных pipe.
// Инкапсулирует работу с файловыми дескрипторами и обмен сообщениями IpcMessage.
// readPipe — тот FIFO, который этот endpoint читает; writePipe — тот, в который пишет.
class IpcPipeChannel : public IpcChannel {
public:
IpcPipeChannel(const char* readPipe, const char* writePipe) {
// Канал не создаёт FIFO, только открывает.
// Открываем оба конца как O_RDWR, чтобы избежать блокировок на open(O_RDONLY/O_WRONLY).
// При этом логически читаем только из readPipe, а пишем только в writePipe.
fdIn_ = ::open(readPipe, O_RDWR);
fdOut_ = ::open(writePipe, O_RDWR);
// Проверяем, что каналы открыты успешно
if (fdIn_ < 0) {
std::cerr << "Failed to open read pipe: " << readPipe << " (errno: " << errno << ")" << std::endl;
}
if (fdOut_ < 0) {
std::cerr << "Failed to open write pipe: " << writePipe << " (errno: " << errno << ")" << std::endl;
}
// Очищаем канал для чтения от остаточных данных
// Это важно, чтобы избежать чтения старых сообщений при повторном открытии канала
if (fdIn_ >= 0) {
char buf[4096];
// Устанавливаем неблокирующий режим для очистки
int flags = ::fcntl(fdIn_, F_GETFL);
::fcntl(fdIn_, F_SETFL, flags | O_NONBLOCK);
// Читаем все доступные данные (старые сообщения)
while (::read(fdIn_, buf, sizeof(buf)) > 0) {
// Просто выбрасываем старые данные
}
// Возвращаем блокирующий режим
::fcntl(fdIn_, F_SETFL, flags);
}
}
~IpcPipeChannel() override {
if (fdIn_ >= 0) {
::close(fdIn_);
}
if (fdOut_ >= 0) {
::close(fdOut_);
}
}
void send(const IpcMessage& msg) override {
if (fdOut_ < 0) {
return;
}
auto raw = msg.serialize(); // тип определяется сериализатором
// Для текстовых форматов (std::string):
if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) {
const std::string& data = raw;
::write(fdOut_, data.c_str(), data.size());
::write(fdOut_, "\n", 1);
}
// Можно добавить другие форматы по мере необходимости
}
IpcMessage receive() override {
if (fdIn_ < 0) {
return IpcMessage{};
}
// Для текстовых форматов:
if constexpr (std::is_same_v<typename IpcMessage::RawData, std::string>) {
std::string line;
char c;
// Читаем построчно до символа новой строки
while (true) {
const int n = ::read(fdIn_, &c, 1);
if (n < 0) {
// Ошибка чтения
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Неблокирующий режим и нет данных - это нормально, возвращаем пустое сообщение
return IpcMessage{};
}
// Другая ошибка - логируем и возвращаем пустое сообщение
std::cerr << "read() error: " << errno << std::endl;
return IpcMessage{};
}
if (n == 0) {
// EOF - канал закрыт на стороне записи или нет открытых дескрипторов записи
// Для именованных каналов это может означать, что клиент еще не открыл канал для записи
// В этом случае read() должен блокироваться, но если канал открыт в O_RDWR,
// то read() может вернуть 0 немедленно
// Если мы уже прочитали что-то, возвращаем это, иначе пустое сообщение
if (line.empty()) {
return IpcMessage{};
}
break; // EOF, но есть данные - возвращаем их
}
if (c == '\n') {
break; // Достигли конца строки
}
line += c;
}
if (line.empty()) {
return IpcMessage{};
}
return IpcMessage(line);
}
// Можно добавить другие форматы по мере необходимости
return IpcMessage{};
}
private:
int fdIn_{-1};
int fdOut_{-1};
};