当前位置: 首页 > ai >正文

深入解析 Java interrupt

Java 中断(Interrupt)机制详解

Java 的中断机制是一种协作式的线程间通信机制,用于请求另一个线程停止当前正在执行的操作。

Thread thread = Thread.currentThread();
thread.interrupt(); // 设置当前线程的中断状态

检查中断状态

// 检查中断状态
boolean isInterrupted = Thread.currentThread().isInterrupted();// 检查并清除中断状态
boolean wasInterrupted = Thread.interrupted();

中断产生的后果

对阻塞方法的影响

当线程在以下阻塞方法中被中断时,会抛出 InterruptedException

try {Thread.sleep(1000);        // sleepobject.wait();             // waitthread.join();             // joinLockSupport.park();        // park// 以及各种 I/O 操作和同步器
} catch (InterruptedException e) {// 中断异常处理Thread.currentThread().interrupt(); // 恢复中断状态
}

对非阻塞代码的影响

对于非阻塞代码,中断不会自动产生异常,需要手动检查:

while (!Thread.currentThread().isInterrupted()) {// 执行任务System.out.println("Working...");// 模拟工作try {Thread.sleep(100);} catch (InterruptedException e) {// 睡眠时被中断Thread.currentThread().interrupt(); // 重新设置中断状态break;}
}
System.out.println("线程被中断,优雅退出");

正确的中断处理模式

1. 传播中断状态

public void task() {try {while (true) {// 执行工作if (Thread.currentThread().isInterrupted()) {throw new InterruptedException();}// 或者检查中断的阻塞操作Thread.sleep(100);}} catch (InterruptedException e) {// 恢复中断状态并退出Thread.currentThread().interrupt();}
}

2. 不可中断任务的处理

public void nonInterruptibleTask() {while (true) {if (Thread.currentThread().isInterrupted()) {// 执行清理操作System.out.println("收到中断请求,执行清理后退出");break;}// 执行不可中断的工作}
}

正确使用中断机制可以实现线程的安全、协作式停止,避免使用已废弃的 stop()方法。

sleep为什么抛出异常

调用本地方法:

    private static native void sleepNanos0(long nanos) throws InterruptedException;

jvm.cpp中注册

JVM_ENTRY(void, JVM_SleepNanos(JNIEnv* env, jclass threadClass, jlong nanos))if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");}if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}// Save current thread state and restore it at the end of this block.// And set new thread state to SLEEPING.JavaThreadSleepState jtss(thread);HOTSPOT_THREAD_SLEEP_BEGIN(nanos / NANOSECS_PER_MILLISEC);if (nanos == 0) {os::naked_yield();} else {ThreadState old_state = thread->osthread()->get_state();thread->osthread()->set_state(SLEEPING);if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {HOTSPOT_THREAD_SLEEP_END(1);// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state.  That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}}thread->osthread()->set_state(old_state);}HOTSPOT_THREAD_SLEEP_END(0);
JVM_END

JVM_SleepNanos这个函数是sleep抛出异常的地方。以下是该函数的核心逻辑:

  1. 首先检查nanos参数是否小于0,如果是则抛出IllegalArgumentException异常(实际上Java层面已经检查了,所以c++层面不会报异常):

  2. if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");
    }
    
  3. 检查线程是否已被中断,如果是则抛出InterruptedException异常:

    if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
    }
    
  4. 在实际睡眠过程中,如果线程被中断,也会抛出InterruptedException异常:

    if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state.  That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}
    }
    

Object.wait异常

同样调用本地方法

    private final native void wait0(long timeoutMillis) throws InterruptedException;

cpp实现在ObjectMonitor.cpp,这个函数很长,简化如下:

