当前位置: 首页 > java >正文

物联网之使用Vertx实现HTTP/WebSocket最佳实践

小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server

实现Http/WebSocket【响应式】

Vertx-Web地址

实现过程

查看源码

代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
http/websocket【响应式】
<dependency><groupId>io.vertx</groupId><artifactId>vertx-web</artifactId><version>5.0.0</version>
</dependency>

HttpServerProperties

/*** @author laokou*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.http-server")
public class HttpServerProperties {private boolean auth = true;private String host = "0.0.0.0";private Set<Integer> ports = new HashSet<>(0);private boolean compressionSupported = false;private int compressionLevel = 6;private int maxWebSocketFrameSize = 65536;private int maxWebSocketMessageSize = 65536 * 4;private boolean handle100ContinueAutomatically = false;private int maxChunkSize = 8192;private int maxInitialLineLength = 4096;private int maxHeaderSize = 8192;private int maxFormAttributeSize = 8192;private int maxFormFields = 512;private int maxFormBufferedBytes = 2048;private Http2Settings initialSettings = new Http2Settings().setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);private boolean http2ClearTextEnabled = true;private int http2ConnectionWindowSize = -1;private boolean decompressionSupported = false;private boolean acceptUnmaskedFrames = false;private int decoderInitialBufferSize = 256;private boolean perFrameWebSocketCompressionSupported = true;private boolean perMessageWebSocketCompressionSupported = true;private int webSocketCompressionLevel = 6;private boolean webSocketAllowServerNoContext = false;private boolean webSocketPreferredClientNoContext = false;private int webSocketClosingTimeout = 30;private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;private boolean registerWebSocketWriteHandlers = false;private int http2RstFloodMaxRstFramePerWindow = 400;private int http2RstFloodWindowDuration = 60;private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;}

VertxHttpServer

/*** @author laokou*/
@Slf4j
final class VertxHttpServer extends AbstractVerticle {private final HttpServerProperties properties;private final Vertx vertx;private final Router router;private volatile Flux<HttpServer> httpServer;private boolean isClosed = false;VertxHttpServer(Vertx vertx, HttpServerProperties properties) {this.vertx = vertx;this.properties = properties;this.router = getRouter();}@Overridepublic synchronized void start() {httpServer = getHttpServerOptions().map(vertx::createHttpServer).doOnNext(server -> server.webSocketHandler(serverWebSocket -> {if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {serverWebSocket.close();return;}serverWebSocket.textMessageHandler(message -> log.info("【Vertx-Websocket-Server】 => 收到消息:{}", message)).closeHandler(v -> log.error("【Vertx-Websocket-Server】 => 断开连接")).exceptionHandler(err -> log.error("【Vertx-Websocket-Server】 => 错误信息:{}", err.getMessage(), err)).endHandler(v -> log.error("【Vertx-Websocket-Server】 => 结束"));}).requestHandler(router).listen().onComplete(completionHandler -> {if (isClosed) {return;}if (completionHandler.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());}else {Throwable ex = completionHandler.cause();log.error("【Vertx-Http-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);}}));httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();}@Overridepublic synchronized void stop() {isClosed = true;httpServer.doOnNext(server -> server.close().onComplete(result -> {if (result.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());}else {Throwable ex = result.cause();log.error("【Vertx-Http-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);}})).subscribeOn(Schedulers.boundedElastic()).subscribe();}public void deploy() {// 部署服务vertx.deployVerticle(this);// 停止服务Runtime.getRuntime().addShutdownHook(new Thread(this::stop));}private Router getRouter() {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {String body = ctx.body().asString();Long deviceId = Long.valueOf(ctx.pathParam("deviceId"));Long productId = Long.valueOf(ctx.pathParam("productId"));log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);ctx.response().end();});return router;}private Flux<HttpServerOptions> getHttpServerOptions() {return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);}private HttpServerOptions getHttpServerOption(int port) {HttpServerOptions options = new HttpServerOptions();options.setHost(properties.getHost());options.setPort(port);options.setCompressionSupported(properties.isCompressionSupported());options.setDecompressionSupported(properties.isDecompressionSupported());options.setCompressionLevel(properties.getCompressionLevel());options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());options.setMaxChunkSize(properties.getMaxChunkSize());options.setMaxInitialLineLength(properties.getMaxInitialLineLength());options.setMaxHeaderSize(properties.getMaxHeaderSize());options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());options.setMaxFormFields(properties.getMaxFormFields());options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());options.setInitialSettings(properties.getInitialSettings());options.setAlpnVersions(properties.getAlpnVersions());options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());options.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());options.setTracingPolicy(properties.getTracingPolicy());options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());return options;}}

这只是一个demo,实际情况,需要对http请求进行鉴权,推荐使用OAuth2

我是老寇,我们下次再见啦!

http://www.xdnf.cn/news/7220.html

相关文章:

  • 精益数据分析(69/126):最小可行化产品(MVP)的设计、验证与数据驱动迭代
  • Android framework 问题记录
  • Ubuntu开机自启服务
  • 【数据仓库面试题合集③】实时数仓建模思路与实践详解
  • InternLM 论文分类微调实践(XTuner 版)
  • Java设计模式之外观模式:从入门到精通(保姆级教程)
  • 基于Elasticsearch的搜索引擎简介
  • 【Android】一键创建Keystore + Keystore 参数说明 + 查询SHA256(JDK Keytool Keystore)
  • 【神经网络与深度学习】GAN 生成对抗训练模型在实际训练中很容易判别器收敛,生成器发散
  • 【物联网】 ubantu20.04 搭建L2TP服务器
  • 电脑安装程序报错写入注册表失败
  • 基于51单片机教室红外计数灯光控制—可蓝牙控制
  • 为什么wifi有信号却连接不上?
  • Oracle 数据库的默认隔离级别
  • 探索C++面向对象:从抽象到实体的元规则(上篇)
  • docker介绍与常用命令汇总
  • C语法备注01
  • 项目记录:「五秒反应挑战」小游戏的开发全过程
  • 「NameCraft · 幻想命名器」开发记:我和 CodeBuddy 的一次奇幻共创之旅
  • MySQL之函数
  • 洛谷U536262 井底之“鸡”
  • 初识 Redis
  • 云计算简介:从“水电”到“数字引擎”的技术革命
  • LeetCode 219.存在重复元素 II
  • OpenCV CUDA 模块中的矩阵算术运算-----在频域(复数频谱)中执行逐元素乘法并缩放的函数mulAndScaleSpectrums()
  • OSPF路由撤销及优化
  • 集成电路生产测试中CP针卡(Probe Card)简介
  • 深度强化学习 | 基于SAC算法的移动机器人路径跟踪(附Pytorch实现)
  • Redis学习打卡-Day4-Redis实现消息队列
  • 企业开发工具git的使用:从入门到高效团队协作