当前位置: 首页 > web >正文

2505C++,py和go调用雅兰亭库的协程工具

原文

神算调用C++

一般调用pybind11封装的C++库实现神算调用C++库,pybind11封装c++的接口很简单.

创建一个py_example.cpp文件

#include <pybind11/pybind11.h>
#include <string>
namespace py = pybind11;
PYBIND11_MODULE(py_example, m) {m.def("hello", [] {return std::string("hello");});
}

通过cmake去编译成动态库:

pybind11_add_module(py_example py_example/py_example.cpp)

神算调用pybind11封装的你好函数:

import py_example
print(py_example.hello())

通过pybind11,几行代码就可实现神算调用c++了.
如果要同步调用coro_http_client还是很简单的,封装一个函数就完了.
但是如果要异步调用,事情就不是则简单了,coro_httpcoro_rpc都是协程异步的,如何把C++``协程神算协程结合起来却不容易,当前通过回调函数神算asynio未来来实现的.

每次调用C++协程函数之前创建一个未来和设置一个回调函数,当协程返回时设置未来,神算侧则等待未来.
神算调用coro_http_client为例:

#include <pybind11/pybind11.h>
#include <string>
#include <ylt/coro_http/coro_http_client.hpp>
#include <ylt/coro_io/client_pool.hpp>
namespace py = pybind11;
class caller {public:caller(py::function callback) : callback_(std::move(callback)) {}void async_get(std::string url) {static auto pool =coro_io::client_pool<coro_http::coro_http_client>::create(url);pool->send_request([this, url](coro_http::coro_http_client &client) -> async_simple::coro::Lazy<void> {auto result = co_await client.async_get(url);py::gil_scoped_acquire acquire;callback_(result.resp_body);}).start([](auto &&) {});}private:py::function callback_;
};PYBIND11_MODULE(py_example, m) {py::class_<caller>(m, "caller").def(py::init<py::function>()).def("async_get", &caller::async_get,py::call_guard<py::gil_scoped_release>());
}

pybind11不支持封装一个协程函数,因此需要通过一个普通函数去调用协程,即使用回调方式调用协程,在协程回调函数中设置神算未来.
通过一个调用者类来实现调用C++``协程.

创建调用者类时传入一个神算回调函数,在async_get方法中通过回调方式调用client_pool中的send_request``协程,并在其回调函数co_await客户的请求,请求结束后设置回调函数.

注意的是神算GIL锁,调用回调函数时需要加锁.

为何需要一个调用者对象来保存回调函数,而不是按函数的参数传入呢回调函数?试过,但因为回调函数的生命期的问题,不好解决,在内部的λ移动或拷贝它,总会出现pybind11对象释放的错误.
调用者里放置,由神算去管理调用者回调生命期就没问题了.

神算侧,通过协程去调用coro_http_client有点麻烦了,需要创建一个未来并设置调用者回调函数,并在回调函数中设置未来的值:

import asyncio
import py_example
async def async_get(url):loop = asyncio.get_running_loop()future = loop.create_future()def cpp_callback(message):def set_result():future.set_result(message)loop.call_soon_threadsafe(set_result)caller = py_example.caller(cpp_callback)caller.async_get(url)result = await futureprint(result)async def main():await async_get("http://taobao.com")if __name__ == "__main__":asyncio.run(main())

虽然有点复杂,但工作了,如果大家有更好的神算``协程C++20``协程结合的方法也请留言告诉我.

go调用C++

相比较而言,因为go协程用起来比较方便,而且go更期望调用的是同步接口,因为go总可在go协程中执行同步方法,因此用go来封装coro_rpc``简单一些.

先按c接口封装coro_rpc,然后通过cgo去调用这些c接口,等待异步函数结束时,放进go协程中即可.

封装coro_http_server的c接口

创建和结束coro_http_server

extern void *start_rpc_server(char *addr, int parallel);
extern void stop_rpc_server(void *server);

注册rpc函数和回调

coro_rpc_server需要注册一个,需要根据用户自己的需求来实现的rpc函数,来提供rpc服务,比如有一个加载rpc服务:

inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {//做`CPP`业务
}

在创建rpcserver后,注册该rpc函数:

void *start_rpc_server(char *addr, int parallel) {auto server = std::make_unique<coro_rpc::coro_rpc_server>( parallel, addr, std::chrono::seconds(600));server->register_handler<ylt_load_service>();//注册`rpc`函数auto res = server->async_start();if (res.hasResult()) {ELOG_ERROR << "start server failed";return nullptr;}return server.release();
}

这里注意,该rpc函数需要转到go那里去执行go业务逻辑,而不是在c++这一侧去写业务逻辑,因此需要在go中回调.

extern void load_service(void *ctx, uint64_t req_id);
inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {auto ctx_ptr = std::make_unique<coro_rpc::context<void>>(std::move(context));load_service(ctx_ptr.release(), req_id);
}

注意该只有声明的load_service,就是通过它回调到go的,具体实现go那里,所以这里只有个声明,cgo支持这样.

另外一个细节是,为何使用coro_rpc::context呢?因为是rpc函数是回调到go那里处理的,go那里何时处理完业务,c++这边是不知道的,所以需要该环境go那里处理完业务逻辑之后发送rpc响应.

