分布式微服务电商订单系统Rust编码开发[上]
1 整体设计与项目构建
1.1 系统架构设计
A. 微服务划分
订单服务:处理订单创建、查询、确认、状态更新;
支付服务:模拟支付流程,回调订单状态;
库存服务:管理商品库存,支持扣减/回滚;
统一网关:路由请求、托管前端页面(HTML/CSS/JS);
服务注册发现:Nacos 管理服务实例;
数据存储:MongoDB 存储各服务数据。
B. 技术栈
框架:Warp(本篇订单/支付/库存服务的 HTTP 服务);
数据库驱动:mongodb(官方库);
服务注册:nacos-sdk(Nacos Rust 客户端,向 Nacos 注册服务实例);
异步运行时:Tokio;
配置管理:dotenv。
1.2 项目架构及其主要编码获取
采用“腾讯ima”,借助AI大模型Hunyuan/DeepSeek,获取项目设计框架和主要程序初始编码,如图1所示。
图1 “腾讯ima”获取“项目设计框架和主要程序初始编码”截图
1.3 主项目及其各个子项目构建
RustRoverIDE下创建主项目ecommerce-uservices,再在该目录下分别创建四个子项目:order_service、payment_service、inventory_service、gateway。在主项目的Cargo.toml文件中配置整体Workspace如下文本框所示。
# Workspace root manifest: uses feature resolver "2" and lists the four member crates.
[workspace]
resolver = "2"
members = ["order_service", "payment_service", "inventory_service", "gateway"]
1.4 项目文件目录结构
ecommerce-uservices/
├── gateway/ # 统一网关服务
│ ├── src/
│ │ ├── main.rs # 网关入口
│ │ └── routes.rs # 路由配置
│ ├── templates/ # HTML页面模板
│ │ ├── index.html # 主页面
│ │ ├── orders.html # 订单页面
│ │ ├── payment.html # 支付页面
│ │ └── inventory.html # 库存页面
│ ├── static/ # 静态页面定义
│ │ ├── style.css # 全局样式
│ │ └── script.js # 全局脚本
│ └── Cargo.toml
├── order_service/ # 订单服务
│ ├── src/
│ │ ├── main.rs # 服务入口
│ │ ├── handler.rs # HTTP 请求处理
│ │ ├── model.rs # 数据结构(订单实体)
│ │ └── db.rs # MongoDB 操作
│ └── Cargo.toml
├── payment_service/ # 支付服务(类似订单服务结构)
│ ├── src/
│ │ ├── main.rs # 服务入口
│ │ ├── handler.rs # HTTP 请求处理
│ │ ├── model.rs # 数据结构(支付请求/响应、订单实体)
│ │ └── db.rs # MongoDB 操作
│ └── Cargo.toml
├── inventory_service/ # 库存服务(类似订单服务结构)
│ ├── src/
│ │ ├── main.rs # 服务入口
│ │ ├── handler.rs # HTTP 请求处理
│ │ ├── model.rs # 数据结构(库存实体)
│ │ └── db.rs # MongoDB 操作
│ └── Cargo.toml
└── Cargo.toml # Workspace 配置文件
2 订单服务编码设计
2.1 架构原则
- 分层设计:业务逻辑(handler)、数据访问(db)、模型(model)分离。
- 异步非阻塞:基于Tokio运行时,高并发下性能优异。
- 安全序列化:Serde确保JSON与结构体转换的类型安全。
2.2 订单微服务实现功能
- 订单创建:通过POST /orders接口创建新订单
- 订单查询:通过GET /orders/{order_id}接口查询订单详情
- 订单确认:通过PUT /orders/{order_id}/confirm接口确认订单
- 订单刷新:通过PUT /orders/{order_id}/refresh接口更新订单时间戳
2.3 main.rs--服务入口与路由配置
向 Nacos 注册服务实例,配置路由(创建订单、查询订单、确认订单、刷新订单)并启动 Warp 服务;MongoDB 集合在 db.rs 中首次访问时延迟初始化。
mod model; mod db; mod handler; use std::collections::HashMap;
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use nacos_sdk::api::props::ClientProps; use handler::order_routes;
use std::net::SocketAddr; use log::info;
/// Entry point of the order service.
///
/// Registers this instance with the Nacos server at `127.0.0.1:8848` and then
/// serves the order routes with warp on the same ip/port that was registered.
///
/// # Errors
/// Returns an error if the Nacos client cannot be built, if registration
/// fails, or if the configured ip/port do not form a valid socket address.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialise logging.
    env_logger::init();
    info!("Starting order service");

    // Service configuration.
    let service_name = "order_service".to_string();
    let group = "ecommerce".to_string();
    let ip = "127.0.0.1";
    let port = 9001;

    // Create the Nacos client.
    let client = NamingServiceBuilder::new(
        ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"),
    )
    .build()?;

    // Describe this instance; the metadata identifies it as the order service.
    let instance = ServiceInstance {
        service_name: Some(service_name.clone()),
        ip: ip.to_string(),
        port,
        weight: 1.0,
        healthy: true,
        enabled: true,
        ephemeral: true,
        instance_id: None,
        cluster_name: Some("DEFAULT".to_string()),
        metadata: HashMap::from([
            ("version".into(), "1.0".into()),
            ("service".into(), "order".into()),
        ]),
    };

    // Register the instance.
    client
        .register_instance(service_name.clone(), Some(group.clone()), instance.clone())
        .await?;
    println!("✅ 订单服务注册成功");

    /* Deregistration (same arguments as registration):
    client.deregister_instance( service_name, Some(group), instance ).await?; */

    // Listen on exactly the address that was registered, so the two cannot drift apart.
    let routes = order_routes();
    let addr = SocketAddr::new(ip.parse::<std::net::IpAddr>()?, u16::try_from(port)?);
    println!("Order service running on {}", addr);
    warp::serve(routes).run(addr).await;
    Ok(())
}
2.4 model.rs--数据模型定义
use chrono::Utc; use mongodb::bson::oid::ObjectId; use mongodb::error::Error;
use serde::{Deserialize, Serialize}; use warp::reject::Reject;
/// An order document as read from and written to MongoDB.
#[derive(Debug, Serialize, Deserialize)]
pub struct Order {
    /// MongoDB `_id`; left out of the serialized document while it is `None`.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    /// Application-level order identifier used for lookups.
    pub order_id: String,
    pub user_id: String,
    pub product_id: String,
    pub quantity: i32,
    pub total_price: f64,
    /// One of "created", "confirmed", "completed", "canceled".
    pub status: String,
    pub created_at: String,
    pub updated_at: String,
}
/// JSON body accepted when creating an order.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderRequest {
    pub user_id: String,
    pub product_id: String,
    pub quantity: i32,
    pub total_price: f64,
}
/// JSON body carrying the status value to store on an order.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateOrderStatusRequest {
    pub status: String,
}
impl Order {
pub fn new(req: CreateOrderRequest) -> Self {
let now = Utc::now().to_rfc3339();
Order { id: None, order_id: uuid::Uuid::new_v4().to_string(),
user_id: req.user_id, product_id: req.product_id,
quantity: req.quantity, total_price: req.total_price,
status: "created".to_string(), created_at: now.clone(), updated_at: now,
}
}
}
/// Rejection payload that carries a database error rendered as text.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderError {
    message: String,
}

