Compare commits

...

2 Commits

Author SHA1 Message Date
Сергей Маринкевич 43f67275e2 factor IPC out of server RPC 2 weeks ago
Сергей Маринкевич 00d359a064 Fix IpcMessage::empty so skeleton stops reading args and server replies 2 weeks ago

@ -0,0 +1,61 @@
#pragma once
#include <ipc/IpcMessage.h>
#include <rpc/RpcValue.h>
#include <string>
// Кодек, который знает, как упаковать/распаковать RPC-запросы/ответы
// в/IpcMessage. Живёт в IPC-слое, но опирается на типы RPC-ядра
// (RpcValue/RpcArgs).
namespace IpcCodec {
// Запрос: имя метода + вектор аргументов.
inline IpcMessage encodeRequest(const std::string& method,
const RpcArgs& args) {
IpcMessage msg;
// имя метода
msg.add(method);
// аргументы (PoC: только int)
for (const auto& a : args) {
msg.add(a.asInt());
}
return msg;
}
inline void decodeRequest(const IpcMessage& msg,
std::string& method,
RpcArgs& args) {
IpcMessage copy = msg;
// имя метода
method = copy.get<std::string>();
// аргументы (PoC: только int, читаем до конца сообщения)
args.clear();
while (!copy.empty()) {
int v = copy.get<int>();
args.emplace_back(RpcValue::fromInt(v));
}
}
// Ответ: одно RpcValue (PoC: считаем, что это int).
inline IpcMessage encodeResponse(const RpcValue& result) {
IpcMessage msg;
msg.add(result.asInt()); // PoC: только int
return msg;
}
inline RpcValue decodeResponse(const IpcMessage& msg) {
IpcMessage copy = msg;
int v = copy.get<int>();
return RpcValue::fromInt(v);
}
} // namespace IpcCodec

@ -0,0 +1,47 @@
#pragma once
#include <ipc/IpcChannel.h>
#include <ipc/IpcCodec.h>
#include <rpc/RpcInvoker.h>
#include <string>
// Серверный диспетчер, который получает IpcMessage с канала,
// декодирует его в RPC-вызов, вызывает RpcInvoker и шлёт ответ.
class IpcDispatcher {
public:
IpcDispatcher(IpcChannel& ch, RpcInvoker& invoker)
: channel_(ch)
, invoker_(invoker) {}
// Обработать один запрос. Возвращает false, если получили "пустое"
// сообщение и цикл стоит завершить.
bool handleOnce() {
IpcMessage req = channel_.receive();
if (req.empty()) {
return false;
}
std::string method;
RpcArgs args;
IpcCodec::decodeRequest(req, method, args);
RpcValue result = invoker_.dispatch(method, args);
IpcMessage resp = IpcCodec::encodeResponse(result);
channel_.send(resp);
return true;
}
// Простой цикл обработки до тех пор, пока канал не вернёт пустое сообщение.
void loop() {
while (handleOnce()) {
}
}
private:
IpcChannel& channel_;
RpcInvoker& invoker_;
};

