
HarmonyOS Distributed Computing in Practice: A Runnable Task-Management Demo with ArkTS and a Worker Pool, from Single Device to Cross-Device


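Abstract

On-device compute keeps getting stronger, and people own more and more devices: phones, tablets, watches, in-car systems, smart screens. Many apps need to link these devices into one "distributed computer" so that data is processed at the edge with lower latency and better privacy. Examples include large-image processing, speech transcription, log analysis, and model fine-tuning.

Once you go distributed, however, several problems become the core engineering work: task scheduling, load balancing, retry on failure, status monitoring, and result merging. This article provides a reference implementation that actually runs. It also gives a practical plan for extending it across devices with SoftBus, DeviceManager, and RPC.

Introduction

HarmonyOS provides three kinds of distributed capability:

  • Cross-device discovery and connection (DeviceManager/SoftBus)
  • Cross-device data (distributed data objects/distributed files)
  • Cross-device invocation (RPC/cross-device ability launch)

These primitives let us build a custom distributed scheduler at the application layer. This article first uses a Worker pool on a single device to simulate multiple "compute nodes". That lets us get the core scheduling, fault-tolerance, and monitoring mechanisms solid. It then shows where to plug in cross-device communication and execution.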

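Overall Design

Goals and module breakdown

  • Task scheduler: assigns tasks dynamically based on priority, resource requirements, and node status
  • Task decomposition and merging: Map-Reduce style; a large task is split into subtasks that run in parallel and are merged when they finish
  • Load balancing: weighted round robin plus real-time load feedback, so hot nodes are not overloaded
  • Status monitoring: task lifecycle (pending/running/succeeded/failed/migrated) and node heartbeats
  • Fault recovery: when a node times out or fails, its tasks are reclaimed and migrated automatically
  • Result collection and integration: streaming merge (merge while computing) to reduce tail latency

Key data structures

  • Task: describes a large, splittable task, including its priority, splitter, and reducer
  • SubTask: subtask metadata (input shard, resource requirements, timeout, retry count)
  • Node: a compute node (simulated with a Worker in the demo; in a real deployment it maps to a device or a remote Ability)
  • Scheduler: maintains the task priority queue, the assignment strategy, monitoring, and fault tolerance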

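Core Code Example (Runnable Demo)

Note: below is the structure of a minimal runnable ArkTS demo. It uses a Worker pool on a single device to simulate several "distributed nodes". It supports task splitting, scheduling, monitoring, failover, and result merging.

To run it, create a Stage-model project in DevEco Studio and put these files into the corresponding directories. The worker path and registration may need adjusting for your SDK version; see the note after the module.json5 snippet.

For real cross-device execution, you only need to replace WorkerNode's communication layer with SoftBus/RPC to connect to real nodes.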

Types and utilities (/common/types.ts)

// /common/types.ts
export type Priority = 'HIGH' | 'MEDIUM' | 'LOW';

export interface ResourceHint {
  cpuCost?: number;    // estimated CPU cost (relative value)
  memCost?: number;    // estimated memory cost (relative value)
  deadlineMs?: number;
  timeoutMs?: number;  // subtask timeout
}

export interface SubTask<Input, Output> {
  id: string;
  taskId: string;
  shardIndex: number;
  input: Input;
  priority: Priority;
  resource: ResourceHint;
  retries: number;
  maxRetries: number;
  createdAt: number;
}

export type SubTaskStatus = 'PENDING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'CANCELLED';

export interface SubTaskResult<Output> {
  subTaskId: string;
  status: SubTaskStatus;
  output?: Output;
  error?: string;
  durationMs?: number;
  nodeId?: string;
}

export interface TaskSpec<Input, MapOut, ReduceOut> {
  id: string;
  name: string;
  priority: Priority;
  input: Input;
  // Splitter: turns the large task's input into one input per subtask (each shard has the same type as the whole input)
  splitter: (input: Input) => Input[];
  // Name of the map function executed on the node (looked up in the worker's mapper registry).
  // In a real distributed setup this logic lives in the node process / on the device side.
  mapperFuncName: string;
  // Reducer: combines the partial results into the final result
  reducer: (partials: MapOut[]) => ReduceOut;
  resource?: ResourceHint;
}

export interface NodeMetrics {
  nodeId: string;
  running: number;
  capacity: number;
  avgLatencyMs: number;
  lastHeartbeat: number;
  healthy: boolean;
}

export interface NodeLike {
  id: string;
  capacity: number;   // number of subtasks the node can process concurrently
  post(subTask: SubTask<any, any>): Promise<SubTaskResult<any>>;
  healthy(): boolean;
  metrics(): NodeMetrics;
  stop(): void;
}

Simple event bus (/common/bus.ts)

// /common/bus.ts
type Handler = (...args: any[]) => void;

export class EventBus {
  private map = new Map<string, Set<Handler>>();

  on(evt: string, fn: Handler) {
    if (!this.map.has(evt)) this.map.set(evt, new Set());
    this.map.get(evt)!.add(fn);
  }

  off(evt: string, fn: Handler) {
    this.map.get(evt)?.delete(fn);
  }

  emit(evt: string, ...args: any[]) {
    this.map.get(evt)?.forEach(fn => fn(...args));
  }
}

export const bus = new EventBus();

Worker node implementation (/workers/node.ts)

// /workers/node.ts
// This is the Worker thread script; it actually executes each subtask's mapper function.
// Note: the worker file must be registered in the build configuration (see the note at the end of this section).
import worker from '@ohos.worker';

const globalMapperRegistry: Record<string, (input: any) => any> = {
  // Register the executable mapper functions here (a few examples for the demo)
  'heavyPrimeCount': (n: number) => {
    // Count the primes <= n to simulate a CPU-intensive task
    const isPrime = (x: number) => {
      if (x < 2) return false;
      for (let i = 2; i * i <= x; i++) if (x % i === 0) return false;
      return true;
    };
    let cnt = 0;
    for (let i = 2; i <= n; i++) if (isPrime(i)) cnt++;
    return cnt;
  },
  'wordCount': (text: string) => {
    const map = new Map<string, number>();
    text.split(/\s+/).forEach(w => {
      if (!w) return;
      const key = w.toLowerCase();
      map.set(key, (map.get(key) || 0) + 1);
    });
    // Return a plain object instead of a Map to avoid structured-clone problems across threads
    const obj: Record<string, number> = {};
    map.forEach((v, k) => obj[k] = v);
    return obj;
  },
  'matrixRowMul': (payload: { row: number[], col: number[] }) => {
    const { row, col } = payload;
    let sum = 0;
    for (let i = 0; i < row.length; i++) sum += row[i] * col[i];
    return sum;
  }
};

// A ThreadWorker script talks to the host thread through worker.workerPort
// (parentPort belongs to the older, deprecated Worker API, and `parent?.onmessage = ...`
// is not valid TypeScript).
const workerPort = worker.workerPort;

workerPort.onmessage = (evt) => {
  const { subTaskId, mapperFuncName, input } = evt.data;
  const start = Date.now();
  try {
    const fn = globalMapperRegistry[mapperFuncName];
    if (!fn) throw new Error(`Mapper not found: ${mapperFuncName}`);
    const output = fn(input);
    workerPort.postMessage({ subTaskId, status: 'SUCCEEDED', output, durationMs: Date.now() - start });
  } catch (err: any) {
    workerPort.postMessage({
      subTaskId,
      status: 'FAILED',
      error: String(err?.message || err),
      durationMs: Date.now() - start,
    });
  }
};

Worker node wrapper (/common/nodePool.ts)

// /common/nodePool.ts
import worker from '@ohos.worker';
import { NodeLike, NodeMetrics, SubTask, SubTaskResult } from './types';

let idSeq = 0;

export class WorkerNode implements NodeLike {
  id: string;
  capacity: number;
  private running = 0;
  private histLatency: number[] = [];
  private lastOkTs = Date.now();
  private workerInst: worker.ThreadWorker;
  // Subtasks in flight on this worker, keyed by subTaskId. With capacity > 1 several replies can be
  // outstanding at once, so each reply must be routed to the caller that sent that subtask.
  private inflight = new Map<string, (r: SubTaskResult<any>) => void>();

  constructor(capacity: number, workerScript: string) {
    this.id = `node-${++idSeq}`;
    this.capacity = capacity;
    this.workerInst = new worker.ThreadWorker(workerScript);
    // A single onmessage handler dispatches every reply by subTaskId
    this.workerInst.onmessage = (e) => {
      const msg = e.data;
      const resolve = this.inflight.get(msg.subTaskId);
      if (!resolve) return; // unknown or late reply
      this.inflight.delete(msg.subTaskId);
      resolve({
        subTaskId: msg.subTaskId,
        status: msg.status,
        output: msg.output,
        error: msg.error,
        durationMs: msg.durationMs,
        nodeId: this.id,
      });
    };
    // Treat a worker-level exception as a node fault: fail every in-flight subtask so the scheduler retries it
    this.workerInst.onerror = (err) => {
      console.error(`[${this.id}] worker error:`, err?.message);
      this.inflight.forEach((resolve, subTaskId) => resolve({
        subTaskId,
        status: 'FAILED',
        error: String(err?.message || 'worker error'),
        nodeId: this.id,
      }));
      this.inflight.clear();
    };
  }

  healthy(): boolean {
    // Simplified: an idle node is healthy; a busy node must have returned a success in the last 10 seconds
    return this.running === 0 || (Date.now() - this.lastOkTs) < 10_000;
  }

  async post(subTask: SubTask<any, any>): Promise<SubTaskResult<any>> {
    if (this.running === 0) this.lastOkTs = Date.now(); // idle -> busy: restart the progress clock
    this.running++;
    const res = await new Promise<SubTaskResult<any>>((resolve) => {
      this.inflight.set(subTask.id, resolve);
      this.workerInst.postMessage({
        subTaskId: subTask.id,
        // Demo simplification: the Scheduler puts the mapper name into taskId before calling post()
        mapperFuncName: subTask.taskId,
        input: subTask.input,
      });
    });
    this.running--;
    if (res.status === 'SUCCEEDED') {
      this.lastOkTs = Date.now();
      this.histLatency.push(res.durationMs || 0);
      if (this.histLatency.length > 20) this.histLatency.shift();
    }
    return res;
  }

  metrics(): NodeMetrics {
    const avg = this.histLatency.length === 0
      ? 0
      : Math.round(this.histLatency.reduce((a, b) => a + b, 0) / this.histLatency.length);
    return {
      nodeId: this.id,
      running: this.running,
      capacity: this.capacity,
      avgLatencyMs: avg,
      lastHeartbeat: this.lastOkTs,
      healthy: this.healthy(),
    };
  }

  stop() {
    try { this.workerInst.terminate(); } catch {}
  }
}

export class NodePool {
  private nodes: WorkerNode[] = [];
  register(node: WorkerNode) { this.nodes.push(node); }
  all(): WorkerNode[] { return this.nodes; }
  healthy(): WorkerNode[] { return this.nodes.filter(n => n.healthy()); }
  stopAll() { this.nodes.forEach(n => n.stop()); }
}

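Tip: for real cross-device execution, you can write a RemoteNode that implements NodeLike. Its post() sends the subtask to the remote device over SoftBus/RPC for execution and returns the result. metrics() can be fed by remote heartbeats and statistics.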

Scheduler (/common/scheduler.ts)

// /common/scheduler.ts
import { bus } from './bus';
import { NodeLike, SubTask, SubTaskResult, TaskSpec, Priority } from './types';
import { NodePool } from './nodePool';

function now() { return Date.now(); }
function uid() { return Math.random().toString(36).slice(2); }

// Dispatch order: a lower rank goes first
const PRIORITY_RANK: Record<Priority, number> = { HIGH: 0, MEDIUM: 1, LOW: 2 };

interface InternalTaskState<MapOut> {
  spec: TaskSpec<any, MapOut, any>;
  subTasks: SubTask<any, MapOut>[];
  pending: Set<string>;
  running: Map<string, { nodeId: string; startedAt: number }>;
  results: Map<string, MapOut>;
  failed: Map<string, string>;
  paused: boolean;
  cancelled: boolean;
}

export class Scheduler {
  private pool: NodePool;
  private queue: InternalTaskState<any>[] = [];
  private timer?: number;
  private nodeSelectorIdx = 0;

  constructor(pool: NodePool) {
    this.pool = pool;
  }

  submit<Input, MapOut, ReduceOut>(spec: TaskSpec<Input, MapOut, ReduceOut>): Promise<ReduceOut> {
    const splits = spec.splitter(spec.input);
    const subTasks: SubTask<Input, MapOut>[] = splits.map((piece, i) => ({
      id: uid(),
      taskId: spec.id,
      shardIndex: i,
      input: piece,
      priority: spec.priority,
      resource: spec.resource || {},
      retries: 0,
      maxRetries: 2,
      createdAt: now(),
    }));
    const state: InternalTaskState<MapOut> = {
      spec,
      subTasks,
      pending: new Set(subTasks.map(s => s.id)),
      running: new Map(),
      results: new Map(),
      failed: new Map(),
      paused: false,
      cancelled: false,
    };
    this.queue.push(state);
    bus.emit('task.submitted', { taskId: spec.id, total: subTasks.length });
    return new Promise<ReduceOut>((resolve, reject) => {
      const onProgress = () => {
        // Settle only when no subtask is pending or running
        if (state.pending.size > 0 || state.running.size > 0) return;
        bus.off(`task.${spec.id}.progress`, onProgress);
        this.queue = this.queue.filter(q => q !== state); // finished tasks leave the queue
        if (state.cancelled) {
          reject(new Error(`Task ${spec.id} cancelled`));
        } else if (state.failed.size > 0) {
          reject(new Error(`Task ${spec.id} failed: ${JSON.stringify([...state.failed.values()])}`));
        } else {
          try {
            resolve(spec.reducer([...state.results.values()]));
          } catch (e) {
            reject(e);
          }
        }
      };
      bus.on(`task.${spec.id}.progress`, onProgress);
      onProgress(); // covers a splitter that returns no shards
      this.ensureLoop();
    });
  }

  pause(taskId: string) {
    // Simplified: a paused task keeps its pending subtasks but tick() skips it
    const t = this.queue.find(q => q.spec.id === taskId);
    if (t) t.paused = true;
  }

  resume(taskId: string) {
    const t = this.queue.find(q => q.spec.id === taskId);
    if (t) t.paused = false;
    this.ensureLoop();
  }

  cancel(taskId: string) {
    const t = this.queue.find(q => q.spec.id === taskId);
    if (!t) return;
    // Drop undispatched work; subtasks already running finish, then the task's promise rejects
    t.cancelled = true;
    t.pending.clear();
    bus.emit(`task.${taskId}.progress`);
  }

  private ensureLoop() {
    if (this.timer !== undefined) return;
    // Simple scheduling loop: poll the queue every 50 ms
    this.timer = setInterval(() => this.tick(), 50) as unknown as number;
  }

  private pickNode(_priority: Priority): NodeLike | undefined {
    // Least connections plus round robin among ties: only nodes with a free slot qualify,
    // the ones with the most free slots win, and ties rotate so load spreads evenly.
    // (_priority is accepted for future weighting but not used by this simple strategy.)
    const candidates = this.pool.healthy()
      .map(n => {
        const m = n.metrics();
        return { n, free: Math.max(m.capacity - m.running, 0) };
      })
      .filter(c => c.free > 0);
    if (candidates.length === 0) return undefined;
    const maxFree = Math.max(...candidates.map(c => c.free));
    const top = candidates.filter(c => c.free === maxFree);
    return top[this.nodeSelectorIdx++ % top.length].n;
  }

  private async dispatchSubTask(state: InternalTaskState<any>, sub: SubTask<any, any>) {
    const node = this.pickNode(sub.priority);
    if (!node) return; // no free node; try again next tick
    state.pending.delete(sub.id);
    state.running.set(sub.id, { nodeId: node.id, startedAt: now() });
    bus.emit('subtask.started', { taskId: state.spec.id, subTaskId: sub.id, nodeId: node.id });
    // Demo simplification: pass the mapper name to the node in the taskId field
    const result: SubTaskResult<any> = await node.post({ ...sub, taskId: state.spec.mapperFuncName });
    state.running.delete(sub.id);
    if (result.status === 'SUCCEEDED') {
      state.results.set(sub.id, result.output);
      bus.emit('subtask.succeeded', { subTaskId: sub.id, nodeId: node.id, durationMs: result.durationMs });
    } else if (!state.cancelled && sub.retries < sub.maxRetries) {
      // Fault recovery: back to pending; the next tick may place it on a different node
      sub.retries++;
      state.pending.add(sub.id);
      bus.emit('subtask.retried', { subTaskId: sub.id, retries: sub.retries });
    } else {
      state.failed.set(sub.id, result.error || 'unknown');
      bus.emit('subtask.failed', { subTaskId: sub.id, error: result.error });
    }
    bus.emit(`task.${state.spec.id}.progress`);
  }

  private tick() {
    // Higher priority first; Array.prototype.sort is stable, so equal priorities keep submission order
    const ordered = [...this.queue].sort(
      (a, b) => PRIORITY_RANK[a.spec.priority] - PRIORITY_RANK[b.spec.priority]);
    for (const t of ordered) {
      if (t.paused || t.cancelled) continue;
      // Back-pressure: dispatch at most as many subtasks as there are free slots right now
      const freeSlots = this.pool.healthy()
        .map(n => Math.max(n.metrics().capacity - n.metrics().running, 0))
        .reduce((a, b) => a + b, 0);
      if (freeSlots <= 0) break;
      const readySubs = [...t.pending].slice(0, freeSlots);
      readySubs.forEach(subId => {
        const sub = t.subTasks.find(s => s.id === subId)!;
        void this.dispatchSubTask(t, sub);
      });
    }
    // Stop the timer when nothing is runnable or running
    const busy = this.queue.some(t => (!t.paused && t.pending.size > 0) || t.running.size > 0);
    if (!busy && this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}

Simple UI page (/pages/Index.ets)

// /pages/Index.ets
import { NodePool, WorkerNode } from '../common/nodePool';
import { Scheduler } from '../common/scheduler';
import { TaskSpec } from '../common/types';
import { bus } from '../common/bus';

@Entry
@Component
struct Index {
  // @State so the List re-renders when a log line is added
  @State logs: string[] = [];
  private pool = new NodePool();
  private scheduler?: Scheduler;

  aboutToAppear() {
    // Register 3 "nodes" with different concurrency limits.
    // The script path must match your build output (see the note after the module.json5 snippet).
    this.pool.register(new WorkerNode(2, 'workers/node.js'));
    this.pool.register(new WorkerNode(1, 'workers/node.js'));
    this.pool.register(new WorkerNode(3, 'workers/node.js'));
    this.scheduler = new Scheduler(this.pool);
    // Subscribe to lifecycle events for status monitoring
    bus.on('task.submitted', (e) => this.pushLog(`Task submitted: ${e.taskId}, total: ${e.total}`));
    bus.on('subtask.started', (e) => this.pushLog(`SubTask ${e.subTaskId} -> ${e.nodeId} started`));
    bus.on('subtask.succeeded', (e) => this.pushLog(`SubTask ${e.subTaskId} done in ${e.durationMs}ms`));
    bus.on('subtask.retried', (e) => this.pushLog(`SubTask ${e.subTaskId} retry #${e.retries}`));
    bus.on('subtask.failed', (e) => this.pushLog(`SubTask ${e.subTaskId} failed: ${e.error}`));
  }

  aboutToDisappear() {
    // Terminate the worker threads when the page goes away
    this.pool.stopAll();
  }

  private pushLog(s: string) {
    this.logs = [`[${new Date().toLocaleTimeString()}] ${s}`, ...this.logs].slice(0, 200);
  }

  build() {
    Column({ space: 12 }) {
      Text('Distributed Task Management Demo (Worker-simulated nodes)')
        .fontSize(20).fontWeight(FontWeight.Bold).margin({ top: 12 })
      Row({ space: 8 }) {
        Button('Start: prime count (split, parallel)').onClick(() => this.runPrimeDemo())
        Button('Start: word count (text shards)').onClick(() => this.runWordCountDemo())
        Button('Clear log').onClick(() => { this.logs = []; })
      }
      List() {
        ForEach(this.logs, (line: string) => {
          ListItem() {
            Text(line).fontSize(12).maxLines(2)
          }
        })
      }.height('70%')
    }.padding(16)
  }

  private async runPrimeDemo() {
    if (!this.scheduler) return;
    const N = 200_000; // 2e5 keeps the demo fast; raise it for a heavier load
    const shards = 10;
    const spec: TaskSpec<number, number, number> = {
      id: `prime-${Date.now()}`,
      name: 'heavyPrimeCount',
      priority: 'HIGH',
      input: N,
      // Splitter: upper bounds N/10, 2N/10, ..., N; each subtask counts the primes <= its bound
      splitter: (n: number) => Array.from({ length: shards }, (_, i) => Math.round((i + 1) * n / shards)),
      mapperFuncName: 'heavyPrimeCount',
      // The shards are nested prefixes [2, bound], so summing them would count the same primes
      // many times; the answer is the count for the largest bound, i.e. the maximum partial.
      // (This mainly generates uneven CPU load; to split the work itself, give the mapper disjoint ranges.)
      reducer: (partials: number[]) => Math.max(...partials),
      resource: { cpuCost: 3 }
    };
    this.pushLog('Starting prime-count task...');
    try {
      const total = await this.scheduler.submit(spec);
      this.pushLog(`Number of primes <= ${N}: ${total}`);
    } catch (e: any) {
      this.pushLog(`Task failed: ${e?.message || e}`);
    }
  }

  private async runWordCountDemo() {
    if (!this.scheduler) return;
    const text = `Hello world hello HarmonyOS world distributed scheduler demo hello`;
    const shardSize = 3;
    const spec: TaskSpec<string, Record<string, number>, Record<string, number>> = {
      id: `wc-${Date.now()}`,
      name: 'wordCount',
      priority: 'MEDIUM',
      input: text,
      // Splitter: one shard per group of 3 words
      splitter: (t: string) => {
        const words = t.split(/\s+/);
        const splits: string[] = [];
        for (let i = 0; i < words.length; i += shardSize) {
          splits.push(words.slice(i, i + shardSize).join(' '));
        }
        return splits;
      },
      mapperFuncName: 'wordCount',
      reducer: (partials: Record<string, number>[]) => {
        const acc: Record<string, number> = {};
        partials.forEach(p => {
          Object.keys(p).forEach(k => { acc[k] = (acc[k] || 0) + p[k]; });
        });
        return acc;
      },
      resource: { cpuCost: 1 }
    };
    this.pushLog('Starting word-count task...');
    try {
      const wc = await this.scheduler.submit(spec);
      this.pushLog(`Word counts: ${JSON.stringify(wc)}`);
    } catch (e: any) {
      this.pushLog(`Task failed: ${e?.message || e}`);
    }
  }
}

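Worker registration (module.json5 snippet)

{
  "module": {
    "abilities": [ /* ... */ ],
    "resources": [ /* ... */ ],
    "js": [{
      "pages": [ "pages/Index" ],
      "name": "default",
      "window": { "designWidth": 720 }
    }],
    "workers": [{
      "name": "node",
      "src": "workers/node.ts" // after the build the path may be workers/node.js
    }]
  }
}

Tip: after the build, the path that actually gets loaded may be workers/node.js, which is also the path the page above uses. If your project is configured differently, change it to the real build output path.

Depending on the SDK version, Stage-model projects may instead declare worker files in the module-level build-profile.json5 (buildOption.sourceOption.workers). They may also pass a path such as entry/ets/workers/node.ts to the ThreadWorker constructor. Check the documentation for your SDK.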

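Task Decomposition and Merging

Decomposition strategies

  • Even sharding: the most common option; suits CPU-bound tasks whose input can be cut into roughly equal pieces
  • Sample-based sharding: sample first to estimate how long each piece takes, then cut unevenly
  • Streaming sharding: subtasks are produced continuously (for example a log stream or video frames) and the scheduler dispatches them dynamically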
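The first two strategies can be sketched as follows. This is a minimal sketch under our own assumptions.

  • evenRanges cuts an integer range into disjoint, near-equal pieces. This is the kind of input a prime-counting mapper would need to avoid overlapping work.
  • costBalancedShards stands in for sample-based sharding. It greedily balances per-item cost estimates, longest processing time first.

Both function names and the estimateCost callback are hypothetical.

```typescript
interface Range { lo: number; hi: number; }

// Even sharding: split the closed integer range [lo, hi] into at most `shards`
// disjoint ranges whose sizes differ by at most 1.
function evenRanges(lo: number, hi: number, shards: number): Range[] {
  const total = hi - lo + 1;
  if (total <= 0 || shards <= 0) return [];
  const n = Math.min(shards, total);
  const base = Math.floor(total / n);
  const extra = total % n; // the first `extra` ranges get one more element
  const out: Range[] = [];
  let start = lo;
  for (let i = 0; i < n; i++) {
    const size = base + (i < extra ? 1 : 0);
    out.push({ lo: start, hi: start + size - 1 });
    start += size;
  }
  return out;
}

// Cost-based sharding: place items into `shards` buckets with the greedy
// longest-processing-time-first rule. `estimateCost` stands in for a cost
// estimate obtained by sampling (hypothetical; supply your own).
function costBalancedShards<T>(items: T[], shards: number, estimateCost: (item: T) => number): T[][] {
  const buckets: T[][] = Array.from({ length: shards }, () => [] as T[]);
  const load: number[] = new Array(shards).fill(0);
  const sorted = [...items].sort((a, b) => estimateCost(b) - estimateCost(a));
  for (const item of sorted) {
    let min = 0; // index of the currently lightest bucket (ties go to the lower index)
    for (let i = 1; i < shards; i++) if (load[i] < load[min]) min = i;
    buckets[min].push(item);
    load[min] += estimateCost(item);
  }
  return buckets;
}
```

For example, evenRanges(1, 10, 3) yields [1..4], [5..7], [8..10]. A mapper that counts primes inside one such range would make a plain sum the correct reducer.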

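Merge strategies

  • Tasks whose merge is commutative and associative reduce directly (sum/max/min/count)
  • Map merge (word count): merge the counts key by key
  • Streaming merge: merge each partial result as soon as it arrives to reduce tail latency; this also enables "intermediate state visualization"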

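Load Balancing Design

Core ideas

  • Real-time load metrics: running / capacity, historical average latency, failure rate
  • Node selection: weighted round robin / least connections / latency-based selection
  • Back-pressure: stop dispatching when the global number of free slots is 0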
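Weighted round robin can be made less bursty with the "smooth" variant used by nginx. The sketch below is an illustration, not part of the demo. The class name and the setWeight hook are hypothetical, and feeding weights from live free-slot counts is an assumption.

```typescript
interface WeightedNode { id: string; weight: number; current: number; }

// Smooth weighted round robin: each pick adds every node's weight to its running
// score, chooses the highest score, then subtracts the total weight from the winner.
// Over sum(weights) picks each node is chosen `weight` times, interleaved rather than bunched.
class SmoothWeightedRoundRobin {
  private nodes: WeightedNode[];

  constructor(weights: Record<string, number>) {
    this.nodes = Object.keys(weights).map(id => ({ id, weight: weights[id], current: 0 }));
  }

  // Weights could be refreshed from live metrics, e.g. free slots per node (an assumption)
  setWeight(id: string, weight: number): void {
    const n = this.nodes.find(x => x.id === id);
    if (n) n.weight = weight;
  }

  next(): string | undefined {
    let total = 0;
    let best: WeightedNode | undefined;
    for (const n of this.nodes) {
      if (n.weight <= 0) continue; // weight 0 means "send no work here"
      n.current += n.weight;
      total += n.weight;
      if (!best || n.current > best.current) best = n;
    }
    if (!best) return undefined;
    best.current -= total;
    return best.id;
  }
}
```

With weights a:5, b:1, c:1 the first seven picks are a, a, b, a, c, a, a. The heavy node gets five of the seven picks, but never more than two in a row.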

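How it maps to the code

The article's pickNode() is a hybrid of "least connections + simple round robin". It ranks nodes by their free slots and tries to send each task to the most idle node.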

Task Status Monitoring

Lifecycle events

  • task.submitted / subtask.started / subtask.succeeded / subtask.failed / subtask.retried
  • All events go through bus; once a page subscribes, it can drive the UI, logging, and analytics from them
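A page that only needs progress numbers can fold these events into per-task counters. This sketch assumes the payload shapes of the demo's bus.emit calls.

  • MiniBus stands in for the demo's EventBus so the snippet runs on its own.
  • ProgressTracker is a hypothetical name.
  • subtask.succeeded/failed/retried carry only subTaskId, so the tracker records the owning task from subtask.started.

```typescript
type Handler = (payload: any) => void;

// Minimal stand-in for the demo's EventBus
class MiniBus {
  private map = new Map<string, Set<Handler>>();
  on(evt: string, fn: Handler): void {
    if (!this.map.has(evt)) this.map.set(evt, new Set());
    this.map.get(evt)!.add(fn);
  }
  emit(evt: string, payload: any): void {
    this.map.get(evt)?.forEach(fn => fn(payload));
  }
}

interface Progress { total: number; succeeded: number; failed: number; retries: number; }

class ProgressTracker {
  readonly byTask = new Map<string, Progress>();
  private owner = new Map<string, string>(); // subTaskId -> taskId

  constructor(bus: MiniBus) {
    bus.on('task.submitted', e => { this.byTask.set(e.taskId, { total: e.total, succeeded: 0, failed: 0, retries: 0 }); });
    bus.on('subtask.started', e => { this.owner.set(e.subTaskId, e.taskId); });
    bus.on('subtask.succeeded', e => this.bump(e.subTaskId, p => { p.succeeded++; }));
    bus.on('subtask.failed', e => this.bump(e.subTaskId, p => { p.failed++; }));
    bus.on('subtask.retried', e => this.bump(e.subTaskId, p => { p.retries++; }));
  }

  private bump(subTaskId: string, f: (p: Progress) => void): void {
    const taskId = this.owner.get(subTaskId);
    const p = taskId !== undefined ? this.byTask.get(taskId) : undefined;
    if (p) f(p);
  }

  // Share of subtasks that reached a final state (succeeded or failed for good)
  percent(taskId: string): number {
    const p = this.byTask.get(taskId);
    if (!p || p.total === 0) return 0;
    return Math.round((100 * (p.succeeded + p.failed)) / p.total);
  }
}
```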

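Health checks

  • The demo's health rule is built around "a successful return within the last 10 seconds"; a real system would use heartbeats plus metric reports
  • Each node can report CPU, memory, temperature, battery, network quality, and so on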
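A heartbeat-based health check could look roughly like this. It is a sketch under assumptions: the Heartbeat fields, the 3-second timeout, and the battery/temperature limits are illustrative values. They are not HarmonyOS APIs or recommended thresholds. Timestamps are passed in explicitly so the logic is easy to test.

```typescript
// Hypothetical heartbeat payload; adapt the fields to what your devices report
interface Heartbeat {
  nodeId: string;
  ts: number;           // ms timestamp of the report
  running: number;
  capacity: number;
  batteryPct?: number;
  temperatureC?: number;
}

class HeartbeatMonitor {
  private last = new Map<string, Heartbeat>();

  constructor(
    private timeoutMs = 3_000,
    private minBatteryPct = 15,
    private maxTemperatureC = 45,
  ) {}

  report(hb: Heartbeat): void {
    const prev = this.last.get(hb.nodeId);
    if (!prev || hb.ts >= prev.ts) this.last.set(hb.nodeId, hb); // ignore out-of-order reports
  }

  // Healthy = fresh heartbeat and reported vitals within limits
  isHealthy(nodeId: string, now: number): boolean {
    const hb = this.last.get(nodeId);
    if (!hb) return false;
    if (now - hb.ts > this.timeoutMs) return false;
    if (hb.batteryPct !== undefined && hb.batteryPct < this.minBatteryPct) return false;
    if (hb.temperatureC !== undefined && hb.temperatureC > this.maxTemperatureC) return false;
    return true;
  }

  // Nodes whose heartbeat went stale: candidates for reclaiming their subtasks
  lost(now: number): string[] {
    return Array.from(this.last.values())
      .filter(hb => now - hb.ts > this.timeoutMs)
      .map(hb => hb.nodeId);
  }
}
```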

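Fault Recovery

Failure types

  • Execution failure: code exceptions, bad input
  • Timeout failure: the node is stuck or short on resources
  • Node loss: heartbeat timeout, network disconnect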
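The demo defines ResourceHint.timeoutMs but never enforces it. One way to detect timeout failures is to scan the scheduler's running map on each tick. The sketch below assumes that map shape and a caller-supplied per-subtask timeout lookup (hypothetical). The 30-second default is arbitrary.

```typescript
// Same shape as the values of InternalTaskState.running in the demo
interface RunningEntry { nodeId: string; startedAt: number; }
interface TimeoutHit { subTaskId: string; nodeId: string; elapsedMs: number; }

// Return every running subtask whose elapsed time exceeds its timeout.
// A periodic tick could treat each hit as a FAILED result and retry it.
function findTimedOut(
  running: Map<string, RunningEntry>,
  now: number,
  timeoutFor: (subTaskId: string) => number | undefined,
  defaultTimeoutMs = 30_000,
): TimeoutHit[] {
  const hits: TimeoutHit[] = [];
  running.forEach((entry, subTaskId) => {
    const limit = timeoutFor(subTaskId) ?? defaultTimeoutMs;
    const elapsedMs = now - entry.startedAt;
    if (elapsedMs > limit) hits.push({ subTaskId, nodeId: entry.nodeId, elapsedMs });
  });
  return hits;
}
```

A timed-out Worker keeps computing, so a late reply may still arrive. Whatever handles replies should ignore subtasks that are no longer in running.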

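Migration and retry

  • Subtask fails -> put it back into pending, increment its retry count, and choose another node
  • Retry limit reached -> mark it failed; the whole task then fails
  • Optional strategies: exponential backoff, hot/cold node isolation, blacklisting/circuit breaking

In the demo, maxRetries=2. A failed subtask goes back to the pending queue automatically and is dispatched again in a later round, possibly to a different node. The demo does not explicitly exclude the node that failed.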
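The optional strategies can be sketched in a few lines:

  • Capped exponential backoff with full jitter
  • A per-node circuit breaker
  • A retry placement that avoids the node that just failed

All names and default numbers here are assumptions for illustration. The demo itself retries on the next tick and does not exclude the failed node.

```typescript
// Full-jitter backoff: uniform in [0, min(cap, base * 2^(attempt-1)))
function backoffDelayMs(attempt: number, baseMs = 200, capMs = 10_000, rand: () => number = Math.random): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  return Math.floor(rand() * ceiling);
}

class NodeCircuitBreaker {
  private failures = new Map<string, number>();
  private openUntil = new Map<string, number>();

  constructor(private threshold = 3, private cooldownMs = 30_000) {}

  recordSuccess(nodeId: string): void {
    this.failures.set(nodeId, 0);
    this.openUntil.delete(nodeId);
  }

  recordFailure(nodeId: string, now: number): void {
    const f = (this.failures.get(nodeId) ?? 0) + 1;
    this.failures.set(nodeId, f);
    if (f >= this.threshold) this.openUntil.set(nodeId, now + this.cooldownMs);
  }

  // Usable unless the breaker tripped and the cooldown has not expired. After the
  // cooldown the node is tried again; its count is still at the threshold, so one
  // more failure re-trips it, while a success resets it.
  allows(nodeId: string, now: number): boolean {
    const until = this.openUntil.get(nodeId);
    return until === undefined || now >= until;
  }
}

// Retry placement: prefer any usable node other than the one that just failed
function pickRetryNode(candidates: string[], failedOn: string, breaker: NodeCircuitBreaker, now: number): string | undefined {
  const usable = candidates.filter(id => breaker.allows(id, now));
  return usable.find(id => id !== failedOn) ?? usable[0];
}
```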

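Collecting and Integrating Results

Merge while computing

  • As soon as the scheduler receives a partial result, it can call an "incremental" version of the reducer, and the UI can keep updating the "current total"
  • Benefit: users see partial results early, and the last stragglers no longer hold up the whole experience

Consistency and idempotency

  • Subtasks should be idempotent (safe to retry), and the reducer should be idempotent where possible too (for example, deduplicate by shard id)
  • For cross-device consistency, distributed data objects can serve as "eventually consistent" storage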
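Both ideas, merging while computing and deduplicating by shard, fit in one small class. This is a sketch, not the demo's code. IncrementalMerger is a hypothetical name. The fold must be associative and commutative, because partials arrive in completion order.

The demo's Scheduler keys results by subtask id. Keying a merger like this one by shardIndex would make duplicate deliveries harmless.

```typescript
class IncrementalMerger<P, A> {
  private seen = new Set<number>();
  private acc: A;

  constructor(
    private readonly totalShards: number,
    init: A,
    private readonly fold: (acc: A, partial: P) => A,
    private readonly onUpdate?: (acc: A, done: number, total: number) => void,
  ) {
    this.acc = init;
  }

  // Returns false (and changes nothing) when this shard was already merged
  add(shardIndex: number, partial: P): boolean {
    if (this.seen.has(shardIndex)) return false;
    this.seen.add(shardIndex);
    this.acc = this.fold(this.acc, partial);
    this.onUpdate?.(this.acc, this.seen.size, this.totalShards); // e.g. refresh a "current total" in the UI
    return true;
  }

  get done(): boolean { return this.seen.size === this.totalShards; }
  get value(): A { return this.acc; }
}

// Word-count fold that returns a new object instead of mutating the accumulator
const mergeCounts = (acc: Record<string, number>, p: Record<string, number>): Record<string, number> => {
  const out: Record<string, number> = { ...acc };
  Object.keys(p).forEach(k => { out[k] = (out[k] || 0) + p[k]; });
  return out;
};
```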

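Application Scenarios and Sample Code

Scenario 1: batch processing of large images (filters/scaling/feature extraction)

  • The input is one large image or several images, split into grid blocks (tiles) that the nodes process in parallel
  • When merging, the tiles are stitched back by their coordinates, or the feature vectors are aggregated

Example (pseudocode; it maps directly onto this article's structure):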

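// mapper: processTile(imageTile) -> processedTile
// reducer: stitch(tiles) -> finalImage
// image, splitToTiles and stitch are placeholders that this article does not define
const spec = {
  id: `img-${Date.now()}`,
  name: 'imageTiles',
  priority: 'HIGH',
  input: splitToTiles(image, 4, 4),   // 16 tiles
  splitter: (tiles) => tiles,
  mapperFuncName: 'processTile',      // registered on the node
  // Partials arrive in completion order, so each processed tile must carry its
  // coordinates (or shard index) for stitch() to put it back in the right place
  reducer: (tiles) => stitch(tiles)
};
scheduler.submit(spec);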

Node-side registration:

// workers/node.ts
globalMapperRegistry['processTile'] = (tile: Uint8Array) => {
  // Simplified: brighten every byte by 10, clamped to 255
  const out = new Uint8Array(tile.length);
  for (let i = 0; i < tile.length; i++) out[i] = Math.min(255, tile[i] + 10);
  return out;
};
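The splitToTiles and stitch helpers in the pseudocode are left undefined. A minimal sketch for a single-channel image (grayscale, one byte per pixel, row-major) might look like this. The Gray/Tile shapes and both signatures are our assumptions. Each tile carries its position, so the image can be stitched in whatever order the subtasks finish.

```typescript
interface Gray { width: number; height: number; data: Uint8Array; }
interface Tile { x: number; y: number; w: number; h: number; data: Uint8Array; }

// Cut the image into at most rows x cols tiles (edge tiles may be smaller)
function splitToTiles(img: Gray, rows: number, cols: number): Tile[] {
  const tiles: Tile[] = [];
  const th = Math.ceil(img.height / rows);
  const tw = Math.ceil(img.width / cols);
  for (let y = 0; y < img.height; y += th) {
    for (let x = 0; x < img.width; x += tw) {
      const w = Math.min(tw, img.width - x);
      const h = Math.min(th, img.height - y);
      const data = new Uint8Array(w * h);
      for (let r = 0; r < h; r++) {
        // copy one tile row out of the corresponding image row
        const src = (y + r) * img.width + x;
        data.set(img.data.subarray(src, src + w), r * w);
      }
      tiles.push({ x, y, w, h, data });
    }
  }
  return tiles;
}

// Write every tile back at its (x, y) position; the order of `tiles` does not matter
function stitch(width: number, height: number, tiles: Tile[]): Gray {
  const data = new Uint8Array(width * height);
  for (const t of tiles) {
    for (let r = 0; r < t.h; r++) {
      data.set(t.data.subarray(r * t.w, (r + 1) * t.w), (t.y + r) * width + t.x);
    }
  }
  return { width, height, data };
}
```

With this shape, the node-side mapper would take and return a whole Tile, transforming only data, instead of a bare Uint8Array. That way the position survives the round trip.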

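Scenario 2: log keyword and anomaly detection (near real time)

  • The input is log fragments cut by minute
  • The mapper counts keywords and applies simple rule matching; the reducer merges the counts and outputs the anomaly list

// Node side (workers/node.ts): register the mapper
globalMapperRegistry['logScan'] = (chunk: string) => {
  const anomalies: string[] = [];
  const counts: Record<string, number> = {};
  chunk.split('\n').forEach(line => {
    if (line.includes('ERROR')) anomalies.push(line);
    const m = line.match(/\b(login|pay|video|fail)\b/gi);
    // The regex is case-insensitive, so normalize the keys to lower case
    if (m) m.forEach(k => { const key = k.toLowerCase(); counts[key] = (counts[key] || 0) + 1; });
  });
  return { anomalies, counts };
};

// Caller side: minuteChunks (string[], e.g. the last 10 minutes) and scheduler are assumed to exist
const spec = {
  id: `log-${Date.now()}`,
  name: 'logScan',
  priority: 'MEDIUM',
  input: minuteChunks,
  splitter: (arr: string[]) => arr,
  mapperFuncName: 'logScan',
  reducer: (partials: { anomalies: string[]; counts: Record<string, number> }[]) => {
    const merged = { anomalies: [] as string[], counts: {} as Record<string, number> };
    partials.forEach(p => {
      merged.anomalies.push(...p.anomalies);
      Object.keys(p.counts).forEach(k => { merged.counts[k] = (merged.counts[k] || 0) + p.counts[k]; });
    });
    return merged;
  }
};
scheduler.submit(spec as any);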
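minuteChunks is not defined above. Here is a sketch of producing it. It assumes each log line starts with a YYYY-MM-DD HH:MM:SS timestamp; that format is hypothetical, so adjust the prefix check to your logs.

```typescript
// Group log lines into one chunk per minute, keeping the original order of minutes.
// Lines without a parsable timestamp prefix stay with the previous line's minute.
function chunkByMinute(lines: string[]): string[] {
  const buckets = new Map<string, string[]>();
  const order: string[] = [];
  let current = '';
  for (const line of lines) {
    const prefix = line.slice(0, 16); // "YYYY-MM-DD HH:MM"
    if (/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}$/.test(prefix)) current = prefix;
    if (!buckets.has(current)) {
      buckets.set(current, []);
      order.push(current);
    }
    buckets.get(current)!.push(line);
  }
  return order.map(k => buckets.get(k)!.join('\n'));
}
```

Continuation lines without a timestamp, such as stack-trace lines, stay in the same chunk as the line that produced them.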

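Scenario 3: matrix multiplication (small sample for teaching/research)

  • Split the computation of A×B into row-by-column dot-product subtasks
  • The mapper computes only the dot product of one row and one column; the reducer writes each cell back into the result matrix

// Registration: globalMapperRegistry['matrixRowMul'] was registered above
// Splitting (A and B are the caller's number[][] matrices)
function transpose(M: number[][]): number[][] {
  return M[0].map((_, j) => M.map(row => row[j]));
}

function splitMatMul(A: number[][], B: number[][]) {
  const BT = transpose(B);
  const jobs: { i: number, j: number, row: number[], col: number[] }[] = [];
  for (let i = 0; i < A.length; i++) {
    for (let j = 0; j < BT.length; j++) {
      jobs.push({ i, j, row: A[i], col: BT[j] });
    }
  }
  return jobs;
}

const spec = {
  id: `mm-${Date.now()}`,
  name: 'matmul',
  priority: 'HIGH',
  input: splitMatMul(A, B),
  splitter: (arr: any[]) => arr,
  mapperFuncName: 'matrixRowMul',
  reducer: (partials: { i: number, j: number, val: number }[]) => {
    const C = Array.from({ length: A.length }, () => Array(B[0].length).fill(0));
    partials.forEach(p => { C[p.i][p.j] = p.val; });
    return C;
  }
};

Note: this reducer needs the mapper to return {i, j, val}, but matrixRowMul as registered above returns only the bare dot product. In node.ts, change the mapper to ({ row, col, i, j }) => ({ i, j, val: dot(row, col) }), where dot is the same multiply-and-sum loop as in matrixRowMul.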
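The split, map, and reduce pieces can be checked on one thread before they are wired into the scheduler. In this sketch, matrixCell is a hypothetical name for the revised mapper that returns { i, j, val }. The partials are reversed on purpose to mimic out-of-order completion.

```typescript
interface CellJob { i: number; j: number; row: number[]; col: number[]; }
interface Cell { i: number; j: number; val: number; }

// Revised mapper: dot product of one row and one column, tagged with its position
const matrixCell = ({ i, j, row, col }: CellJob): Cell => {
  let val = 0;
  for (let k = 0; k < row.length; k++) val += row[k] * col[k];
  return { i, j, val };
};

function transpose(M: number[][]): number[][] {
  return M[0].map((_, j) => M.map(r => r[j]));
}

function splitMatMul(A: number[][], B: number[][]): CellJob[] {
  const BT = transpose(B);
  const jobs: CellJob[] = [];
  for (let i = 0; i < A.length; i++)
    for (let j = 0; j < BT.length; j++) jobs.push({ i, j, row: A[i], col: BT[j] });
  return jobs;
}

// Reducer: place every cell by (i, j), so arrival order does not matter
function reduceCells(rows: number, cols: number, cells: Cell[]): number[][] {
  const C = Array.from({ length: rows }, () => new Array<number>(cols).fill(0));
  cells.forEach(c => { C[c.i][c.j] = c.val; });
  return C;
}

const A = [[1, 2], [3, 4]];
const B = [[5, 6], [7, 8]];
const C = reduceCells(A.length, B[0].length, splitMatMul(A, B).map(matrixCell).reverse());
console.log(JSON.stringify(C)); // [[19,22],[43,50]]
```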

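Extending the Demo to Real Distribution

Device discovery and connection

  • Use @ohos.distributedHardware.deviceManager to get the list of online devices
  • Use @ohos.communication.softbus, or the corresponding wrapper, to set up channels/sessions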

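Remote execution and RPC

  • Give the remote device an "execution capability" (an FA/Stage Ability plus a RemoteObject interface) that exposes executeSubTask(mapperName: string, input: any)
  • Locally, RemoteNode implements NodeLike.post(): it serializes the subtask and sends it over SoftBus/RPC
  • When the remote side finishes, it sends the result back, and the result is mapped onto the common SubTaskResult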
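A RemoteNode could wrap whatever transport you build. The sketch below calls no HarmonyOS API.

  • RemoteTransport is a hypothetical interface standing in for a SoftBus/RPC wrapper, for example one around a remote object proxy.
  • The method name executeSubTask follows the bullet above.
  • The types are trimmed local copies of the demo's.

Transport errors become FAILED results, so the scheduler's existing retry path handles them.

```typescript
interface SubTaskMsg { id: string; mapperFuncName: string; input: unknown; }
interface Result { subTaskId: string; status: 'SUCCEEDED' | 'FAILED'; output?: unknown; error?: string; durationMs?: number; nodeId?: string; }
interface Metrics { nodeId: string; running: number; capacity: number; avgLatencyMs: number; lastHeartbeat: number; healthy: boolean; }

// Hypothetical transport: JSON request in, JSON reply out
interface RemoteTransport {
  call(deviceId: string, method: string, payload: string): Promise<string>;
}

class RemoteNode {
  private running = 0;
  private lastHeartbeat?: number;
  private remoteAvgLatencyMs = 0;

  constructor(
    readonly id: string,              // e.g. the remote device id
    public capacity: number,
    private transport: RemoteTransport,
    private heartbeatTimeoutMs = 5_000,
  ) {}

  async post(sub: SubTaskMsg): Promise<Result> {
    this.running++;
    const started = Date.now();
    try {
      const reply = await this.transport.call(this.id, 'executeSubTask',
        JSON.stringify({ subTaskId: sub.id, mapperName: sub.mapperFuncName, input: sub.input }));
      const parsed = JSON.parse(reply) as Result;
      return { ...parsed, subTaskId: sub.id, nodeId: this.id };
    } catch (e) {
      // Session closed, timeout, bad reply: report FAILED so the scheduler retries elsewhere
      return { subTaskId: sub.id, status: 'FAILED', error: String(e), durationMs: Date.now() - started, nodeId: this.id };
    } finally {
      this.running--;
    }
  }

  // Called whenever the remote device's periodic heartbeat arrives
  onHeartbeat(hb: { ts: number; capacity: number; avgLatencyMs: number }): void {
    this.lastHeartbeat = hb.ts;
    this.capacity = hb.capacity;
    this.remoteAvgLatencyMs = hb.avgLatencyMs;
  }

  healthy(now: number = Date.now()): boolean {
    return this.lastHeartbeat !== undefined && now - this.lastHeartbeat < this.heartbeatTimeoutMs;
  }

  metrics(now: number = Date.now()): Metrics {
    return {
      nodeId: this.id, running: this.running, capacity: this.capacity,
      avgLatencyMs: this.remoteAvgLatencyMs, lastHeartbeat: this.lastHeartbeat ?? 0, healthy: this.healthy(now),
    };
  }

  stop(): void {
    // Close the session/channel here in a real implementation
  }
}
```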

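Status and heartbeats

  • Each device periodically reports its concurrency capacity, running count, average latency, battery/temperature/network latency, and so on
  • The scheduler updates NodeMetrics from these reports and uses them for load balancing and health decisions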

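Failures and data consistency

  • Session disconnect / heartbeat timeout -> mark the node as failed -> reclaim its unfinished subtasks -> migrate them
  • Intermediate state can live in distributed data objects; output results can be written to distributed files or the cloud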
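Reclaiming a lost node's work amounts to moving its running subtasks back to pending. This sketch mirrors the demo's task-state shape but keeps retry counts in a map; the demo keeps them on SubTask. Charging a retry for a lost node is a policy assumption.

```typescript
interface TaskState {
  pending: Set<string>;
  running: Map<string, { nodeId: string; startedAt: number }>;
  failed: Map<string, string>;
  retries: Map<string, number>; // subTaskId -> retries used so far
  maxRetries: number;
}

// Move every subtask running on `lostNodeId` back to pending, or fail it if it is out of retries
function reclaimFromNode(state: TaskState, lostNodeId: string): { requeued: string[]; failed: string[] } {
  const requeued: string[] = [];
  const failed: string[] = [];
  // Collect first, then mutate, instead of deleting from the map while iterating it
  const victims = Array.from(state.running.entries())
    .filter(([, r]) => r.nodeId === lostNodeId)
    .map(([id]) => id);
  for (const id of victims) {
    state.running.delete(id);
    const n = (state.retries.get(id) ?? 0) + 1;
    state.retries.set(id, n);
    if (n <= state.maxRetries) {
      state.pending.add(id);
      requeued.push(id);
    } else {
      state.failed.set(id, `node ${lostNodeId} lost`);
      failed.push(id);
    }
  }
  return { requeued, failed };
}
```

If the lost node reconnects and replies later, that reply should be dropped, because the subtask is no longer in running.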

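Q&A

Q1: How do task priorities actually "take effect"?
A: At two levels. First, the queue dispatches high-priority tasks first. Second, node selection also gives high-priority tasks more free slots, for example by allocating slots with a weight multiplier. A more elaborate approach keeps a separate queue per priority, or implements "time slicing" plus "preemption" (postponing low-priority subtasks that have not been dispatched yet).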
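Separate per-priority queues can be drained with fixed weights, so high priority goes first while low priority still makes progress. Here is a sketch; the 4:2:1 weights are chosen only for illustration.

```typescript
type Priority = 'HIGH' | 'MEDIUM' | 'LOW';

class MultiLevelQueue<T> {
  private queues: Record<Priority, T[]> = { HIGH: [], MEDIUM: [], LOW: [] };
  private readonly schedule: Priority[] = []; // e.g. H,H,H,H,M,M,L for weights 4:2:1
  private cursor = 0;

  constructor(weights: Record<Priority, number> = { HIGH: 4, MEDIUM: 2, LOW: 1 }) {
    (['HIGH', 'MEDIUM', 'LOW'] as Priority[]).forEach(p => {
      for (let i = 0; i < weights[p]; i++) this.schedule.push(p);
    });
  }

  push(p: Priority, item: T): void { this.queues[p].push(item); }

  get size(): number {
    return this.queues.HIGH.length + this.queues.MEDIUM.length + this.queues.LOW.length;
  }

  // Walk the weighted schedule, skipping empty levels so no turn is wasted
  pop(): T | undefined {
    for (let tries = 0; tries < this.schedule.length; tries++) {
      const p = this.schedule[this.cursor];
      this.cursor = (this.cursor + 1) % this.schedule.length;
      const q = this.queues[p];
      if (q.length > 0) return q.shift();
    }
    return undefined;
  }
}
```

Take five HIGH, three MEDIUM, and two LOW items. The pop order is h1, h2, h3, h4, m1, m2, l1, h5, m3, l2. HIGH dominates, but LOW gets a turn in every cycle.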

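Q2: How do we keep an oversized subtask from hogging a single node?
A: Split more finely. Alternatively, give subtasks a timeout, and on timeout split them into smaller subtasks and dispatch those. On the node side, make execution interruptible by checking a cancellation flag periodically.

Q3: How do we keep result merging idempotent?
A: Give every subtask a fixed subTaskId and shardIndex, and have the reducer write by shardIndex with deduplication. Duplicate deliveries then leave the final state unchanged.

Q4: Will a Worker pool on a mobile device compete for resources and hurt the foreground experience?
A: Read system load, temperature, and battery against thresholds, and lower capacity dynamically. Pause or slow down when the app switches between foreground and background. Keep resources reserved for high-priority interactive tasks.

Q5: Cross-device networks fluctuate a lot. How do we keep things stable?
A: Introduce "adaptive batching" and "adaptive timeouts". When the network is poor, shrink the shards and lower concurrency; when it is good, widen the window. Temporarily circuit-break failing devices, and retry them at intervals so they can recover.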
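For adaptive timeouts, one well-known recipe is the one TCP uses for its retransmission timer (RFC 6298):

  • Track a smoothed latency SRTT and its variation RTTVAR.
  • Set the timeout to SRTT + 4 * RTTVAR.
  • Double it after each timeout.

Applying this per device to subtask round trips is our assumption, and the clamp values are illustrative.

```typescript
class AdaptiveTimeout {
  private srtt?: number;   // smoothed round-trip time
  private rttvar = 0;      // round-trip time variation
  private backoff = 1;     // doubled on every timeout, reset by a new measurement

  constructor(private initialMs = 3_000, private minMs = 200, private maxMs = 60_000) {}

  // Feed the measured round-trip time of a successful subtask
  observe(sampleMs: number): void {
    if (this.srtt === undefined) {
      this.srtt = sampleMs;
      this.rttvar = sampleMs / 2;
    } else {
      // RFC 6298 gains: beta = 1/4 for the variation, alpha = 1/8 for the mean (variation first)
      this.rttvar = 0.75 * this.rttvar + 0.25 * Math.abs(this.srtt - sampleMs);
      this.srtt = 0.875 * this.srtt + 0.125 * sampleMs;
    }
    this.backoff = 1;
  }

  // Call when a subtask to this device timed out
  onTimeout(): void {
    this.backoff = Math.min(this.backoff * 2, 64);
  }

  timeoutMs(): number {
    const base = this.srtt === undefined ? this.initialMs : this.srtt + 4 * this.rttvar;
    return Math.min(this.maxMs, Math.max(this.minMs, base * this.backoff));
  }
}
```

Example: with no measurements the timeout is the initial 3000 ms. After a first 100 ms sample it becomes 100 + 4 * 50 = 300 ms. After a second 100 ms sample it becomes 100 + 4 * 37.5 = 250 ms. Each timeout then doubles it until the next successful measurement.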

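Summary

This article presented a distributed task-management demo that actually runs:

  • A Worker pool simulates multiple nodes and covers task splitting, scheduling, load balancing, status monitoring, fault recovery, and result merging
  • The NodeLike interface abstracts the node, so it can later be swapped transparently for cross-device nodes (SoftBus/RPC)
  • Implementation ideas and code skeletons for three common scenarios

If you want to take it to production:

  1. Improve the health metrics (CPU/memory/battery/temperature/network) and the scheduling policy (joint optimization of latency, capacity, and priority)
  2. Add task persistence (crash recovery) and observability (tracing/metrics/logs)
  3. Combine it with distributed data objects or object storage so that results are consistent and traceable
  4. Do "affinity scheduling" based on device profiles (compute power, thermal limits, battery policy) for higher overall efficiency

Once the demo runs, you essentially have the core engineering skeleton for "send each task to the nearest, least busy, most suitable device, survive failures, and collect the results". Replace the communication layer with a cross-device implementation, and you have a task manager for truly distributed computing.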