// Lets `OrderError` be used with `warp::reject::custom`.
impl Reject for OrderError {}

impl OrderError {
    /// Captures the display text of a MongoDB error.
    pub fn new(message: &Error) -> Self {
        let text = message.to_string();
        Self { message: text }
    }
}
use warp::{Filter, Rejection, Reply}; use serde_json::json;
2.5 handler.rs--请求处理逻辑
处理HTTP请求,调用数据库操作并返回响应。
use crate::model::{Order, CreateOrderRequest, UpdateOrderStatusRequest, OrderError};
use crate::db::{create_order, get_order, update_order_status, refresh_order};
/// Liveness probe: always replies with the JSON body `{"status": "ok"}`.
pub async fn health_check() -> Result<impl Reply, Rejection> {
    let body = json!({ "status": "ok" });
    Ok(warp::reply::json(&body))
}
/// Creates an order from the request body and replies with `{"order_id": …}`.
///
/// A database failure is turned into a custom `OrderError` rejection.
pub async fn handle_create_order(req: CreateOrderRequest) -> Result<impl Reply, Rejection> {
    let id = create_order(Order::new(req))
        .await
        .map_err(|e| warp::reject::custom(OrderError::new(&e)))?;
    Ok(warp::reply::json(&json!({ "order_id": id })))
}
/// Looks up an order by `order_id` and replies with it as JSON.
///
/// Rejects with warp's not-found rejection when no order matches, and with
/// a custom `OrderError` rejection when the database call fails.
pub async fn handle_get_order(order_id: String) -> Result<impl Reply, Rejection> {
    let found = get_order(&order_id)
        .await
        .map_err(|e| warp::reject::custom(OrderError::new(&e)))?;
    match found {
        Some(order) => Ok(warp::reply::json(&order)),
        None => Err(warp::reject::not_found()),
    }
}
/// Stores the status from the request body on the order and reports it back.
///
/// The reply echoes the status that was actually written, so a body of
/// `{"status": "confirmed"}` still yields `{"status": "confirmed"}` while any
/// other value is no longer misreported as "confirmed".
///
/// A database failure is turned into a custom `OrderError` rejection.
pub async fn handle_confirm_order(
    order_id: String,
    req: UpdateOrderStatusRequest,
) -> Result<impl Reply, Rejection> {
    update_order_status(&order_id, &req.status)
        .await
        .map_err(|e| warp::reject::custom(OrderError::new(&e)))?;
    Ok(warp::reply::json(&json!({ "status": req.status })))
}
/// Refreshes the order's `updated_at` timestamp and replies `{"status": "refreshed"}`.
///
/// A database failure is turned into a custom `OrderError` rejection.
pub async fn handle_refresh_order(order_id: String) -> Result<impl Reply, Rejection> {
    refresh_order(&order_id)
        .await
        .map_err(|e| warp::reject::custom(OrderError::new(&e)))?;
    Ok(warp::reply::json(&json!({ "status": "refreshed" })))
}
/// Assembles every HTTP route exposed by the order service.
///
/// Routes, in matching order: `GET /health`, `POST /orders`,
/// `GET /orders/{id}`, `PUT /orders/{id}/confirm`, `PUT /orders/{id}/refresh`.
pub fn order_routes() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
    let health_route = warp::path!("health")
        .and(warp::get())
        .and_then(health_check);

    let create_route = warp::path!("orders")
        .and(warp::post())
        .and(warp::body::json())
        .and_then(handle_create_order);

    let get_route = warp::path!("orders" / String)
        .and(warp::get())
        .and_then(handle_get_order);

    let confirm_route = warp::path!("orders" / String / "confirm")
        .and(warp::put())
        .and(warp::body::json())
        .and_then(handle_confirm_order);

    let refresh_route = warp::path!("orders" / String / "refresh")
        .and(warp::put())
        .and_then(handle_refresh_order);

    health_route
        .or(create_route)
        .or(get_route)
        .or(confirm_route)
        .or(refresh_route)
}
2.6 db.rs--数据库操作
use mongodb::{Client, Collection}; use mongodb::options::ClientOptions;
use crate::model::Order; use mongodb::bson::doc; use tokio::sync::OnceCell as AsyncOnceCell;
// Global handle to the orders collection, lazily initialized through an async OnceCell.
static ORDER_COLLECTION: AsyncOnceCell<Collection<Order>> = AsyncOnceCell::const_new();
/// Connects to MongoDB at `mongodb://localhost:27017` and returns the
/// `orders` collection of the `ecommerce1` database.
///
/// # Panics
/// Panics if the connection string cannot be parsed or the client cannot be created.
async fn init_collection() -> Collection<Order> {
    let options = ClientOptions::parse("mongodb://localhost:27017")
        .await
        .expect("Failed to parse MongoDB connection string");
    let mongo = Client::with_options(options).expect("Failed to create MongoDB client");
    let database = mongo.database("ecommerce1");
    database.collection("orders")
}
/// Returns the shared collection handle, running `init_collection` on first use.
async fn get_collection() -> &'static Collection<Order> {
    let handle = ORDER_COLLECTION.get_or_init(init_collection).await;
    handle
}
/// Inserts the order and returns its `order_id`.
///
/// The returned value is the application-level `order_id` (the key that
/// `get_order`, `update_order_status` and `refresh_order` filter on), not the
/// MongoDB `_id`, so callers can use it directly for later lookups.
///
/// # Errors
/// Returns the MongoDB error if the insert fails.
pub async fn create_order(order: Order) -> Result<String, mongodb::error::Error> {
    let collection = get_collection().await;
    // Copy the id out before `order` is moved into the insert call.
    let order_id = order.order_id.clone();
    collection.insert_one(order, None).await?;
    Ok(order_id)
}
/// Finds the order whose `order_id` field equals the given id.
///
/// Returns `Ok(None)` when no document matches.
pub async fn get_order(order_id: &str) -> Result<Option<Order>, mongodb::error::Error> {
    let by_order_id = doc! { "order_id": order_id };
    get_collection().await.find_one(by_order_id, None).await
}
/// Sets `status` and refreshes `updated_at` (RFC 3339, UTC) on the matching order.
///
/// The update result is discarded, so `Ok(())` is returned even when no
/// document matched the given `order_id`.
pub async fn update_order_status(order_id: &str, status: &str) -> Result<(), mongodb::error::Error> {
    let by_order_id = doc! { "order_id": order_id };
    let changes = doc! {
        "$set": {
            "status": status,
            "updated_at": chrono::Utc::now().to_rfc3339(),
        }
    };
    get_collection()
        .await
        .update_one(by_order_id, changes, None)
        .await
        .map(|_| ())
}
/// Refreshes `updated_at` (RFC 3339, UTC) on the matching order.
///
/// The update result is discarded, so `Ok(())` is returned even when no
/// document matched the given `order_id`.
pub async fn refresh_order(order_id: &str) -> Result<(), mongodb::error::Error> {
    let by_order_id = doc! { "order_id": order_id };
    let changes = doc! {
        "$set": { "updated_at": chrono::Utc::now().to_rfc3339() }
    };
    get_collection()
        .await
        .update_one(by_order_id, changes, None)
        .await
        .map(|_| ())
}
2.7 Cargo.toml--依赖配置
[package]
name = "order_service"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# The service only uses the async driver API on tokio, so default features suffice.
# The former `feature = ["sync"]` key misspelled `features` and was not applied.
mongodb = "2.0"
lazy_static = "1.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "0.8", features = ["v4"] }
env_logger = "0.11"
log = "0.4"
nacos-sdk = "0.4.0"
3 支付服务编码设计
3.1 功能设计
A. 支付流程
用户选择订单并指定支付方式(微信/支付宝);
系统模拟支付处理(80%成功率);
更新订单状态和支付状态。
B. 订单查询
支持按订单ID查询单个订单;
支持按用户ID、支付状态等多条件查询订单列表;
C. NACOS服务注册
服务启动时自动注册到NACOS;
定期发送心跳维持服务健康状态。
D. MongoDB集成
使用官方MongoDB Rust驱动;
支持订单的CRUD操作;
支持复杂查询和更新。
E. 错误处理
自定义错误类型;
统一的错误处理机制。
3.2 main.rs--服务入口与路由配置
mod handler; mod db; mod model;
use warp::{Filter}; use std::sync::Arc; use tokio::sync::Mutex;
use crate::handler::payment_routes; use crate::db::DbClient;
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use nacos_sdk::api::props::ClientProps; use std::collections::HashMap;
/// Entry point of the payment service.
///
/// Registers this instance with the Nacos server at `127.0.0.1:8848`,
/// initialises the MongoDB client, and serves the payment routes with warp on
/// `127.0.0.1:9002`.
///
/// # Errors
/// Returns an error if the Nacos client cannot be built or registration fails.
/// A database initialisation failure prints a message and exits with code 1.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Service configuration.
    let service_name = "payment_service".to_string();
    let group = "ecommerce".to_string();
    let ip = "127.0.0.1";
    let port = 9002;

    // Create the Nacos client.
    let client = NamingServiceBuilder::new(
        ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"),
    )
    .build()?;

    // Describe this instance; the metadata identifies it as the payment service.
    let instance = ServiceInstance {
        service_name: Some(service_name.clone()),
        ip: ip.to_string(),
        port,
        weight: 1.0,
        healthy: true,
        enabled: true,
        ephemeral: true,
        instance_id: None,
        cluster_name: Some("DEFAULT".to_string()),
        metadata: HashMap::from([
            ("version".into(), "1.0".into()),
            ("service".into(), "payment".into()),
        ]),
    };

    // Register the instance.
    client
        .register_instance(service_name.clone(), Some(group.clone()), instance.clone())
        .await?;
    println!("✅ 支付服务注册成功");

    /* Deregistration (same arguments as registration):
    client.deregister_instance( service_name, Some(group), instance ).await?; */

    // Initialise the MongoDB client; without it the service cannot do anything useful.
    let db_client = match init_db_client().await {
        Ok(client) => client,
        Err(e) => {
            eprintln!("Failed to initialize database client: {}", e);
            std::process::exit(1);
        }
    };

    // Shared database client handed to every route.
    let db = Arc::new(Mutex::new(db_client));

    // Routes with permissive CORS and request logging.
    let routes = payment_routes(db)
        .with(warp::cors().allow_any_origin())
        .with(warp::log("payment_service"));
    println!("Payment service started on port 9002");
    warp::serve(routes).run(([127, 0, 0, 1], 9002)).await;
    Ok(())
}
/// Builds the payment service's `DbClient` for the `orders` collection of the
/// `ecommerce1` database on `mongodb://localhost:27017`.
///
/// # Errors
/// Propagates any error returned by `DbClient::new`.
async fn init_db_client() -> Result<DbClient, Box<dyn std::error::Error>> {
    // MongoDB connection settings.
    const URI: &str = "mongodb://localhost:27017";
    const DB_NAME: &str = "ecommerce1";
    const COLLECTION_NAME: &str = "orders";

    Ok(DbClient::new(URI, DB_NAME, COLLECTION_NAME).await?)
}
3.3 model.rs--数据模型定义
use serde::{Deserialize, Serialize}; use mongodb::bson::oid::ObjectId;
/// Currency a payment is denominated in.
#[derive(Debug, Serialize, Deserialize)]
pub enum Currency {
    CNY,
    USD,
    EUR,
}
/// Payment state of an order.
#[derive(Debug, Serialize, Deserialize)]
pub enum PayStatus {
    /// Awaiting payment.
    Pending,
    /// Payment in progress.
    Processing,
    /// Payment succeeded.
    Paid,
    /// Payment failed.
    Failed,
    /// Payment refunded.
    Refunded,
}
/// Payment channel selected for an order.
#[derive(Debug, Serialize, Deserialize)]
pub enum PaymentMethod {
    WeChatPay,
    Alipay,
    BankTransfer,
    CreditCard,
}
/// JSON body of a payment request.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaymentRequest {
    pub order_id: String,
    pub payment_method: PaymentMethod,
    pub currency: Currency,
}
/// Payment view of an order as returned to API clients.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaymentResponse {
    pub order_id: String,
    pub status: PayStatus,
    pub payment_method: PaymentMethod,
    pub amount: f64,
    pub transaction_id: Option<String>,
    pub payment_time: Option<String>,
}
/// Optional filters for listing orders; an absent field is not filtered on.
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryOrderRequest {
    pub order_id: Option<String>,
    pub user_id: Option<String>,
    pub pay_status: Option<PayStatus>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Order {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub order_id: String,
pub user_id: String,
pub product_id: String,
pub quantity: i32,
pub total_price: f64,
pub status: String, // "created", "confirmed", "completed", "canceled"
pub pay_status: PayStatus,
pub payment_method: Option<PaymentMethod>,
pub currency: Option<Currency>,
pub transaction_id: Option<String>,
pub created_at: String,
pub updated_at: String,
}
/// Projects an order onto its payment view.
///
/// A missing payment method is reported as `WeChatPay`, and `payment_time`
/// is always the order's `updated_at` value.
impl From<Order> for PaymentResponse {
    fn from(order: Order) -> Self {
        let method = match order.payment_method {
            Some(chosen) => chosen,
            None => PaymentMethod::WeChatPay,
        };
        Self {
            order_id: order.order_id,
            status: order.pay_status,
            payment_method: method,
            amount: order.total_price,
            transaction_id: order.transaction_id,
            payment_time: Some(order.updated_at),
        }
    }
}
3.4 handler.rs--请求处理逻辑
处理HTTP请求,调用数据库操作并返回响应。
use warp::{Filter, Rejection, Reply}; use serde_json::json; use std::sync::Arc;
use tokio::sync::Mutex; use thiserror::Error; use rand::Rng; use crate::db::{DbClient, DbError};
use crate::model::{PaymentRequest, PaymentResponse, QueryOrderRequest, PayStatus};
/// Errors a payment handler can reject a request with.
#[derive(Error, Debug)]
pub enum HandlerError {
    /// A database operation failed.
    #[error("Database error: {0}")]
    DbError(#[from] DbError),

    /// The simulated payment did not succeed.
    #[error("Payment processing error")]
    PaymentError,

    /// No order matches the requested id.
    #[error("Order not found")]
    OrderNotFound,
}

// Lets `HandlerError` be used with `warp::reject::custom`.
impl warp::reject::Reject for HandlerError {}
/// Runs the simulated payment flow for one order.
///
/// Steps: verify the order exists, wait one second to imitate a payment
/// gateway, mark the order as processing, then succeed with probability 0.8.
/// On success the order is completed and returned in the reply; on failure
/// the order is marked `Failed` and the request is rejected.
///
/// The database mutex is released during the simulated delay so other
/// requests are not blocked behind it.
///
/// # Errors
/// Rejects with `HandlerError::OrderNotFound` when the order does not exist,
/// `HandlerError::DbError` on any database failure, and
/// `HandlerError::PaymentError` when the simulated payment fails.
pub async fn handle_payment(
    order_id: String,
    req: PaymentRequest,
    db: Arc<Mutex<DbClient>>,
) -> Result<impl Reply, Rejection> {
    // Existence check in its own scope so the guard is dropped before sleeping.
    {
        let client = db.lock().await;
        let existing = client
            .get_order(&order_id)
            .await
            .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;
        if existing.is_none() {
            return Err(warp::reject::custom(HandlerError::OrderNotFound));
        }
    }

    // Simulated gateway latency.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    let client = db.lock().await;
    client
        .process_payment(&order_id, &req.payment_method)
        .await
        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

    let payment_successful = rand::thread_rng().gen_bool(0.8);
    if payment_successful {
        let order = client
            .complete_payment(&order_id)
            .await
            .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;
        Ok(warp::reply::json(&json!({
            "status": "success",
            "message": "Payment processed successfully",
            "order": PaymentResponse::from(order)
        })))
    } else {
        client
            .update_payment_status(&order_id, PayStatus::Failed, Some(&req.payment_method))
            .await
            .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;
        Err(warp::reject::custom(HandlerError::PaymentError))
    }
}
/// Replies with the payment view of a single order.
///
/// Rejects with `HandlerError::OrderNotFound` when no order matches and with
/// `HandlerError::DbError` when the lookup fails.
pub async fn handle_get_order(
    order_id: String,
    db: Arc<Mutex<DbClient>>,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    let found = match client.get_order(&order_id).await {
        Ok(found) => found,
        Err(e) => return Err(warp::reject::custom(HandlerError::DbError(e))),
    };
    if let Some(order) = found {
        Ok(warp::reply::json(&PaymentResponse::from(order)))
    } else {
        Err(warp::reject::custom(HandlerError::OrderNotFound))
    }
}
/// Replies with the payment views of all orders matching the query filters.
///
/// Rejects with `HandlerError::DbError` when the query fails.
pub async fn handle_query_orders(
    query: QueryOrderRequest,
    db: Arc<Mutex<DbClient>>,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    let matched = client
        .query_orders(query)
        .await
        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;
    let mut responses = Vec::new();
    for order in matched {
        responses.push(PaymentResponse::from(order));
    }
    Ok(warp::reply::json(&responses))
}
pub fn payment_routes( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
let db_filter = warp::any().map(move || Arc::clone(&db));
let pay_order = warp::path!("orders" / String / "pay")
.and(warp::post()).and(warp::body::json()).and(db_filter.clone())
.and_then(|order_id, req, db| async move { handle_payment(order_id, req, db).await });
let get_order = warp::path!("orders" / String).and(warp::get()).and(db_filter.clone())
.and_then(|order_id, db| async move { handle_get_order(order_id, db).await });
let query_orders = warp::path!("orders").and(warp::get()).and(warp::query::<QueryOrderRequest>())
.and(db_filter.clone()).and_then(|query, db| async move { handle_query_orders(query, db).await });
let health = warp::path!("health").and(warp::get())
.map(|| warp::reply::json(&json!({"status": "ok"})));
pay_order.or(get_order).or(query_orders).or(health)
}
3.5 db.rs--数据库操作
use mongodb::{Client, Collection}; use mongodb::bson::{doc, to_bson};
use mongodb::options::{ClientOptions, FindOptions};
use crate::model::{Order, PayStatus, QueryOrderRequest, PaymentMethod};
use thiserror::Error; use chrono::Utc; use rand::Rng;
#[derive(Error, Debug)]
pub enum DbError {
#[error("MongoDB error: {0}")]
MongoError(#[from] mongodb::error::Error),
#[error("BSON serialization error: {0}")]
BsonError(String),
#[error("Order not found")]
OrderNotFound,
}
impl From<bson::ser::Error> for DbError {
fn from(error: bson::ser::Error) -> Self { DbError::BsonError(error.to_string()) }
}
/// Data-access wrapper around the orders collection used by the payment service.
pub struct DbClient {
    collection: Collection<Order>,
}

impl DbClient {
    /// Connects to `uri` and binds to `collection_name` inside `db_name`.
    ///
    /// # Errors
    /// Returns `DbError::MongoError` if the URI cannot be parsed or the client
    /// cannot be created.
    pub async fn new(uri: &str, db_name: &str, collection_name: &str) -> Result<Self, DbError> {
        let client_options = ClientOptions::parse(uri).await?;
        let client = Client::with_options(client_options)?;
        let collection = client.database(db_name).collection::<Order>(collection_name);
        Ok(Self { collection })
    }

    /// Finds the order whose `order_id` field equals the given id.
    ///
    /// Returns `Ok(None)` when no document matches.
    pub async fn get_order(&self, order_id: &str) -> Result<Option<Order>, DbError> {
        let filter = doc! { "order_id": order_id };
        let order = self.collection.find_one(filter, None).await?;
        Ok(order)
    }

    /// Sets `pay_status` and refreshes `updated_at` on the matching order.
    ///
    /// `payment_method` is written only when one is supplied; passing `None`
    /// leaves the stored method untouched instead of overwriting it with null.
    ///
    /// # Errors
    /// Returns `DbError::OrderNotFound` when no document matched.
    pub async fn update_payment_status(
        &self,
        order_id: &str,
        pay_status: PayStatus,
        payment_method: Option<&PaymentMethod>,
    ) -> Result<(), DbError> {
        let filter = doc! { "order_id": order_id };
        let mut changes = doc! {
            "pay_status": to_bson(&pay_status)?,
            "updated_at": Utc::now().to_rfc3339(),
        };
        if let Some(method) = payment_method {
            changes.insert("payment_method", to_bson(method)?);
        }
        let update = doc! { "$set": changes };
        let result = self.collection.update_one(filter, update, None).await?;
        if result.matched_count == 0 {
            return Err(DbError::OrderNotFound);
        }
        Ok(())
    }

    /// Marks the order as `Processing` with the given payment method and
    /// returns the updated document.
    ///
    /// # Errors
    /// Returns `DbError::OrderNotFound` when no document matched.
    pub async fn process_payment(
        &self,
        order_id: &str,
        payment_method: &PaymentMethod,
    ) -> Result<Order, DbError> {
        let filter = doc! { "order_id": order_id };
        let update = doc! {
            "$set": {
                "pay_status": to_bson(&PayStatus::Processing)?,
                "payment_method": to_bson(payment_method)?,
                "updated_at": Utc::now().to_rfc3339()
            }
        };
        // Ask for the post-update document so the caller sees the new state.
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        self.collection
            .find_one_and_update(filter, update, options)
            .await?
            .ok_or(DbError::OrderNotFound)
    }

    /// Marks the order as `Paid`, attaches a random `TRX`-prefixed transaction
    /// id, and returns the updated document.
    ///
    /// # Errors
    /// Returns `DbError::OrderNotFound` when no document matched.
    pub async fn complete_payment(&self, order_id: &str) -> Result<Order, DbError> {
        let transaction_id = format!("TRX{:08}", rand::thread_rng().gen_range(0..99999999));
        let filter = doc! { "order_id": order_id };
        let update = doc! {
            "$set": {
                "pay_status": to_bson(&PayStatus::Paid)?,
                "transaction_id": transaction_id,
                "updated_at": Utc::now().to_rfc3339()
            }
        };
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        self.collection
            .find_one_and_update(filter, update, options)
            .await?
            .ok_or(DbError::OrderNotFound)
    }

    /// Lists orders matching every filter present in `query`, sorted by
    /// `created_at` descending. An empty query matches all orders.
    pub async fn query_orders(&self, query: QueryOrderRequest) -> Result<Vec<Order>, DbError> {
        let mut filter = doc! {};
        if let Some(order_id) = query.order_id {
            filter.insert("order_id", order_id);
        }
        if let Some(user_id) = query.user_id {
            filter.insert("user_id", user_id);
        }
        if let Some(pay_status) = query.pay_status {
            filter.insert("pay_status", to_bson(&pay_status)?);
        }
        let find_options = FindOptions::builder().sort(doc! { "created_at": -1 }).build();
        let mut cursor = self.collection.find(filter, find_options).await?;
        let mut orders = Vec::new();
        while cursor.advance().await? {
            orders.push(cursor.deserialize_current()?);
        }
        Ok(orders)
    }
}
3.6 Cargo.toml--依赖配置
[package]
name = "payment_service"
version = "0.1.0"
edition = "2024"

[dependencies]
warp = "0.3"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# The service only uses the async driver API on tokio, so default features suffice.
# The former `feature = ["sync"]` key misspelled `features` and was not applied.
mongodb = "2.0"
bson = "2.0"
chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"
thiserror = "1.0"
nacos-sdk = "0.4.0"
4 库存服务编码设计
4.1 功能设计
A. 库存模型设计:
使用 stock 表示总库存量;
reserved 表示已预留库存;
available 表示可用库存(stock - reserved);
version 字段用于乐观锁控制并发更新。
B. 与订单系统的兼容性:
使用相同的 product_id 字段与订单系统关联;
预留库存时可关联 order_id 便于追踪。
C. 错误处理:
自定义 DbError 并实现 Reject trait 用于统一错误处理;
区分库存不存在和库存不足的不同错误类型。
D. 并发控制:
使用 MongoDB 的原子操作确保库存更新的正确性;
乐观锁机制防止超卖。
E. API 设计:
RESTful 风格 API;
统一前缀 /api 便于网关路由;
支持 CORS 便于前端调用。
4.2 main.rs--服务入口与路由配置
use warp::{Filter, Rejection, Reply}; use std::sync::Arc; use tokio::sync::Mutex;
use crate::{db::DbClient, handler::inventory_routes}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use log::info; use std::collections::HashMap; use nacos_sdk::api::props::ClientProps;
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
mod db; mod handler; mod model;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init(); info!("Starting inventory service");
// 服务配置
let service_name = "inventory_service".to_string();
let group = "ecommerce".to_string(); let ip = "127.0.0.1";let port = 9003;
// 创建Nacos客户端
let client = NamingServiceBuilder::new(
ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"), ).build()?;
// 构建服务实例(修正字段类型和命名)
let instance = ServiceInstance {service_name: Some(service_name.clone()),
ip: ip.to_string(), port, weight: 1.0, healthy: true, enabled: true,
ephemeral: true, instance_id: None, cluster_name: Some("DEFAULT".to_string()),
metadata: HashMap::from([ ("version".into(), "1.0".into()), ("service".into(), "inventory".into()) ]) };
// 注册服务(修正参数传递)
client.register_instance( service_name.clone(), Some(group.clone()), instance.clone() ).await?;
println!("✅ 库存服务注册成功");
/* 服务注销逻辑(参数匹配)
client.deregister_instance( service_name, Some(group), instance ).await?; */
// 初始化 MongoDB 连接
let db_client = Arc::new(Mutex::new(
DbClient::new("mongodb://localhost:27017", "ecommerce1").await
.map_err(|e| { log::error!("Failed to initialize database client: {}", e); e })? ));
// 定义 API 路由
let api = warp::path("api");
let routes = api.and( inventory_routes(db_client.clone()).with(warp::log("inventory_service")) )
.with(warp::cors().allow_any_origin().allow_methods(vec!["GET", "POST", "PUT", "DELETE"])
.allow_headers(vec!["Content-Type"])).recover(handle_rejection);
// 启动服务
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9003);
info!("Starting inventory service on http://{}", socket_addr);
warp::serve(routes).run(socket_addr).await; Ok(())
}
/// Maps a warp rejection to a plain-text reply with a matching status code.
///
/// Not-found rejections and `DbError::InventoryNotFound` become 404,
/// `DbError::InsufficientStock` becomes 400, any other `DbError` becomes 500
/// "Database error", and everything else becomes 500 "Internal Server Error".
async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
    use warp::http::StatusCode;

    let (code, message) = if err.is_not_found() {
        (StatusCode::NOT_FOUND, "Not Found")
    } else {
        match err.find::<model::DbError>() {
            Some(model::DbError::InventoryNotFound) => {
                (StatusCode::NOT_FOUND, "Inventory not found")
            }
            Some(model::DbError::InsufficientStock) => {
                (StatusCode::BAD_REQUEST, "Insufficient stock")
            }
            Some(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Database error"),
            None => (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error"),
        }
    };
    Ok(warp::reply::with_status(message.to_string(), code))
}
4.3 model.rs--数据模型定义
use bson::oid::ObjectId; use serde::{Deserialize, Serialize}; use mongodb::bson::DateTime;
/// Inventory document for one product.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inventory {
    /// MongoDB `_id`; left out of the serialized document while it is `None`.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub product_id: String,
    /// Total stock.
    pub stock: i32,
    /// Stock already reserved.
    pub reserved: i32,
    /// Stock still available.
    pub available: i32,
    pub last_updated: DateTime,
    /// Version counter intended for optimistic locking.
    pub version: i32,
}
/// Request to adjust a product's stock.
#[derive(Debug, Serialize, Deserialize)]
pub struct InventoryUpdateRequest {
    pub product_id: String,
    /// Positive values add stock, negative values remove it.
    pub delta: i32,
}
/// Request that identifies a product by id.
#[derive(Debug, Serialize, Deserialize)]
pub struct InventoryQueryRequest {
    pub product_id: String,
}
/// Request to reserve stock for a product.
#[derive(Debug, Serialize, Deserialize)]
pub struct InventoryReserveRequest {
    pub product_id: String,
    pub quantity: i32,
    /// Optional id of the order the reservation belongs to.
    pub order_id: Option<String>,
}
/// Inventory state as returned to API clients.
#[derive(Debug, Serialize, Deserialize)]
pub struct InventoryResponse {
    pub product_id: String,
    pub stock: i32,
    pub reserved: i32,
    pub available: i32,
    pub last_updated: String,
}
/// Errors raised by the inventory service's database layer.
#[derive(Debug, thiserror::Error)]
pub enum DbError {
    /// No inventory record matched.
    #[error("Inventory not found")]
    InventoryNotFound,

    /// Not enough available stock for the requested operation.
    #[error("Insufficient stock")]
    InsufficientStock,

    /// The MongoDB driver reported an error.
    #[error("Database error: {0}")]
    MongoError(#[from] mongodb::error::Error),

    /// A value could not be serialized to BSON.
    #[error("Serialization error: {0}")]
    BsonError(#[from] bson::ser::Error),
}

// Implements warp's `Reject` so the error can be carried in a rejection.
impl warp::reject::Reject for DbError {}
4.4 handler.rs--请求处理逻辑
处理HTTP请求,调用数据库操作并返回响应。
use warp::{Filter, Rejection, Reply}; use std::sync::Arc; use tokio::sync::Mutex; use crate::db::DbClient;
use crate::model::{Inventory, InventoryUpdateRequest, InventoryQueryRequest, InventoryReserveRequest, InventoryResponse};
pub fn inventory_routes( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
create_inventory(db.clone()).or(get_inventory(db.clone()))
.or(update_inventory(db.clone())).or(reserve_inventory(db.clone()))
}
fn create_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path!("inventory").and(warp::post()).and(with_db(db))
.and(warp::body::json()).and_then(create_inventory_handler)
}
fn get_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path!("inventory").and(warp::get()).and(with_db(db))
.and(warp::query::<InventoryQueryRequest>()).and_then(get_inventory_handler)
}
fn update_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path!("inventory" / "update").and(warp::post())
.and(with_db(db)).and(warp::body::json()).and_then(update_inventory_handler)
}
fn reserve_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path!("inventory" / "reserve").and(warp::post()).and(with_db(db))
.and(warp::body::json()).and_then(reserve_inventory_handler)
}
fn with_db(db: Arc<Mutex<DbClient>>) -> impl Filter<Extract = (Arc<Mutex<DbClient>>,),
Error = std::convert::Infallible> + Clone {
warp::any().map(move || db.clone())
}
/// Creates an inventory record with zero initial stock for the requested
/// product and replies with it as JSON.
///
/// A database error is turned into a custom rejection.
async fn create_inventory_handler(
    db: Arc<Mutex<DbClient>>,
    req: InventoryQueryRequest,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    match client.create_inventory(&req.product_id, 0).await {
        Ok(inventory) => Ok(warp::reply::json(&to_response(inventory))),
        Err(e) => Err(warp::reject::custom(e)),
    }
}
/// Looks up the inventory record of the requested product and replies with it as JSON.
///
/// A database error is turned into a custom rejection.
async fn get_inventory_handler(
    db: Arc<Mutex<DbClient>>,
    req: InventoryQueryRequest,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    match client.get_inventory(&req.product_id).await {
        Ok(inventory) => Ok(warp::reply::json(&to_response(inventory))),
        Err(e) => Err(warp::reject::custom(e)),
    }
}
/// Applies a stock adjustment and replies with the updated record as JSON.
///
/// A database error is turned into a custom rejection.
async fn update_inventory_handler(
    db: Arc<Mutex<DbClient>>,
    req: InventoryUpdateRequest,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    match client.update_inventory(&req).await {
        Ok(inventory) => Ok(warp::reply::json(&to_response(inventory))),
        Err(e) => Err(warp::reject::custom(e)),
    }
}
/// Reserves stock for a product and replies with the updated record as JSON.
///
/// A database error is turned into a custom rejection.
async fn reserve_inventory_handler(
    db: Arc<Mutex<DbClient>>,
    req: InventoryReserveRequest,
) -> Result<impl Reply, Rejection> {
    let client = db.lock().await;
    match client.reserve_inventory(&req).await {
        Ok(inventory) => Ok(warp::reply::json(&to_response(inventory))),
        Err(e) => Err(warp::reject::custom(e)),
    }
}
fn to_response(inventory: Inventory) -> InventoryResponse {
InventoryResponse { product_id: inventory.product_id, stock: inventory.stock,
reserved: inventory.reserved, available: inventory.available,
last_updated: inventory.last_updated.to_string(),
}
}
4.5 db.rs--数据库操作
use mongodb::{Client, Collection, options::ClientOptions}; use mongodb::bson::doc;
use crate::model::{Inventory, InventoryUpdateRequest, InventoryReserveRequest, DbError};
/// Data-access wrapper around the `inventory` collection.
pub struct DbClient {
    inventory_collection: Collection<Inventory>,
}