go实现的load_service:

    //导出`load_service`
func load_service(ctx unsafe.Pointer, req_id C.uint64_t) {//fmt.Println("load req_id", req_id);//go代码if(req_id == 2) {var err_msg = C.CString("error from server")C.response_error(ctx, 1001, err_msg)C.free(unsafe.Pointer(err_msg))return;}//响应`RPC`结果var promise = C.response_msg(ctx, ((*C.char)(unsafe.Pointer(&g_resp_buf[0]))), C.uint64_t(len(g_resp_buf)));go func(p unsafe.Pointer, buf []byte) {result := C.wait_response_finish(p);if(result.ec > 0) {fmt.Println(result.ec, C.GoString(result.err_msg))C.free(unsafe.Pointer(result.err_msg))}//现在释放`缓冲`}(promise, g_resp_buf)
}

因为response_msg异步发送消息的,go这边需要稍微等待一下,等发送完成之后再清理发送数据等资源.这里是通过一个go协程来等待发送完成.

封装coro_http_client的c接口

extern void *create_client_pool(char *addr, int req_timeout_sec);
extern void free_client_pool(void *pool);
extern rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len);

客户端的封装简单一些,创建clientpool,释放clientpool,并调用rpcload的接口.看一下调用的实现:

rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len) {using namespace async_simple::coro;rpc_result ret{};auto ld = (coro_io::load_balancer<coro_rpc::coro_rpc_client> *)pool;auto lazy = [&]() -> Lazy<void> {auto result =co_await ld->send_request([&](coro_rpc::coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {client.set_resp_attachment_buf(std::span<char>(dest, dest_len));auto result = co_await client.call<ylt_load_service>(req_id);if (!result) {set_rpc_result(ret, result.error());co_return;}if (!client.is_resp_attachment_in_external_buf()) {set_rpc_result(ret, coro_rpc::rpc_error(coro_rpc::errc::message_too_large));co_return;}ret.len = client.get_resp_attachment().size();});};syncAwait(lazy());return ret;
}

通过clientpool发送请求,内部同步等待rpc调用结束,go那里则可在一个go协程中放置该加载调用.

这里展示一个如何实现rpc请求零拷贝的场景:

client.set_resp_attachment_buf(std::span<char>(dest, dest_len));

设置附件是为了零拷贝,附件的内存可能是来自于go那里的一块很大的内存,并在附件中放置,coro_rpc_client发送请求时不会去序化它的,而是直接发送到服务器.

go那里调用加载比较简单了:

func test_client(host string, len int) {peer := C.CString(host)pool := C.create_client_pool(peer, C.int(30))outbuf := make([]byte, len)for i := 0; i < 3; i++ {req_id := uint64(i)result := C.load(pool, C.uint64_t(req_id), ((*C.char)(unsafe.Pointer(&outbuf[0]))), C.uint64_t(len))if(result.ec > 0) {fmt.Println("error: ", result.ec, C.GoString(result.err_msg))C.free(unsafe.Pointer(result.err_msg))}else {fmt.Println("result: ", string(outbuf[0:result.len]), "len", result.len)}}C.free_client_pool(pool)C.free(unsafe.Pointer(peer))
}
test_client("0.0.0.0:8806", 1024)
http://www.xdnf.cn/news/6390.html

相关文章:

  • 2025年渗透测试面试题总结-阿里云[实习]阿里云安全-安全工程师(题目+回答)
  • 2025认证杯第二阶段数学建模B题:谣言在社交网络上的传播思路+模型+代码
  • 贝叶斯优化Transformer融合支持向量机多变量回归预测,附相关性气泡图、散点密度图,Matlab实现
  • 【Python 正则表达式】
  • PostgreSQL 联合索引生效条件
  • 揭秘LLM:矩阵运算揭秘LLM单词生成机制
  • C++11多线程thread、原子变量
  • Kafka 中过多的 topic 导致整体上性能变慢的原因
  • Spark--RDD中的转换算子
  • Node.js
  • Miniconda介绍介绍和使用
  • Web3.0:互联网的去中心化未来
  • FPGA: UltraScale+ bitslip实现(ISERDESE3)
  • 记一次bug排查(.exe链接mysql失败)-每天学习一点点
  • (5)python开发经验
  • 组合问题(去重)
  • C++23 新增的查找算法详解:ranges::find_last 系列函数
  • uniapp微信小程序-长按按钮百度语音识别回显文字
  • 印度Rummy游戏支付通道申请策略:技巧类游戏的合规与创新
  • 从零开始学习three.js(18):一文详解three.js中的着色器Shader
  • Spring MVC HttpMessageConverter 的作用是什么?
  • 区块链blog1__合作与信任
  • 电池组PACK自动化生产线:多领域电池生产的“智能引擎”
  • 【美团】后端一面复盘|项目驱动 + 手撕 + JVM + 数据库全面覆盖
  • 重磅发布!OpenAI 推出最新模型 GPT-4.1 系列!
  • 多模态大语言模型arxiv论文略读(七十七)
  • 【氮化镓】HfO2钝化优化GaN 器件性能
  • 【React全栈进阶】从组件设计到性能优化实战指南
  • 什么是TCP协议?它存在哪些安全挑战?
  • K8S Gateway API 快速开始、胎教级教程