// ObjectMonitor的等待方法:处理线程等待、中断和超时逻辑
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {JavaThread* current = THREAD;assert(InitDone, "未初始化");CHECK_OWNER();  // 检查所有者,非所有者抛出异常EventJavaMonitorWait wait_event;EventVirtualThreadPinned vthread_pinned_event;// 检查中断状态:若可中断且已中断,直接抛出异常if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);// 发送JVMTI监控等待事件if (JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, false);}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, 0, millis, false);}THROW(vmSymbols::java_lang_InterruptedException());return;}// 虚拟线程处理:尝试挂起虚拟线程freeze_result result;ContinuationEntry* ce = current->last_continuation();bool is_virtual = ce != nullptr && ce->is_virtual_thread();if (is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);result = Continuation::try_preempt(current, ce->cont_oop(current));if (result == freeze_ok) {vthread_wait(current, millis);current->set_current_waiting_monitor(nullptr);return;}}// 进入等待状态JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);if (!is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);}// 创建等待节点并加入等待队列ObjectWaiter node(current);node.TState = ObjectWaiter::TS_WAIT;current->_ParkEvent->reset();OrderAccess::fence();Thread::SpinAcquire(&_wait_set_lock);add_waiter(&node);Thread::SpinRelease(&_wait_set_lock);// 退出监控器并准备挂起intx save = _recursions;_waiters++;_recursions = 0;exit(current);guarantee(!has_owner(current), "invariant");// 挂起线程:检查中断或超时int ret = OS_OK;int WasNotified = 0;bool interrupted = interruptible && current->is_interrupted(false);{OSThread* osthread = current->osthread();OSThreadWaitState osts(osthread, true);assert(current->thread_state() == _thread_in_vm, "invariant");{ClearSuccOnSuspend csos(this);ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true);if (interrupted || HAS_PENDING_EXCEPTION) {// 空处理:已有中断或异常} else if (!node._notified) {if (millis <= 0) {current->_ParkEvent->park();} else {ret = current->_ParkEvent->park(millis);}}}// 从等待队列移除节点(如果需要)if (node.TState == ObjectWaiter::TS_WAIT) {Thread::SpinAcquire(&_wait_set_lock);if (node.TState == ObjectWaiter::TS_WAIT) {dequeue_specific_waiter(&node);assert(!node._notified, "invariant");node.TState = ObjectWaiter::TS_RUN;}Thread::SpinRelease(&_wait_set_lock);}guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");OrderAccess::loadload();if (has_successor(current)) clear_successor();WasNotified = node._notified;// 发送JVMTI监控等待完成事件if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);if (node._notified && has_successor(current)) {current->_ParkEvent->unpark();}}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);}OrderAccess::fence();// 重新获取监控器锁assert(!has_owner(current), "invariant");ObjectWaiter::TStates v = node.TState;if (v == ObjectWaiter::TS_RUN) {NoPreemptMark npm(current);enter(current);} else {guarantee(v == ObjectWaiter::TS_ENTER, "invariant");reenter_internal(current, &node);node.wait_reenter_end(this);}guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");}// 清理状态:重置等待监控器引用current->set_current_waiting_monitor(nullptr);// 恢复递归计数guarantee(_recursions == 0, "invariant");int relock_count = JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current);_recursions = save + relock_count;current->inc_held_monitor_count(relock_count);_waiters--;// 验证后置条件assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");assert_mark_word_consistency();// 虚拟线程事件记录if (ce != nullptr && ce->is_virtual_thread()) {current->post_vthread_pinned_event(&vthread_pinned_event, "Object.wait", result);}// 检查通知结果:未通知可能是超时或中断if (!WasNotified) {if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW(vmSymbols::java_lang_InterruptedException());}}
}

ObjectMonitor::wait 函数是 Java 对象监视器(Object Monitor)中实现 Object.wait() 方法的核心函数。它允许线程在对象上等待,直到其他线程调用该对象的 notify() 或 notifyAll() 方法。

1. 初始化和检查

  • 获取当前线程信息并验证初始化状态

  • 使用 CHECK_OWNER() 检查当前线程是否为监视器的所有者,如果不是则抛出 IllegalMonitorStateException

2. 事件初始化

  • 初始化 EventJavaMonitorWait 和 EventVirtualThreadPinned 事件,用于监控和追踪目的

3. 中断检查

  • 如果线程可中断(interruptible 为 true)且已被中断,则在进入等待前抛出 InterruptedException

4. 虚拟线程处理

  • 如果是虚拟线程(virtual thread),则调用 vthread_wait 方法进行特殊处理,包括:

    • 创建 ObjectWaiter 节点

    • 将节点添加到等待队列

    • 处理虚拟线程的挂起和状态转换

5. 常规等待逻辑

对于非虚拟线程或虚拟线程的后续步骤:

  • 创建 JavaThreadInObjectWaitState 对象来管理线程等待状态

  • 如果启用了 JVMTI,则发布 monitor wait 事件

  • 保存当前的递归计数并重置为 0

  • 退出监视器(调用 exit 方法)

  • 增加等待者计数