impl DbClient {
    /// Connects to `uri` and binds to the `inventory` collection of `db_name`.
    ///
    /// # Errors
    /// Returns `DbError::MongoError` if the URI cannot be parsed or the client
    /// cannot be created.
    pub async fn new(uri: &str, db_name: &str) -> Result<Self, DbError> {
        let client_options = ClientOptions::parse(uri).await?;
        let client = Client::with_options(client_options)?;
        let db = client.database(db_name);
        Ok(Self { inventory_collection: db.collection("inventory") })
    }

    /// Inserts a new inventory record with nothing reserved and version 1,
    /// then reads it back by its generated `_id`.
    ///
    /// # Panics
    /// Panics if the driver reports an inserted id that is not an `ObjectId`.
    pub async fn create_inventory(
        &self,
        product_id: &str,
        initial_stock: i32,
    ) -> Result<Inventory, DbError> {
        let now = bson::DateTime::now();
        let inventory = Inventory {
            id: None,
            product_id: product_id.to_string(),
            stock: initial_stock,
            reserved: 0,
            available: initial_stock,
            last_updated: now,
            version: 1,
        };
        let result = self.inventory_collection.insert_one(inventory, None).await?;
        let inserted_id = result
            .inserted_id
            .as_object_id()
            .expect("a document inserted without `_id` gets an ObjectId");
        self.get_inventory_by_id(&inserted_id).await
    }

