2505C++,py和go调用雅兰亭库的协程工具
原文
神算
调用C++
一般调用pybind11
封装的C++
库实现神算
调用C++
库,pybind11
封装c++
的接口很简单
.
创建一个py_example.cpp
文件
#include <pybind11/pybind11.h>
#include <string>
namespace py = pybind11;
PYBIND11_MODULE(py_example, m) {m.def("hello", [] {return std::string("hello");});
}
通过cmake
去编译成动态库
:
pybind11_add_module(py_example py_example/py_example.cpp)
神算
调用pybind11
封装的你好
函数:
import py_example
print(py_example.hello())
通过pybind11
,几行代码
就可实现神算
调用c++
了.
如果要同步
调用coro_http_client
还是很简单
的,封装一个函数就完了.
但是如果要异步调用
,事情就不是则简单了,coro_http
和coro_rpc
都是协程
异步的,如何把C++``协程
和神算
的协程
结合起来却不容易,当前通过回调函数
和神算
的asynio
的未来
来实现的.
每次调用C++
的协程
函数之前创建一个未来
和设置一个回调函数
,当协程
返回时设置未来
,神算
侧则等待
该未来
.
以神算
调用coro_http_client
为例:
#include <pybind11/pybind11.h>
#include <string>
#include <ylt/coro_http/coro_http_client.hpp>
#include <ylt/coro_io/client_pool.hpp>
namespace py = pybind11;
class caller {public:caller(py::function callback) : callback_(std::move(callback)) {}void async_get(std::string url) {static auto pool =coro_io::client_pool<coro_http::coro_http_client>::create(url);pool->send_request([this, url](coro_http::coro_http_client &client) -> async_simple::coro::Lazy<void> {auto result = co_await client.async_get(url);py::gil_scoped_acquire acquire;callback_(result.resp_body);}).start([](auto &&) {});}private:py::function callback_;
};PYBIND11_MODULE(py_example, m) {py::class_<caller>(m, "caller").def(py::init<py::function>()).def("async_get", &caller::async_get,py::call_guard<py::gil_scoped_release>());
}
pybind11
不支持封装一个协程
函数,因此需要通过一个普通函数
去调用协程
,即使用回调方式
调用协程
,在协程
的回调函数
中设置神算
的未来
.
通过一个调用者
类来实现调用C++``协程
.
创建调用者
类时传入一个神算
的回调函数
,在async_get
方法中通过回调方式
调用client_pool
中的send_request``协程
,并在其回调函数
中co_await
客户的请求,请求结束
后设置回调函数
.
注意的是神算
的GIL
锁,调用回调函数
时需要加锁.
为何需要一个调用者
对象来保存回调函数
,而不是按函数的参数
传入呢回调函数
?试过,但因为回调
函数的生命期的问题,不好解决,在内部的λ
中移动
或拷贝它,总会出现pybind11
对象释放的错误.
在调用者
里放置,由神算
去管理调用者
和回调
的生命期
就没问题了.
神算
侧,通过协程
去调用coro_http_client
就有点麻烦
了,需要创建一个未来
并设置调用者
的回调函数
,并在回调函数
中设置未来
的值:
import asyncio
import py_example
async def async_get(url):loop = asyncio.get_running_loop()future = loop.create_future()def cpp_callback(message):def set_result():future.set_result(message)loop.call_soon_threadsafe(set_result)caller = py_example.caller(cpp_callback)caller.async_get(url)result = await futureprint(result)async def main():await async_get("http://taobao.com")if __name__ == "__main__":asyncio.run(main())
虽然有点复杂,但工作了
,如果大家有更好的神算``协程
和C++20``协程
结合的方法也请留言告诉我.
go
调用C++
相比较而言,因为go
的协程
用起来比较方便,而且go
更期望调用的是同步接口
,因为go
总可在go
协程中执行同步方法
,因此用go
来封装coro_rpc``简单一些
.
先按c接口
封装coro_rpc
,然后通过cgo
去调用这些c接口
,等待异步函数
结束时,放进go
协程中即可.
封装coro_http_server
的c接口
创建和结束coro_http_server
extern void *start_rpc_server(char *addr, int parallel);
extern void stop_rpc_server(void *server);
注册rpc
函数和回调
coro_rpc_server
需要注册一个,需要根据用户自己的需求
来实现的rpc
函数,来提供rpc
服务,比如有一个加载
的rpc
服务:
inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {//做`CPP`业务
}
在创建rpcserver
后,注册该rpc
函数:
void *start_rpc_server(char *addr, int parallel) {auto server = std::make_unique<coro_rpc::coro_rpc_server>( parallel, addr, std::chrono::seconds(600));server->register_handler<ylt_load_service>();//注册`rpc`函数auto res = server->async_start();if (res.hasResult()) {ELOG_ERROR << "start server failed";return nullptr;}return server.release();
}
这里注意,该rpc
函数需要转到go
那里去执行go
的业务逻辑
,而不是在c++
这一侧去写业务逻辑
,因此需要在go
中回调.
extern void load_service(void *ctx, uint64_t req_id);
inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {auto ctx_ptr = std::make_unique<coro_rpc::context<void>>(std::move(context));load_service(ctx_ptr.release(), req_id);
}
注意该只有声明的load_service
,就是通过它回调到go
的,具体实现
在go
那里,所以这里只有个声明
,cgo
支持这样.
另外一个细节是,为何使用coro_rpc::context
呢?因为是rpc
函数是回调到go
那里处理的,go
那里何时处理完业务,c++
这边是不知道的,所以需要该环境
在go
那里处理完业务逻辑
之后发送rpc
响应.
go
实现的load_service
:
//导出`load_service`
func load_service(ctx unsafe.Pointer, req_id C.uint64_t) {//fmt.Println("load req_id", req_id);//go代码if(req_id == 2) {var err_msg = C.CString("error from server")C.response_error(ctx, 1001, err_msg)C.free(unsafe.Pointer(err_msg))return;}//响应`RPC`结果var promise = C.response_msg(ctx, ((*C.char)(unsafe.Pointer(&g_resp_buf[0]))), C.uint64_t(len(g_resp_buf)));go func(p unsafe.Pointer, buf []byte) {result := C.wait_response_finish(p);if(result.ec > 0) {fmt.Println(result.ec, C.GoString(result.err_msg))C.free(unsafe.Pointer(result.err_msg))}//现在释放`缓冲`}(promise, g_resp_buf)
}
因为response_msg
是异步发送消息
的,go
这边需要稍微等待一下,等发送完成
之后再清理发送数据
等资源.这里是通过一个go
协程来等待发送完成
.
封装coro_http_client
的c接口
extern void *create_client_pool(char *addr, int req_timeout_sec);
extern void free_client_pool(void *pool);
extern rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len);
客户端
的封装简单一些
,创建clientpool
,释放clientpool
,并调用rpcload
的接口.看一下调用的实现
:
rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len) {using namespace async_simple::coro;rpc_result ret{};auto ld = (coro_io::load_balancer<coro_rpc::coro_rpc_client> *)pool;auto lazy = [&]() -> Lazy<void> {auto result =co_await ld->send_request([&](coro_rpc::coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {client.set_resp_attachment_buf(std::span<char>(dest, dest_len));auto result = co_await client.call<ylt_load_service>(req_id);if (!result) {set_rpc_result(ret, result.error());co_return;}if (!client.is_resp_attachment_in_external_buf()) {set_rpc_result(ret, coro_rpc::rpc_error(coro_rpc::errc::message_too_large));co_return;}ret.len = client.get_resp_attachment().size();});};syncAwait(lazy());return ret;
}
通过clientpool
去发送请求
,内部同步等待rpc
调用结束,go
那里则可在一个go
协程中放置该加载
调用.
这里展示一个如何实现rpc
请求零拷贝的场景
:
client.set_resp_attachment_buf(std::span<char>(dest, dest_len));
设置附件
是为了零拷贝
,附件
的内存可能是来自于go
那里的一块很大的内存
,并在附件
中放置,coro_rpc_client
发送请求时不会去序化它的,而是直接发送到服务器
.
go
那里调用加载
就比较简单
了:
func test_client(host string, len int) {peer := C.CString(host)pool := C.create_client_pool(peer, C.int(30))outbuf := make([]byte, len)for i := 0; i < 3; i++ {req_id := uint64(i)result := C.load(pool, C.uint64_t(req_id), ((*C.char)(unsafe.Pointer(&outbuf[0]))), C.uint64_t(len))if(result.ec > 0) {fmt.Println("error: ", result.ec, C.GoString(result.err_msg))C.free(unsafe.Pointer(result.err_msg))}else {fmt.Println("result: ", string(outbuf[0:result.len]), "len", result.len)}}C.free_client_pool(pool)C.free(unsafe.Pointer(peer))
}
test_client("0.0.0.0:8806", 1024)