6. 等待阶段

  • 调用操作系统相关的 park 函数使线程进入等待状态

  • 等待可能因超时、中断或 notify 调用而结束

7. 唤醒后处理

  • 检查唤醒原因(通知、超时或中断)

  • 如果是中断且线程可中断,则抛出 InterruptedException

  • 重新获取监视器锁

  • 恢复递归计数

  • 发布 monitor waited 事件

8. 清理工作

  • 减少等待者计数
  • 清理 successor(如果存在)
  • 删除 ObjectWaiter 节点

这个函数实现了 Java 中对象等待机制的核心逻辑,处理了线程状态管理、同步控制、事件追踪和异常处理等多个方面,确保线程安全地等待和被唤醒。

Thread.join

A线程调用ThreadB.join,实际上就只是调用对方的wait。当线程B结束会唤醒所有的等待者。

/​**​等待该线程终止,最多等待 {@code millis} 毫秒。如果超时时间设为 {@code 0} 则表示无限期等待(直到线程终止)。•如果该线程尚未被 {@link #start() 启动},则此方法会立即返回,无需等待。••@implNote 实现说明:•对于平台线程,该实现采用循环调用 {@code this.wait} 方法,并在 {@code this.isAlive} 条件满足时持续等待。•当线程终止时,会调用 {@code this.notifyAll} 方法唤醒等待的线程。•建议应用程序不要在 {@code Thread} 实例上使用 {@code wait}、{@code notify} 或 {@code notifyAll} 方法,•以避免与线程同步机制发生冲突或造成意外行为。•@throws InterruptedException•如果任何线程中断了当前线程。当抛出此异常时,当前线程的<i>中断状态</i>将被清除。*/public final void join(long millis) throws InterruptedException {if (millis < 0)throw new IllegalArgumentException("timeout value is negative");if (this instanceof VirtualThread vthread) {if (isAlive()) {long nanos = MILLISECONDS.toNanos(millis);vthread.joinNanos(nanos);}return;}synchronized (this) {if (millis > 0) {if (isAlive()) {final long startTime = System.nanoTime();long delay = millis;do {wait(delay);} while (isAlive() && (delay = millis -NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);}} else {while (isAlive()) {wait(0);}}}}

http://www.xdnf.cn/news/19104.html

相关文章:

  • 从零开始部署 Kubernetes Dashboard:可视化管理你的集群
  • 不惧和谐,永不失效!!
  • 高并发内存池(19)-用基数树优化
  • JavaScript事件
  • FastAPI 入门科普:下一代高性能 Python Web 框架
  • 顶点 (VS)vs 片段(FS):OpenGL纹理滚动着色器的性能博弈与设计哲学
  • Shader开发(十八)实现纹理滚动效果
  • 【基础知识】互斥锁、读写锁、自旋锁的区别
  • 控制系统仿真之PID校正-PID校正(八)
  • 动手实现多元线性回归
  • 医疗 AI 的 “破圈” 时刻:辅助诊断、药物研发、慢病管理,哪些场景已落地见效?
  • 鸿蒙FA/PA架构:打破设备孤岛的技术密钥
  • Mysql基本语句(二)
  • 解决 jsdelivr CDN不可用问题
  • GTSAM中gtsam::LinearContainerFactor因子详解
  • Acrobat Pro DC 2025安装包下载及详细安装教程,PDF编辑器永久免费中文版(稳定版安装包)
  • Android 短信验证码输入框实现
  • 嵌入式Linux驱动开发:定时器驱动
  • 北斗传输采集数据的自定义通信协议
  • 香港电讯创新解决方案,开启企业数字化转型新篇章
  • CollageIt:简单易用的照片拼贴工具
  • Spring boot 启用第二数据源
  • 【Day 40】Shell脚本-条件判断
  • linux中.tar 解压命令
  • 【系列05】端侧AI:构建与部署高效的本地化AI模型 第4章:模型量化(Quantization)
  • 嵌入式Linux驱动开发 - DTS LED驱动
  • 管家婆辉煌ERP中如何查询畅销商品
  • java8浮点型算平均值
  • 37 HTB Remote 机器 - 容易
  • 字典解密助手ArchiveHelperWpfv1.0.12详细使用说明书