    /// Finds the inventory record of a product.
    ///
    /// # Errors
    /// Returns `DbError::InventoryNotFound` when no record matches.
    pub async fn get_inventory(&self, product_id: &str) -> Result<Inventory, DbError> {
        self.inventory_collection
            .find_one(doc! { "product_id": product_id }, None)
            .await?
            .ok_or(DbError::InventoryNotFound)
    }

    /// Adds `delta` to both `stock` and `available`, bumps `version`, and
    /// returns the updated record.
    ///
    /// A negative `delta` is applied only if at least that much stock is
    /// available, so the counters cannot be driven below zero.
    ///
    /// # Errors
    /// Returns `DbError::InventoryNotFound` when the product has no record and
    /// `DbError::InsufficientStock` when a removal exceeds the available stock.
    pub async fn update_inventory(
        &self,
        req: &InventoryUpdateRequest,
    ) -> Result<Inventory, DbError> {
        let mut filter = doc! { "product_id": &req.product_id };
        if req.delta < 0 {
            // Widen before negating so `i32::MIN` cannot overflow.
            filter.insert("available", doc! { "$gte": -i64::from(req.delta) });
        }
        let update = doc! {
            "$inc": { "stock": req.delta, "available": req.delta, "version": 1 },
            "$currentDate": { "last_updated": true }
        };
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        let updated = self
            .inventory_collection
            .find_one_and_update(filter, update, options)
            .await?;
        match updated {
            Some(inventory) => Ok(inventory),
            None if req.delta < 0 => {
                // No match: either the product is unknown (propagated by `?`)
                // or the stock guard rejected the removal.
                self.get_inventory(&req.product_id).await?;
                Err(DbError::InsufficientStock)
            }
            None => Err(DbError::InventoryNotFound),
        }
    }

