流式数据服务端怎么传给前端,前端怎么接收?
01 引言
大模型时代,尤其会话模型为了提高用户的使用体验,它不会将所有的数据加载完成一次响应给客户端,而是通过数据流,一点点的将数据慢慢呈现出来。
正是这种有趣的交互方式一次次将SSE(Server Sent Event)
技术推到大众视野。之前的文章已经介绍过SSE推送技术
以及SSE替代WebSocket实现直播间实时评论功能
的文章,这里就不在赘述。
恰巧,有位粉丝朋友咨询怎么讲流式数据传给前端,前端怎么接收?小编为此做了相关的测试,整理在这里,分享给大家。
02 思维定式
我们经常编写的是一次请求,一次响应这样标准的http
请求。如:
@RestController
public class FooController {@GetMapping("/foo")public String foo() {return "success";}
}
而流式数据该怎么处理呢?一次请求,有源源不断的数据加载进来。如果按照思维定式处理,只能能到所有的数据全部加载完成,再响应给前端。这样带来的结果有:
- 响应时间过长,体验感很差
- 响应超时,前端无法获取到数据
流式数据返回在响应式编程里面非常普遍,如reactor.core.publisher.Flux
、io.reactivex.Flowable
等。我们姑且以Flux
和Flowable
作为案例测试。
03 流式响应
3.1 Flux<T>
Flux
是spring-boot-starter-webflux
管理下的reactor-core.jar
包。具体的Maven
:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
案例
@GetMapping("/stream")
public Flux<String> streamData() {return Flux.interval(Duration.ofSeconds(1)).map(sequence -> {// 为了方便:拼接随机数方便观察String a = "flux_data: " + sequence + ":" +new Random().nextFloat();System.out.println(a);return a;});
}
浏览器直接访问
我们可以看到正常的Http请求,只有等到所有的流数据处理完成之后才会一起展示出来。
3.2 Flowable<T>
Flowable
是位于rxjava.jar
包下。具体的Maven
:
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.21</version>
</dependency>
案例
@GetMapping("/flowable")
public Flowable<String> flowable() {// 每隔一秒发送一条数据return Flowable.interval(1, TimeUnit.SECONDS).map(item -> {// 为了方便观察,增加随机数String data ="flowable_test:"+ item + "_" + new Random().nextFloat();System.out.println(data);return data;});
}
浏览器直接访问
断开服务器
3.3 前端接收
上面的案例均使用了浏览器直接访问,类似使用了ajax
、fetch
等请求。我们以fetch
为例:
$("button[name='send']").click(function (){fetch("/stream").then(resp => resp.text()).then(data => console.info(data));
});
结果
第一张图中并没有打印数据,而是等全部流数据完成之后才会打印展示。前端处理的话,同样等到数据处理完成才能渲染。
这种常规的请求已然不能满足我们的业务场景了。只能请出我们的大杀器SSE
了。
04 前端处理流式请求
关于SSE
这里不多介绍,详细可以看详细文档说明:https://javascript.info/server-sent-events
4.1 前端直接处理流式数据
服务端只要返回流式数据即可,如Flux<T>
、Flowable<T>
等。为了方便演示,我们通过按钮出发接收流式数据。服务端和上面的案例相同。
JS代码
$("button[name='send']").click(function (){let eventSource = new EventSource("/stream");eventSource.onmessage = function (event) {console.log("数据打印:" + event.data);}
});
结果
我们可以看到数据是一条一条返回给前端的。
4.2 服务端推数据给前端
不使用响应式编程的情况下,如何将数据逐步推给前端呢?那就通过org.springframework.web.servlet.mvc.method.annotation.SseEmitter
,手动推送给前端。
前端的JS
代码只需要订阅指定的路径,然后监听消息即可。写法同4.1
。
JS代码
$("button[name='send']").click(function (){let eventSource = new EventSource("/subscription");eventSource.onmessage = function (event) {console.log("数据打印:" + event.data);}
});
服务端
SseEmitter sseEmitter;@GetMapping("/subscription")
public SseEmitter sseEmitter() {sseEmitter = new SseEmitter();return sseEmitter;
}@PostConstruct
public void timer() {new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (sseEmitter != null) {try {sseEmitter.send("sseEmitter_" + new Random().nextFloat());} catch (IOException e) {throw new RuntimeException(e);}}}}, 1000, 1000);
}
通过/subscription
订阅,并生成对应的客户端。服务端推送消息,都是通过这个客户端推送的。timer()
定时器模拟数据自动推送。
我们直接看前端的效果:
当然我们对接业务中需要考虑连接超时的问题,以及页面多开客户端区分的问题。这部分内容可以参考之前的文档:SSE 推送技术
05 小结
流式数据对于大数据量的处理展示比较常见,包括Mybaits
也支持流式查询,防止频繁的数据库连接池的打开和关闭。流式处理一般都是长连接,长连接势必带来资源的占用。用不好,可能造成连接池沾满、内存溢出等意想不到的问题。