当前位置: 首页 > java >正文

NIFI的处理器:ExecuteGroovyScript 2.4.0

ExecuteGroovyScript是常用的处理器之一,用于执行GroovyScript脚本。该脚本负责处理传入的流文件(例如传输到SUCCESS或删除)以及由该脚本创建的任何流文件。如果处理不完整或不正确,会话将被回滚。

属性值-失败处理策略 Failure strategy:如何处理未处理的异常。如果你想通过代码管理异常,那么保留默认值“rollback”。如果选择了“转移到失败”并且发生了未处理的异常,则此会话中从传入队列接收到的所有flowFiles都将转移到“失败”关系,并设置了其他属性:ERROR_MESSAGE和ERROR_STACKTRACE。如果选择了“回滚”并且发生了未处理的异常,则从传入队列接收到的所有flowFiles都将受到惩罚并返回。如果处理器没有传入连接,则此参数无效。

该脚本的性能大约是java语言的10%,性能不高,调试效率也很低,但是编写灵活,使用方便。

如下为部分代码示例:

import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

// 解析输入 JSON
def flowFile = session.get();
if(!flowFile) return;
def jsonText = flowFile.read().getText('UTF-8');
def inputJson = new JsonSlurper().parseText(jsonText);

// 获取 timestamp
def timestamp = inputJson.timestamp

// 创建输出数组
def outputList = []

// 遍历 plc1 和 plc2,并提取每个设备的数据
inputJson.each { key, value ->
    if (!key.equalsIgnoreCase("ct_timestamp")) {
        // 克隆原始的 plc 数据
        def deviceData = value.collectEntries { k, v -> [(k): v] }

        // 创建输出数据结构
        def outputData = [
            data: deviceData,
            device_name: key,
            data_time: ct_timestamp
        ]

        // 将转换后的数据添加到输出列表
        outputList.add(outputData)
    }
}

// 返回输出 JSON
def outputJson = new JsonBuilder(outputList).toPrettyString()
flowFile=session.write(flowFile, {outputStream ->outputStream.write(
        outputJson.bytes
) } as OutputStreamCallback);
REL_SUCCESS << flowFile;

http://www.xdnf.cn/news/7785.html

相关文章:

  • 第14天-Matplotlib实现数据可视化
  • ollama使用gpu运行大模型
  • Xilinx 7Series\UltraScale 在线升级FLASH STARTUPE2和STARTUPE3使用
  • Java 定时任务中Cron 表达式与固定频率调度的区别及使用场景
  • 唯创安全优化纸业车间安全环境:门口盲区预警报警器的应用与成效
  • STL中的Vector(顺序表)
  • RabbitMQ——消息确认
  • NLP学习路线图(三): 微积分(梯度、导数等)
  • 有没有其他影视app可以像群晖video station一样可以被Windows的本地网络驱动器找到
  • 【Vue3】数据的返回和响应式处理(ref reactive)
  • 自建srs实时视频服务器支持RTMP推流和拉流
  • Kotlin 极简小抄 P8(不可空类型、可空类型、注意事项、非空断言 !!)
  • 什么是endpoints?
  • php://filter的trick
  • Server-Driven UI:Kotlin 如何重塑动态化 Android 应用开发
  • 《算法笔记》12.1小节——字符串专题->字符串hash进阶 问题 A: 求最长公共子串(串)
  • 代码随想录打卡|Day45 图论(孤岛的总面积 、沉没孤岛、水流问题、建造最大岛屿)
  • ARM反汇编浅析
  • 【JAVA】比较器Comparator与自然排序(28)
  • Flannel后端为UDP模式下,分析数据包的发送方式(一)
  • 【react18】在styled-components中引入图片报错
  • 项目中Warmup耗时高该如何操作处理
  • 深度解析 Java 中介者模式:重构复杂交互场景的优雅方案
  • 详解 C# 中基于发布-订阅模式的 Messenger 消息传递机制:Messenger.Default.Send/Register
  • 服务器网络配置 netplan一个网口配置两个ip(双ip、辅助ip、别名IP别名)
  • Java详解LeetCode 热题 100(18):LeetCode 73. 矩阵置零(Set Matrix Zeroes)详解
  • 广州卓远VR受邀参加2025智能体育典型案例调研活动,并入驻国体华为运动健康联合实验室!
  • 深入解析异步编程:Java NIO、Python `async/await` 与 C# `async/await` 的对比
  • junit单元测试
  • Ajax研究