    /// Moves `quantity` units from `available` to `reserved` in one atomic
    /// update, bumps `version`, and returns the updated record.
    ///
    /// NOTE(review): a non-positive `quantity` is not rejected here — confirm
    /// whether callers validate it.
    ///
    /// # Errors
    /// Returns `DbError::InventoryNotFound` when the product has no record and
    /// `DbError::InsufficientStock` when fewer than `quantity` units are available.
    pub async fn reserve_inventory(
        &self,
        req: &InventoryReserveRequest,
    ) -> Result<Inventory, DbError> {
        let filter = doc! {
            "product_id": &req.product_id,
            "available": { "$gte": req.quantity }
        };
        let update = doc! {
            "$inc": { "reserved": req.quantity, "available": -req.quantity, "version": 1 },
            "$currentDate": { "last_updated": true }
        };
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        let reserved = self
            .inventory_collection
            .find_one_and_update(filter, update, options)
            .await?;
        match reserved {
            Some(inventory) => Ok(inventory),
            None => {
                // Tell an unknown product (propagated by `?`) apart from a shortage.
                self.get_inventory(&req.product_id).await?;
                Err(DbError::InsufficientStock)
            }
        }
    }

    /// Finds an inventory record by its MongoDB `_id`.
    async fn get_inventory_by_id(&self, id: &bson::oid::ObjectId) -> Result<Inventory, DbError> {
        self.inventory_collection
            .find_one(doc! { "_id": id }, None)
            .await?
            .ok_or(DbError::InventoryNotFound)
    }
}
4.6 Cargo.toml--依赖配置
[package]
name = "inventory_service"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# The service only uses the async driver API on tokio, so default features suffice.
# The former `feature = ["sync"]` key misspelled `features` and was not applied.
mongodb = "2.4"
bson = "2.4"
futures = "0.3"
log = "0.4"
env_logger = "0.9"
thiserror = "1.0"
nacos-sdk = "0.4.0"