基于 RxJava 构建强大的 Android 文件下载管理器
在 Android 开发中,文件下载是一个极其常见的需求,但同时也是一个充满挑战的功能。它涉及到网络请求、IO 操作、线程管理、进度更新、生命周期感知,以及如断点续传等高级特性。手动管理这些复杂性很容易导致代码混乱、内存泄漏和糟糕的用户体验。
RxJava,作为一个响应式编程库,其强大的数据流转换和线程控制能力,非常适合用来构建一个清晰、健壮且功能强大的下载管理器。本文将深入探讨如何利用 RxJava 实现一个支持多文件并行下载、实时进度更新和断点续传的下载管理器。
一、核心思路与架构设计
我们的设计目标是创建一个 DownloadManager
类,它对外提供简洁的 RxJava 流(Observable
/Flowable
)来执行下载任务并返回进度状态。内部将处理所有复杂的逻辑。
核心角色:
DownloadManager
: 单例类,负责管理所有下载任务,提供开始、暂停、查询等方法。DownloadTask
: 封装单个下载任务的信息(如 URL、保存路径、当前状态等)。DownloadStatus
: 一个数据类,代表下载状态(如已下载字节数、总字节数、下载状态STARTED
,IN_PROGRESS
,PAUSED
,COMPLETED
,ERROR
)。
技术栈:
RxJava 3: 用于构建响应式数据流。
Retrofit + OkHttp: 用于处理网络请求,OkHttp 天然支持断点续传。
Room (或其它持久化方案): 用于存储任务队列和断点信息,保证应用重启后能恢复任务。
二、实现多文件并行下载
并行下载的核心在于利用 RxJava 的 flatMap
操作符为每个下载任务创建独立的流,并使用 Schedulers 来控制并发度。
1. 定义下载接口(Retrofit)
使用 Retrofit 定义一个支持范围请求(Range Header,断点续传的关键)的接口。
kotlin
interface DownloadApi {@Streaming // 重要!防止 OkHttp 将大文件全部缓存在内存中@GETfun downloadFile(@Url fileUrl: String,@Header("Range") range: String = "bytes=0-" // 默认从0开始,即重新下载): Observable<ResponseBody> }
2. 核心下载方法(串行 -> 并行转换)
DownloadManager
中的方法接收一个任务列表,并返回一个合并后的状态流。
kotlin
fun startDownloads(tasks: List<DownloadTask>): Observable<DownloadStatus> {return Observable.fromIterable(tasks).flatMap( { task ->startDownload(task).subscribeOn(Schedulers.io())}, maxConcurrency) // maxConcurrency 是最大并行数参数.observeOn(AndroidSchedulers.mainThread()) // 在主线程观察,更新UI }private fun startDownload(task: DownloadTask): Observable<DownloadStatus> {return Observable.create { emitter ->// 标记任务开始emitter.onNext(DownloadStatus.progress(task, 0, 0))// 1. 检查本地文件已下载的长度,用于断点续传val downloadedLength = getDownloadedLength(task.localPath)val totalLength = getTotalLength(task) // 可能需要先发起一个HEAD请求获取// 2. 构建带Range头的请求val call = downloadApi.downloadFile(task.url,"bytes=$downloadedLength-$totalLength")// 3. 执行同步请求(因为在RxJava的create中,已在IO线程)val response = call.execute()if (response.isSuccessful) {response.body()?.let { body ->// 4. 以追加模式写入文件val file = File(task.localPath)val fileWriter = FileOutputStream(file, true).channelval inputStream = body.byteStream()val buffer = ByteArray(1024 * 8)var read: Intvar currentLength = downloadedLength// 5. 循环读取并写入while (inputStream.read(buffer).also { read = it } != -1) {if (emitter.isDisposed) { // 如果订阅被dispose(如用户暂停),则停止下载fileWriter.close()inputStream.close()emitter.onNext(DownloadStatus.paused(task, currentLength, totalLength))return@create}fileWriter.write(ByteBuffer.wrap(buffer, 0, read))currentLength += read// 6. 发射进度更新事件emitter.onNext(DownloadStatus.inProgress(task, currentLength, totalLength))}// 7. 下载完成fileWriter.close()inputStream.close()emitter.onNext(DownloadStatus.completed(task, totalLength, totalLength))emitter.onComplete()}} else {emitter.onError(IOException("Download failed: ${response.code()}"))}}.doOnSubscribe {// 任务开始,更新数据库状态为STARTEDupdateTaskInDB(task.copy(status = DownloadStatus.STARTED))}.doOnError { error ->// 任务出错,更新数据库状态为ERRORupdateTaskInDB(task.copy(status = DownloadStatus.ERROR))}.onErrorResumeNext { error: Throwable ->// 发生错误时,发射一个错误状态而不是终止流Observable.just(DownloadStatus.error(task, error))} }
关键点:
flatMap
的maxConcurrency
参数控制了最大并行任务数。使用
subscribeOn(Schedulers.io())
确保每个下载任务都在 IO 线程执行。在
Observable.create
中执行同步网络请求 (execute()
) 和文件 IO。
三、实现下载进度实时更新
上面的代码中已经包含了进度更新的核心逻辑。我们在循环读取网络流的同时,不断计算 currentLength
并发射出去。
优化: 频繁的进度更新(每读取 8KB 就更新一次)可能会导致过多的 UI 重绘。我们可以通过 RxJava 的操作符进行采样(throttling)。
在 ViewModel
或 Presenter
中处理状态流时,可以这样做:
kotlin
downloadManager.startDownloads(tasks).throttleLast(500, TimeUnit.MILLISECONDS) // 每500毫秒只取最后一个进度事件.observeOn(AndroidSchedulers.mainThread()).subscribe { status ->when (status) {is DownloadStatus.InProgress -> {// 更新UI:计算百分比 (status.currentLength * 100 / status.totalLength)updateProgressBar(status.task.id, status.currentLength, status.totalLength)}is DownloadStatus.Completed -> { ... }is DownloadStatus.Paused -> { ... }is DownloadStatus.Error -> { ... }}}
四、实现断点续传功能
断点续传是下载管理器的灵魂功能,其核心是 HTTP Range Requests。
1. 原理
在请求头中携带
Range: bytes=[start]-[end]
,告诉服务器从文件的哪个字节开始传输。如果服务器支持,它会返回状态码
206 (Partial Content)
以及请求的字节块。
2. 实现步骤
我们的实现已经包含了断点续传的逻辑:
记录已下载长度:
getDownloadedLength(task.localPath)
通过检查本地已存在文件的大小来确定。添加Range头: 在 Retrofit 请求中添加
"bytes=$downloadedLength-"
头。-
表示直到文件末尾。以追加模式写入文件:
FileOutputStream(file, true)
中的true
参数代表追加(append),而不是覆盖。
3. 持久化存储(增强鲁棒性)
为了保证应用被杀死后依然能恢复任务,我们需要将任务信息(URL、路径、已下载大小、总大小等)存入数据库(如 Room)。
开始下载时:从数据库加载任务,获取
downloadedLength
。进度更新时:不仅更新 UI,也定期(例如每下载 1% 或每秒)将当前进度更新到数据库。注意频率,避免过于频繁的数据库操作。
暂停/完成/错误时:更新数据库中的任务状态和最终进度。
这样,当用户重新打开应用时,DownloadManager
可以初始化所有未完成的任务,并提供“继续所有”或“重试”的功能。
五、完整的使用示例
1. 在 ViewModel 中
kotlin
class DownloadViewModel : ViewModel() {private val downloadManager = DownloadManager.getInstance()private val compositeDisposable = CompositeDisposable()val downloadStatus = MutableLiveData<DownloadStatus>()fun startDownloadingFiles(urlList: List<String>) {val tasks = urlList.map { url ->DownloadTask(id = UUID.randomUUID().toString(),url = url,localPath = "${context.getExternalFilesDir(null)}/${getFileName(url)}",status = DownloadStatus.QUEUED)}// 先将任务存入数据库saveTasksToDB(tasks)// 开始下载并监听val disposable = downloadManager.startDownloads(tasks).subscribe { status ->downloadStatus.postValue(status)}compositeDisposable.add(disposable)}fun pauseDownload(taskId: String) {// 调用downloadManager的方法,其内部会通过emitter.isDisposed来中断下载流downloadManager.pauseDownload(taskId)}override fun onCleared() {super.onCleared()compositeDisposable.dispose() // 避免内存泄漏} }
2. 在 Activity/Fragment 中观察状态并更新 UI
kotlin
viewModel.downloadStatus.observe(this) { status ->when (status) {is DownloadStatus.InProgress -> {val progress = (status.currentLength * 100 / status.totalLength).toInt()findViewById<ProgressBar>(R.id.progress_bar).progress = progressfindViewById<TextView>(R.id.progress_text).text = "$progress%"}is DownloadStatus.Completed -> {showToast("Download completed!")}is DownloadStatus.Paused -> {showToast("Download paused.")}} }
总结
通过 RxJava,我们将复杂的异步下载任务转换为了清晰易懂的数据流(Stream)。
flatMap
轻松实现了并发控制。Observable.create
给了我们完全控制底层操作(网络、文件IO)的能力,使我们能方便地实现暂停(dispose)和进度发射。丰富的操作符(如
throttleLast
,onErrorResumeNext
)帮助我们优化数据流和错误处理。响应式编程范式 使得状态更新和 UI 交互变得非常自然。
这种架构不仅功能强大,而且扩展性极佳,可以很容易地添加诸如任务优先级、速度限制、仅Wi-Fi下载等更多高级功能。希望本文能为你构建自己的下载管理器提供坚实的 foundation。