@ -39,16 +39,27 @@ public:
} }
bool empty() const { bool empty() const {
return raw_.empty(); // Для входящих сообщений считаем "пустым" то, у которого
// больше не осталось непрочитанных данных во входном потоке.
//
// Для свежесозданного сообщения (ещё не инициализирован in_)
// поведение остаётся прежним: пусто == raw_.empty().
if (!in_initialized_) {
return raw_.empty();
}
// Если поток уже инициализирован, смотрим, остались ли данные.
// peek() вернёт EOF, когда всё прочитано.
return in_.peek() == EOF;
} }
private: private:
std::string raw_; std::string raw_;
std::ostringstream out_; std::ostringstream out_;
std::istringstream in_; mutable std::istringstream in_;
bool in_initialized_{false}; mutable bool in_initialized_{false};
void ensureInput() { void ensureInput() const {
if (!in_initialized_) { if (!in_initialized_) {
in_.str(raw_); in_.str(raw_);
in_initialized_ = true; in_initialized_ = true;

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <ipc/IpcChannel.h> #include <ipc/IpcChannel.h>
#include <ipc/IpcCodec.h>
#include <rpc/RpcValue.h> #include <rpc/RpcValue.h>
class ProxyMarshaller { class ProxyMarshaller {
@ -9,22 +10,15 @@ public:
// Базовый type-erased вызов: принимает вектор RpcValue и возвращает RpcValue. // Базовый type-erased вызов: принимает вектор RpcValue и возвращает RpcValue.
RpcValue call(const std::string& method, const RpcArgs& args) { RpcValue call(const std::string& method, const RpcArgs& args) {
IpcMessage msg; // упаковать запрос в IpcMessage
IpcMessage msg = IpcCodec::encodeRequest(method, args);
// имя метода
msg.add(method);
// аргументы (PoC: только int)
for (const auto& a : args) {
msg.add(a.asInt());
}
// отправить // отправить
channel.send(msg); channel.send(msg);
// получить ответ // получить ответ и распаковать
IpcMessage resp = channel.receive(); IpcMessage resp = channel.receive();
return RpcValue::fromInt(resp.get<int>()); return IpcCodec::decodeResponse(resp);
} }
// Удобный шаблонный хелпер для сгенерированных прокси. // Удобный шаблонный хелпер для сгенерированных прокси.

@ -2,6 +2,8 @@
#include "MyService.skeleton.h" #include "MyService.skeleton.h"
#include "ipc/IpcPipeChannel.h" #include "ipc/IpcPipeChannel.h"
#include "ipc/IpcDispatcher.h"
#include "rpc/RpcInvoker.h"
#include <sys/stat.h> #include <sys/stat.h>
@ -14,17 +16,13 @@ int main() {
// Сервер читает из fifo_to_server и пишет в fifo_to_client. // Сервер читает из fifo_to_server и пишет в fifo_to_client.
IpcPipeChannel ch("/tmp/fifo_to_server", "/tmp/fifo_to_client"); IpcPipeChannel ch("/tmp/fifo_to_server", "/tmp/fifo_to_client");
// RPCуровень: скелет поверх того же канала. // RPCуровень: инвокер и скелет, который лишь регистрирует методы.
RpcInvoker invoker;
MyService realObj; MyService realObj;
MyServiceSkeleton skeleton(realObj); MyServiceSkeleton skeleton(realObj, invoker);
while (true) { // IPCдиспетчер, который декодирует IpcMessage в RPC-вызовы и обратно.
IpcMessage req = ch.receive(); IpcDispatcher dispatcher(ch, invoker);
if (req.empty()) { dispatcher.loop();
break;
}
IpcMessage resp = skeleton.dispatch(req);
ch.send(resp);
}
} }

@ -1,7 +1,7 @@
#include "{{ cls.name }}.skeleton.h" #include "{{ cls.name }}.skeleton.h"
{{ cls.name }}Skeleton::{{ cls.name }}Skeleton({{ cls.name }}& obj) {{ cls.name }}Skeleton::{{ cls.name }}Skeleton({{ cls.name }}& obj, RpcInvoker& inv)
{ : invoker(inv) {
{% for m in cls.methods %} {% for m in cls.methods %}
invoker.registerMethod(&obj, invoker.registerMethod(&obj,
"{{ cls.name }}.{{ m.name }}", "{{ cls.name }}.{{ m.name }}",
@ -9,24 +9,8 @@
{% endfor %} {% endfor %}
} }
IpcMessage {{ cls.name }}Skeleton::dispatch(const IpcMessage& req) { RpcValue {{ cls.name }}Skeleton::dispatch(const std::string& method,
// Перепаковываем IpcMessage в RpcArgs и вызываем type-erased инвокер. const RpcArgs& args) {
IpcMessage msg = req; return invoker.dispatch(method, args);
// имя метода
std::string method = msg.get<std::string>();
// аргументы (PoC: только int, читаем все до конца сообщения)
RpcArgs args;
while (!msg.empty()) {
int v = msg.get<int>();
args.emplace_back(RpcValue::fromInt(v));
}
RpcValue result = invoker.dispatch(method, args);
IpcMessage resp;
resp.add(result.asInt()); // PoC: только int
return resp;
} }

@ -2,14 +2,14 @@
#include "{{ cls.name }}.h" #include "{{ cls.name }}.h"
#include "rpc/RpcInvoker.h" #include "rpc/RpcInvoker.h"
#include "ipc/IpcMessage.h"
class {{ cls.name }}Skeleton { class {{ cls.name }}Skeleton {
public: public:
explicit {{ cls.name }}Skeleton({{ cls.name }}& obj); explicit {{ cls.name }}Skeleton({{ cls.name }}& obj, RpcInvoker& invoker);
IpcMessage dispatch(const IpcMessage& req); // IPC-независимый диспетчер: принимает имя метода и RpcArgs.
RpcValue dispatch(const std::string& method, const RpcArgs& args);
private: private:
RpcInvoker invoker; RpcInvoker& invoker;
}; };

Loading…
Cancel
Save